diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cowboy.erl | 9 | ||||
-rw-r--r-- | src/cowboy_constraints.erl | 2 | ||||
-rw-r--r-- | src/cowboy_handler.erl | 5 | ||||
-rw-r--r-- | src/cowboy_http.erl | 88 | ||||
-rw-r--r-- | src/cowboy_http2.erl | 90 | ||||
-rw-r--r-- | src/cowboy_loop.erl | 55 | ||||
-rw-r--r-- | src/cowboy_req.erl | 766 | ||||
-rw-r--r-- | src/cowboy_rest.erl | 42 | ||||
-rw-r--r-- | src/cowboy_stream_h.erl | 53 | ||||
-rw-r--r-- | src/cowboy_websocket.erl | 70 |
10 files changed, 421 insertions, 759 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl index 3387224..3b272c5 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -19,6 +19,7 @@ -export([stop_listener/1]). -export([set_env/3]). +%% @todo Detailed opts. -type opts() :: map(). -export_type([opts/0]). @@ -27,19 +28,15 @@ | {atom(), cowboy_constraints:constraint() | [cowboy_constraints:constraint()], any()}]. -export_type([fields/0]). --type http_headers() :: [{binary(), iodata()}]. +-type http_headers() :: #{binary() => iodata()}. -export_type([http_headers/0]). -type http_status() :: non_neg_integer() | binary(). -export_type([http_status/0]). --type http_version() :: 'HTTP/1.1' | 'HTTP/1.0'. +-type http_version() :: 'HTTP/2' | 'HTTP/1.1' | 'HTTP/1.0'. -export_type([http_version/0]). --type onresponse_fun() :: - fun((http_status(), http_headers(), iodata(), Req) -> Req). --export_type([onresponse_fun/0]). - -spec start_clear(ranch:ref(), non_neg_integer(), ranch_tcp:opts(), cowboy_protocol:opts()) -> {ok, pid()} | {error, any()}. start_clear(Ref, NbAcceptors, TransOpts0, ProtoOpts) diff --git a/src/cowboy_constraints.erl b/src/cowboy_constraints.erl index 9a379e1..9bd578e 100644 --- a/src/cowboy_constraints.erl +++ b/src/cowboy_constraints.erl @@ -52,7 +52,7 @@ apply_constraint(Value, F) when is_function(F) -> %% Constraint functions. int(Value) when is_binary(Value) -> - try {true, list_to_integer(binary_to_list(Value))} + try {true, binary_to_integer(Value)} catch _:_ -> false end. diff --git a/src/cowboy_handler.erl b/src/cowboy_handler.erl index 7fb2dd5..af21342 100644 --- a/src/cowboy_handler.erl +++ b/src/cowboy_handler.erl @@ -36,7 +36,7 @@ -spec execute(Req, Env) -> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). execute(Req, Env=#{handler := Handler, handler_opts := HandlerOpts}) -> - case Handler:init(Req, HandlerOpts) of + try Handler:init(Req, HandlerOpts) of {ok, Req2, State} -> Result = terminate(normal, Req2, State, Handler), {ok, Req2, [{result, Result}|Env]}; @@ -48,6 +48,9 @@ execute(Req, Env=#{handler := Handler, handler_opts := HandlerOpts}) -> Mod:upgrade(Req2, Env, Handler, State, Timeout, run); {Mod, Req2, State, Timeout, hibernate} -> Mod:upgrade(Req2, Env, Handler, State, Timeout, hibernate) + catch Class:Reason -> + terminate({crash, Class, Reason}, Req, HandlerOpts, Handler), + erlang:raise(Class, Reason, erlang:get_stacktrace()) end. -spec terminate(any(), Req, any(), module()) -> ok when Req::cowboy_req:req(). diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 0231def..ae42e6d 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -111,8 +111,7 @@ %% The connection will be closed after this stream. last_streamid = undefined :: pos_integer(), - %% Currently active HTTP/1.1 streams. Streams may be initiated either - %% by the client or by the server through PUSH_PROMISE frames. + %% Currently active HTTP/1.1 streams. streams = [] :: [stream()], %% Children which are in the process of shutting down. @@ -202,7 +201,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, loop(State, Buffer); %% Unknown messages. Msg -> - error_logger:error_msg("Received stray message ~p.", [Msg]), + error_logger:error_msg("Received stray message ~p.~n", [Msg]), loop(State, Buffer) %% @todo Configurable timeout. This should be a global inactivity timeout %% that triggers when really nothing happens (ie something went really wrong). @@ -271,6 +270,7 @@ after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) " "with reason ~p:~p.", [Handler, StreamID, Req, Opts, Class, Reason]), + %% @todo Bad value returned here. Crashes. ok %% @todo Status code. % stream_reset(State, StreamID, {internal_error, {Class, Reason}, @@ -288,6 +288,7 @@ after_parse({data, StreamID, IsFin, Data, State=#state{handler=Handler, catch Class:Reason -> error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.", [Handler, StreamID, IsFin, Data, StreamState0, Class, Reason]), + %% @todo Bad value returned here. Crashes. ok %% @todo % stream_reset(State, StreamID, {internal_error, {Class, Reason}, @@ -502,6 +503,8 @@ parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers0, Name, SoFar) -> Value = clean_value_ws_end(SoFar, byte_size(SoFar) - 1), Headers = case maps:get(Name, Headers0, undefined) of undefined -> Headers0#{Name => Value}; + %% The cookie header does not use proper HTTP header lists. + Value0 when Name =:= <<"cookie">> -> Headers0#{Name => << Value0/binary, "; ", Value/binary >>}; Value0 -> Headers0#{Name => << Value0/binary, ", ", Value/binary >>} end, parse_header(Rest, S, Headers); @@ -552,7 +555,7 @@ request(Buffer, State=#state{transport=Transport, in_streamid=StreamID, undefined -> request(Buffer, State, Headers, <<>>, default_port(Transport:secure())); RawHost -> - try parse_host(RawHost, false, <<>>) of + try cow_http_hd:parse_host(RawHost) of {Host, undefined} -> request(Buffer, State, Headers, Host, default_port(Transport:secure())); {Host, Port} -> @@ -567,20 +570,6 @@ request(Buffer, State=#state{transport=Transport, in_streamid=StreamID, default_port(true) -> 443; default_port(_) -> 80. -%% @todo Yeah probably just call the cowlib function. -%% Same code as cow_http:parse_fullhost/1, but inline because we -%% really want this to go fast. -parse_host(<< $[, Rest/bits >>, false, <<>>) -> - parse_host(Rest, true, << $[ >>); -parse_host(<<>>, false, Acc) -> - {Acc, undefined}; -parse_host(<< $:, Rest/bits >>, false, Acc) -> - {Acc, list_to_integer(binary_to_list(Rest))}; -parse_host(<< $], Rest/bits >>, true, Acc) -> - parse_host(Rest, false, << Acc/binary, $] >>); -parse_host(<< C, Rest/bits >>, E, Acc) -> - ?LOWER(parse_host, Rest, E, Acc). - %% End of request parsing. request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, in_streamid=StreamID, @@ -617,6 +606,9 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, in_stream scheme => Scheme, host => Host, port => Port, + +%% @todo So the path component needs to be normalized. + path => Path, qs => Qs, version => Version, @@ -734,6 +726,11 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= %% Message handling. +%% @todo There is a difference in behavior between HTTP/1.1 and HTTP/2 +%% when an error or crash occurs after sending a 500 response. In HTTP/2 +%% the error will be printed, in HTTP/1.1 the error will be ignored. +%% This is due to HTTP/1.1 disabling streams differently after both +%% requests and responses have been sent. down(State=#state{children=Children0}, Pid, Msg) -> case lists:keytake(Pid, 1, Children0) of {value, {_, undefined, _}, Children} -> @@ -741,7 +738,7 @@ down(State=#state{children=Children0}, Pid, Msg) -> {value, {_, StreamID, _}, Children} -> info(State#state{children=Children}, StreamID, Msg); false -> - error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.", [Msg, Pid]), + error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid]), State end. @@ -762,16 +759,10 @@ info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) -> % 'Exception occurred in StreamHandler:info/3 call.'}) end; false -> - error_logger:error_msg("Received message ~p for unknown stream ~p.", [Msg, StreamID]), + error_logger:error_msg("Received message ~p for unknown stream ~p.~n", [Msg, StreamID]), State end. -%% @todo commands/3 -%% @todo stream_reset - - - - %% Commands. commands(State, _, []) -> @@ -800,20 +791,24 @@ commands(State, StreamID, [{flow, _Length}|Tail]) -> %% @todo Set the body reading length to min(Length, BodyLength) commands(State, StreamID, Tail); -%% @todo Probably a good idea to have an atomic response send (single send call for resp+body). +%% Error responses are sent only if a response wasn't sent already. +commands(State=#state{out_state=wait}, StreamID, [{error_response, StatusCode, Headers, Body}|Tail]) -> + commands(State, StreamID, [{response, StatusCode, Headers, Body}|Tail]); +commands(State, StreamID, [{error_response, _, _, _}|Tail]) -> + commands(State, StreamID, Tail); %% Send a full response. %% %% @todo Kill the stream if it sent a response when one has already been sent. %% @todo Keep IsFin in the state. %% @todo Same two things above apply to DATA, possibly promise too. -commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, +commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, StreamID, [{response, StatusCode, Headers0, Body}|Tail]) -> %% @todo I'm pretty sure the last stream in the list is the one we want %% considering all others are queued. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams), {State, Headers} = connection(State0, Headers0, StreamID, Version), %% @todo Ensure content-length is set. - Response = cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers)), + Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)), case Body of {sendfile, O, B, P} -> Transport:send(Socket, Response), @@ -838,18 +833,22 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, Str {State0#state{last_streamid=StreamID}, Headers0} end, {State, Headers} = connection(State1, Headers1, StreamID, Version), - Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers))), + Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))), commands(State#state{out_state=chunked}, StreamID, Tail); %% Send a response body chunk. %% %% @todo WINDOW_UPDATE stuff require us to buffer some data. +%% @todo We probably want to allow Data to be the {sendfile, ...} tuple also. commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, [{data, IsFin, Data}|Tail]) -> + + %% @todo We need to kill the stream if it tries to send data before headers. + %% @todo Same as above. case lists:keyfind(StreamID, #stream.id, Streams) of #stream{version='HTTP/1.1'} -> Size = iolist_size(Data), - Transport:send(Socket, [integer_to_list(Size, 16), <<"\r\n">>, Data, <<"\r\n">>]); + Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>, Data, <<"\r\n">>]); #stream{version='HTTP/1.0'} -> Transport:send(Socket, Data) end, @@ -885,7 +884,20 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor commands(State, StreamID, [stop|Tail]) -> %% @todo Do we want to run the commands after a stop? % commands(stream_terminate(State, StreamID, stop), StreamID, Tail). - maybe_terminate(State, StreamID, Tail, fin). + + %% @todo I think that's where we need to terminate streams. + + maybe_terminate(State, StreamID, Tail, fin); +%% HTTP/1.1 does not support push; ignore. +commands(State, StreamID, [{push, _, _, _, _, _, _, _}|Tail]) -> + commands(State, StreamID, Tail). + +%% The set-cookie header is special; we can only send one cookie per header. +headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) -> + Headers1 = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)), + Headers1 ++ [{<<"set-cookie">>, Value} || Value <- SetCookies]; +headers_to_list(Headers) -> + maps:to_list(Headers). flush() -> receive _ -> flush() after 0 -> ok end. @@ -1013,18 +1025,6 @@ error_terminate(StatusCode, State=#state{socket=Socket, transport=Transport}, Re terminate(_State, _Reason) -> exit(normal). %% @todo - - - - - - - - - - - - %% System callbacks. -spec system_continue(_, _, #state{}) -> ok. diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index ffcc17f..5c71628 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -369,6 +369,11 @@ info(State=#state{handler=Handler, streams=Streams}, StreamID, Msg) -> commands(State, Stream, []) -> after_commands(State, Stream); +%% Error responses are sent only if a response wasn't sent already. +commands(State, Stream=#stream{local=idle}, [{error_response, StatusCode, Headers, Body}|Tail]) -> + commands(State, Stream, [{response, StatusCode, Headers, Body}|Tail]); +commands(State, Stream, [{error_response, _, _, _}|Tail]) -> + commands(State, Stream, Tail); %% Send response headers. %% %% @todo Kill the stream if it sent a response when one has already been sent. @@ -376,7 +381,7 @@ commands(State, Stream, []) -> %% @todo Same two things above apply to DATA, possibly promise too. commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0}, Stream=#stream{id=StreamID, local=idle}, [{response, StatusCode, Headers0, Body}|Tail]) -> - Headers = Headers0#{<<":status">> => integer_to_binary(StatusCode)}, + Headers = Headers0#{<<":status">> => status(StatusCode)}, {HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0), case Body of <<>> -> @@ -387,17 +392,18 @@ commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeSta commands(State#state{encode_state=EncodeState}, Stream#stream{local=nofin}, [{sendfile, fin, O, B, P}|Tail]); _ -> - Transport:send(Socket, [ - cow_http2:headers(StreamID, nofin, HeaderBlock), - cow_http2:data(StreamID, fin, Body) - ]), + Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), + %% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE. + %% Use the length set by the server instead, if any. + %% @todo Would be better if we didn't have to convert to binary. + send_data(Socket, Transport, StreamID, fin, iolist_to_binary(Body), 16384), commands(State#state{encode_state=EncodeState}, Stream#stream{local=fin}, Tail) end; %% @todo response when local!=idle %% Send response headers and initiate chunked encoding. commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeState0}, Stream=#stream{id=StreamID, local=idle}, [{headers, StatusCode, Headers0}|Tail]) -> - Headers = Headers0#{<<":status">> => integer_to_binary(StatusCode)}, + Headers = Headers0#{<<":status">> => status(StatusCode)}, {HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0), Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), commands(State#state{encode_state=EncodeState}, Stream#stream{local=nofin}, Tail); @@ -417,7 +423,9 @@ commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=Str [{data, IsFin, Data}|Tail]) -> Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)), commands(State, Stream#stream{local=IsFin}, Tail); + %% @todo data when local!=nofin + %% Send a file. %% %% @todo This implementation is terrible. A good implementation would @@ -441,11 +449,20 @@ commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=Str %% end up with an infinite loop of promises. commands(State0=#state{socket=Socket, transport=Transport, server_streamid=PromisedStreamID, encode_state=EncodeState0}, Stream=#stream{id=StreamID}, - [{promise, Method, Scheme, Authority, Path, Headers0}|Tail]) -> + [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) -> + Authority = case {Scheme, Port} of + {<<"http">>, 80} -> Host; + {<<"https">>, 443} -> Host; + _ -> [Host, $:, integer_to_binary(Port)] + end, + PathWithQs = case Qs of + <<>> -> Path; + _ -> [Path, $?, Qs] + end, Headers = Headers0#{<<":method">> => Method, <<":scheme">> => Scheme, <<":authority">> => Authority, - <<":path">> => Path}, + <<":path">> => PathWithQs}, {HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0), Transport:send(Socket, cow_http2:push_promise(StreamID, PromisedStreamID, HeaderBlock)), %% @todo iolist_to_binary(HeaderBlock) isn't optimal. Need a shortcut. @@ -484,6 +501,22 @@ after_commands(State=#state{streams=Streams0}, Stream=#stream{id=StreamID}) -> Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream), State#state{streams=Streams}. +status(Status) when is_integer(Status) -> + integer_to_binary(Status); +status(<< H, T, U, _/bits >>) when H >= $1, H =< $9, T >= $0, T =< $9, U >= $0, U =< $9 -> + << H, T, U >>. + +%% This same function is found in gun_http2. +send_data(Socket, Transport, StreamID, IsFin, Data, Length) -> + if + Length < byte_size(Data) -> + << Payload:Length/binary, Rest/bits >> = Data, + Transport:send(Socket, cow_http2:data(StreamID, nofin, Payload)), + send_data(Socket, Transport, StreamID, IsFin, Rest, Length); + true -> + Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)) + end. + terminate(#state{socket=Socket, transport=Transport, handler=Handler, streams=Streams, children=Children}, Reason) -> %% @todo Send GOAWAY frame; need to keep track of last good stream id; how? @@ -511,6 +544,23 @@ stream_init(State0=#state{ref=Ref, socket=Socket, transport=Transport, peer=Peer <<":path">> := PathWithQs}, DecodeState} -> State = State0#state{decode_state=DecodeState}, Headers = maps:without([<<":method">>, <<":scheme">>, <<":authority">>, <<":path">>], Headers0), + BodyLength = case Headers of + _ when IsFin =:= fin -> + 0; + #{<<"content-length">> := <<"0">>} -> + 0; + #{<<"content-length">> := BinLength} -> + Length = try + cow_http_hd:parse_content_length(BinLength) + catch _:_ -> + terminate(State0, {stream_error, StreamID, protocol_error, + ''}) %% @todo + %% @todo Err should terminate here... + end, + Length; + _ -> + undefined + end, {Host, Port} = cow_http_hd:parse_host(Authority), {Path, Qs} = cow_http:parse_fullpath(PathWithQs), Req = #{ @@ -527,7 +577,8 @@ stream_init(State0=#state{ref=Ref, socket=Socket, transport=Transport, peer=Peer version => 'HTTP/2', headers => Headers, - has_body => IsFin =:= nofin + has_body => IsFin =:= nofin, + body_length => BodyLength %% @todo multipart? keep state separate %% meta values (cowboy_websocket, cowboy_rest) @@ -609,9 +660,26 @@ stream_terminate_children([Child|Tail], StreamID, Acc) -> headers_decode(HeaderBlock, DecodeState0) -> {Headers, DecodeState} = cow_hpack:decode(HeaderBlock, DecodeState0), - {maps:from_list(Headers), DecodeState}. + {headers_to_map(Headers, #{}), DecodeState}. -%% @todo We will need to special-case the set-cookie header here. +%% This function is necessary to properly handle duplicate headers +%% and the special-case cookie header. +headers_to_map([], Acc) -> + Acc; +headers_to_map([{Name, Value}|Tail], Acc0) -> + Acc = case Acc0 of + %% The cookie header does not use proper HTTP header lists. + #{Name := Value0} when Name =:= <<"cookie">> -> Acc0#{Name => << Value0/binary, "; ", Value/binary >>}; + #{Name := Value0} -> Acc0#{Name => << Value0/binary, ", ", Value/binary >>}; + _ -> Acc0#{Name => Value} + end, + headers_to_map(Tail, Acc). + +%% The set-cookie header is special; we can only send one cookie per header. +headers_encode(Headers0=#{<<"set-cookie">> := SetCookies}, EncodeState) -> + Headers1 = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)), + Headers = Headers1 ++ [{<<"set-cookie">>, Value} || Value <- SetCookies], + cow_hpack:encode(Headers, EncodeState); headers_encode(Headers0, EncodeState) -> Headers = maps:to_list(Headers0), cow_hpack:encode(Headers, EncodeState). diff --git a/src/cowboy_loop.erl b/src/cowboy_loop.erl index 1418950..8acf896 100644 --- a/src/cowboy_loop.erl +++ b/src/cowboy_loop.erl @@ -48,46 +48,21 @@ buffer_size = 0 :: non_neg_integer(), max_buffer = 5000 :: non_neg_integer() | infinity, timeout = infinity :: timeout(), - timeout_ref = undefined :: undefined | reference(), - resp_sent = false :: boolean() + timeout_ref = undefined :: undefined | reference() }). -spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate) -> {ok, Req, Env} | {suspend, module(), atom(), [any()]} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). -upgrade(Req, Env, Handler, HandlerState, Timeout, run) -> - State = #state{env=Env, max_buffer=get_max_buffer(Env), timeout=Timeout}, +upgrade(Req, Env, Handler, HandlerState, Timeout, Hibernate) -> + State = #state{env=Env, max_buffer=get_max_buffer(Env), timeout=Timeout, + hibernate=Hibernate =:= hibernate}, State2 = timeout(State), - after_call(Req, State2, Handler, HandlerState); -upgrade(Req, Env, Handler, HandlerState, Timeout, hibernate) -> - -% dbg:start(), -% dbg:tracer(), -% dbg:tpl(?MODULE, []), -% dbg:tpl(long_polling_h, []), -% dbg:tpl(loop_handler_body_h, []), -% dbg:tpl(cowboy_req, []), -% dbg:p(all, c), - - State = #state{env=Env, max_buffer=get_max_buffer(Env), hibernate=true, timeout=Timeout}, - State2 = timeout(State), - after_call(Req, State2, Handler, HandlerState). + before_loop(Req, State2, Handler, HandlerState). get_max_buffer(#{loop_max_buffer := MaxBuffer}) -> MaxBuffer; get_max_buffer(_) -> 5000. -%% Update the state if the response was sent in the callback. -after_call(Req, State=#state{resp_sent=false}, Handler, - HandlerState) -> - receive - {cowboy_req, resp_sent} -> - before_loop(Req, State#state{resp_sent=true}, Handler, HandlerState) - after 0 -> - before_loop(Req, State, Handler, HandlerState) - end; -after_call(Req, State, Handler, HandlerState) -> - before_loop(Req, State, Handler, HandlerState). - before_loop(Req, State=#state{hibernate=true}, Handler, HandlerState) -> %% @todo Yeah we can't get the socket anymore. @@ -131,29 +106,17 @@ loop(Req, State=#state{timeout_ref=TRef}, Handler, HandlerState) -> call(Req, State, Handler, HandlerState, Message) end. -call(Req, State=#state{resp_sent=RespSent}, - Handler, HandlerState, Message) -> +call(Req, State, Handler, HandlerState, Message) -> try Handler:info(Message, Req, HandlerState) of {ok, Req2, HandlerState2} -> - after_call(Req2, State, Handler, HandlerState2); + before_loop(Req2, State, Handler, HandlerState2); {ok, Req2, HandlerState2, hibernate} -> - after_call(Req2, State#state{hibernate=true}, Handler, HandlerState2); + before_loop(Req2, State#state{hibernate=true}, Handler, HandlerState2); {stop, Req2, HandlerState2} -> terminate(Req2, State, Handler, HandlerState2, stop) catch Class:Reason -> - Stacktrace = erlang:get_stacktrace(), - if RespSent -> ok; true -> - cowboy_req:maybe_reply(Stacktrace, Req) - end, cowboy_handler:terminate({crash, Class, Reason}, Req, HandlerState, Handler), - exit({cowboy_handler, [ - {class, Class}, - {reason, Reason}, - {mfa, {Handler, info, 3}}, - {stacktrace, Stacktrace}, - {req, Req}, - {state, HandlerState} - ]}) + erlang:raise(Class, Reason, erlang:get_stacktrace()) end. terminate(Req, #state{env=Env, timeout_ref=TRef}, diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 77d4a79..4235f6d 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -15,7 +15,7 @@ -module(cowboy_req). -%% Request API. +%% Request. -export([method/1]). -export([version/1]). -export([peer/1]). @@ -41,109 +41,93 @@ -export([parse_cookies/1]). -export([match_cookies/2]). -%% Request body API. +%% Request body. -export([has_body/1]). -export([body_length/1]). -export([read_body/1]). -export([read_body/2]). +-export([read_urlencoded_body/1]). +-export([read_urlencoded_body/2]). --export([body/1]). --export([body/2]). --export([body_qs/1]). --export([body_qs/2]). +%% Multipart. +-export([read_part/1]). +-export([read_part/2]). +-export([read_part_body/1]). +-export([read_part_body/2]). -%% Multipart API. --export([part/1]). --export([part/2]). --export([part_body/1]). --export([part_body/2]). - -%% Response API. +%% Response. +-export([set_resp_cookie/3]). -export([set_resp_cookie/4]). -export([set_resp_header/3]). --export([set_resp_body/2]). --export([set_resp_body_fun/2]). --export([set_resp_body_fun/3]). +%% @todo set_resp_headers/2 -export([has_resp_header/2]). --export([has_resp_body/1]). +%% @todo resp_header -export([delete_resp_header/2]). +-export([set_resp_body/2]). %% @todo Use set_resp_body for iodata() | {sendfile ...} +%% @todo set_resp_body/3 with a ContentType or even Headers argument, to set content headers. +-export([has_resp_body/1]). -export([reply/2]). -export([reply/3]). -export([reply/4]). +-export([stream_reply/2]). +-export([stream_reply/3]). +%% @todo stream_reply/2 (nofin) +-export([stream_body/3]). +%% @todo stream_event/2,3 +-export([push/3]). +-export([push/4]). --export([send_body/3]). - --export([chunked_reply/2]). --export([chunked_reply/3]). --export([chunk/2]). --export([continue/1]). --export([maybe_reply/2]). --export([ensure_response/2]). +%% Internal. +-export([response_headers/2]). -type cookie_opts() :: cow_cookie:cookie_opts(). -export_type([cookie_opts/0]). --type content_decode_fun() :: fun((binary()) -> binary()). --type transfer_decode_fun() :: fun((binary(), any()) - -> cow_http_te:decode_ret()). - --type body_opts() :: [{continue, boolean()} %% doesn't apply - | {length, non_neg_integer()} - | {read_length, non_neg_integer()} %% to be added back later as optimization - | {read_timeout, timeout()} %% same - | {transfer_decode, transfer_decode_fun(), any()} %% doesn't apply - | {content_decode, content_decode_fun()}]. %% does apply +-type body_opts() :: #{ + length => non_neg_integer(), + period => non_neg_integer(), + timeout => timeout() +}. -export_type([body_opts/0]). --type resp_body_fun() :: fun((any(), module()) -> ok). --type send_chunk_fun() :: fun((iodata()) -> ok). --type resp_chunked_fun() :: fun((send_chunk_fun()) -> ok). - --record(http_req, { - %% Transport. - socket = undefined :: any(), - transport = undefined :: undefined | module(), - connection = keepalive :: keepalive | close, - - %% Request. - pid = undefined :: pid(), - method = <<"GET">> :: binary(), - version = 'HTTP/1.1' :: cowboy:http_version(), - peer = undefined :: undefined | {inet:ip_address(), inet:port_number()}, - host = undefined :: undefined | binary(), - host_info = undefined :: undefined | cowboy_router:tokens(), - port = undefined :: undefined | inet:port_number(), - path = undefined :: binary(), - path_info = undefined :: undefined | cowboy_router:tokens(), - qs = undefined :: binary(), - bindings = undefined :: undefined | cowboy_router:bindings(), - headers = [] :: cowboy:http_headers(), - meta = [] :: [{atom(), any()}], - - %% Request body. - body_state = waiting :: waiting | done | {stream, non_neg_integer(), - transfer_decode_fun(), any(), content_decode_fun()}, - buffer = <<>> :: binary(), - multipart = undefined :: undefined | {binary(), binary()}, - - %% Response. - resp_compress = false :: boolean(), - resp_state = waiting :: locked | waiting | waiting_stream - | chunks | stream | done, - resp_headers = [] :: cowboy:http_headers(), - resp_body = <<>> :: iodata() | resp_body_fun() - | {non_neg_integer(), resp_body_fun()} - | {chunked, resp_chunked_fun()}, - - %% Functions. - onresponse = undefined :: undefined | already_called - | cowboy:onresponse_fun() -}). - --opaque req() :: #http_req{}. +%% While sendfile allows a Len of 0 that means "everything past Offset", +%% Cowboy expects the real length as it is used as metadata. +%% @todo We should probably explicitly reject it. +-type resp_body() :: iodata() + | {sendfile, non_neg_integer(), pos_integer(), file:name_all()}. +-export_type([resp_body/0]). + +-type push_opts() :: map(). %% @todo +-export_type([push_opts/0]). + +-type req() :: map(). %% @todo #{ +% ref := ranch:ref(), +% pid := pid(), +% streamid := cowboy_stream:streamid(), +% peer := {inet:ip_address(), inet:port_number()}, +% +% method := binary(), %% case sensitive +% version := cowboy:http_version(), +% scheme := binary(), %% <<"http">> or <<"https">> +% host := binary(), %% lowercase; case insensitive +% port := inet:port_number(), +% path := binary(), %% case sensitive +% qs := binary(), %% case sensitive +% headers := cowboy:http_headers(), +% +% host_info => cowboy_router:tokens(), +% path_info => cowboy_router:tokens(), +% bindings => cowboy_router:bindings(), +% +% has_body := boolean(), +% has_read_body => true, +% body_length := undefined | non_neg_integer() +% +%% @todo resp_* +%}. -export_type([req/0]). -%% Request API. +%% Request. -spec method(req()) -> binary(). method(#{method := Method}) -> @@ -165,6 +149,7 @@ scheme(#{scheme := Scheme}) -> host(#{host := Host}) -> Host. +%% @todo The host_info is undefined if cowboy_router isn't used. Do we want to crash? -spec host_info(req()) -> cowboy_router:tokens() | undefined. host_info(#{host_info := HostInfo}) -> HostInfo. @@ -177,6 +162,7 @@ port(#{port := Port}) -> path(#{path := Path}) -> Path. +%% @todo The path_info is undefined if cowboy_router isn't used. Do we want to crash? -spec path_info(req()) -> cowboy_router:tokens() | undefined. path_info(#{path_info := PathInfo}) -> PathInfo. @@ -185,6 +171,7 @@ path_info(#{path_info := PathInfo}) -> qs(#{qs := Qs}) -> Qs. +%% @todo Might be useful to limit the number of keys. -spec parse_qs(req()) -> [{binary(), binary() | true}]. parse_qs(#{qs := Qs}) -> cow_qs:parse_qs(Qs). @@ -359,6 +346,7 @@ parse_header(Name = <<"content-length">>, Req) -> parse_header(Name, Req, 0, fun cow_http_hd:parse_content_length/1); parse_header(Name = <<"cookie">>, Req) -> parse_header(Name, Req, [], fun cow_cookie:parse_cookie/1); +%% @todo That header is abstracted out and should never reach cowboy_req. parse_header(Name = <<"transfer-encoding">>, Req) -> parse_header(Name, Req, [<<"identity">>], fun cow_http_hd:parse_transfer_encoding/1); parse_header(Name, Req) -> @@ -403,266 +391,125 @@ parse_cookies(Req) -> match_cookies(Fields, Req) -> filter(Fields, kvlist_to_map(Fields, parse_cookies(Req))). -%% Request Body API. +%% Request body. -spec has_body(req()) -> boolean(). has_body(#{has_body := HasBody}) -> HasBody. -%% The length may not be known if Transfer-Encoding is not identity, -%% and the body hasn't been read at the time of the call. +%% The length may not be known if HTTP/1.1 with a transfer-encoding; +%% or HTTP/2 with no content-length header. The length is always +%% known once the body has been completely read. -spec body_length(req()) -> undefined | non_neg_integer(). body_length(#{body_length := Length}) -> Length. --spec body(Req) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). -body(Req) -> - body(Req, []). - -spec read_body(Req) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). read_body(Req) -> - read_body(Req, []). + read_body(Req, #{}). -spec read_body(Req, body_opts()) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). +read_body(Req=#{has_body := false}, _) -> + {ok, <<>>, Req}; +read_body(Req=#{has_read_body := true}, _) -> + {ok, <<>>, Req}; read_body(Req=#{pid := Pid, streamid := StreamID}, Opts) -> - %% @todo Opts should be a map - Length = case lists:keyfind(length, 1, Opts) of - false -> 8000000; - {_, ChunkLen0} -> ChunkLen0 - end, - ReadTimeout = case lists:keyfind(read_timeout, 1, Opts) of - false -> 15000; - {_, ReadTimeout0} -> ReadTimeout0 - end, + Length = maps:get(length, Opts, 8000000), + Period = maps:get(period, Opts, 15000), + Timeout = maps:get(timeout, Opts, Period + 1000), Ref = make_ref(), - Pid ! {{Pid, StreamID}, {read_body, Ref, Length}}, + Pid ! {{Pid, StreamID}, {read_body, Ref, Length, Period}}, receive {request_body, Ref, nofin, Body} -> {more, Body, Req}; {request_body, Ref, {fin, BodyLength}, Body} -> {ok, Body, set_body_length(Req, BodyLength)} - after ReadTimeout -> - exit(read_body_timeout) + after Timeout -> + exit(timeout) end. set_body_length(Req=#{headers := Headers}, BodyLength) -> Req#{ headers => Headers#{<<"content-length">> => integer_to_binary(BodyLength)}, - body_length => BodyLength + body_length => BodyLength, + has_read_body => true }. --spec body(Req, body_opts()) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). -body(Req=#http_req{body_state=waiting}, Opts) -> - %% Send a 100 continue if needed (enabled by default). - case lists:keyfind(continue, 1, Opts) of - {_, false} -> - ok; - _ -> - ExpectHeader = parse_header(<<"expect">>, Req), - ok = case ExpectHeader of - continue -> continue(Req); - _ -> ok - end - end, - %% Initialize body streaming state. - CFun = case lists:keyfind(content_decode, 1, Opts) of - false -> - fun body_content_decode_identity/1; - {_, CFun0} -> - CFun0 - end, - case lists:keyfind(transfer_decode, 1, Opts) of - false -> - case parse_header(<<"transfer-encoding">>, Req) of - [<<"chunked">>] -> - body(Req#http_req{body_state={stream, 0, - fun cow_http_te:stream_chunked/2, {0, 0}, CFun}}, Opts); - [<<"identity">>] -> - case body_length(Req) of - 0 -> - {ok, <<>>, Req#http_req{body_state=done}}; - Len -> - body(Req#http_req{body_state={stream, Len, - fun cow_http_te:stream_identity/2, {0, Len}, - CFun}}, Opts) - end - end; - {_, TFun, TState} -> - body(Req#http_req{body_state={stream, 0, - TFun, TState, CFun}}, Opts) - end; -body(Req=#http_req{body_state=done}, _) -> - {ok, <<>>, Req}; -body(Req, Opts) -> - ChunkLen = case lists:keyfind(length, 1, Opts) of - false -> 8000000; - {_, ChunkLen0} -> ChunkLen0 - end, - ReadLen = case lists:keyfind(read_length, 1, Opts) of - false -> 1000000; - {_, ReadLen0} -> ReadLen0 - end, - ReadTimeout = case lists:keyfind(read_timeout, 1, Opts) of - false -> 15000; - {_, ReadTimeout0} -> ReadTimeout0 - end, - body_loop(Req, ReadTimeout, ReadLen, ChunkLen, <<>>). - -%% Default identity function for content decoding. -%% @todo Move into cowlib when more content decode functions get implemented. -body_content_decode_identity(Data) -> Data. - -body_loop(Req=#http_req{buffer=Buffer, body_state={stream, Length, _, _, _}}, - ReadTimeout, ReadLength, ChunkLength, Acc) -> - {Tag, Res, Req2} = case Buffer of - <<>> -> - body_recv(Req, ReadTimeout, min(Length, ReadLength)); - _ -> - body_decode(Req, ReadTimeout) - end, - case {Tag, Res} of - {ok, Data} -> - {ok, << Acc/binary, Data/binary >>, Req2}; - {more, Data} -> - Acc2 = << Acc/binary, Data/binary >>, - case byte_size(Acc2) >= ChunkLength of - true -> {more, Acc2, Req2}; - false -> body_loop(Req2, ReadTimeout, ReadLength, ChunkLength, Acc2) - end - end. - -body_recv(Req=#http_req{transport=Transport, socket=Socket, buffer=Buffer}, - ReadTimeout, ReadLength) -> - {ok, Data} = Transport:recv(Socket, ReadLength, ReadTimeout), - body_decode(Req#http_req{buffer= << Buffer/binary, Data/binary >>}, ReadTimeout). - -%% Two decodings happen. First a decoding function is applied to the -%% transferred data, and then another is applied to the actual content. -%% -%% Transfer encoding is generally used for chunked bodies. The decoding -%% function uses a state to keep track of how much it has read, which is -%% also initialized through this function. -%% -%% Content encoding is generally used for compression. -%% -%% @todo Handle chunked after-the-facts headers. -%% @todo Depending on the length returned we might want to 0 or +5 it. -body_decode(Req=#http_req{buffer=Data, body_state={stream, _, - TDecode, TState, CDecode}}, ReadTimeout) -> - case TDecode(Data, TState) of - more -> - body_recv(Req#http_req{body_state={stream, 0, - TDecode, TState, CDecode}}, ReadTimeout, 0); - {more, Data2, TState2} -> - {more, CDecode(Data2), Req#http_req{body_state={stream, 0, - TDecode, TState2, CDecode}, buffer= <<>>}}; - {more, Data2, Length, TState2} when is_integer(Length) -> - {more, CDecode(Data2), Req#http_req{body_state={stream, Length, - TDecode, TState2, CDecode}, buffer= <<>>}}; - {more, Data2, Rest, TState2} -> - {more, CDecode(Data2), Req#http_req{body_state={stream, 0, - TDecode, TState2, CDecode}, buffer=Rest}}; - {done, TotalLength, Rest} -> - {ok, <<>>, body_decode_end(Req, TotalLength, Rest)}; - {done, Data2, TotalLength, Rest} -> - {ok, CDecode(Data2), body_decode_end(Req, TotalLength, Rest)} - end. +-spec read_urlencoded_body(Req) -> {ok, [{binary(), binary() | true}], Req} when Req::req(). +read_urlencoded_body(Req) -> + read_urlencoded_body(Req, #{length => 64000, period => 5000}). -body_decode_end(Req=#http_req{headers=Headers}, TotalLength, Rest) -> - Headers2 = lists:keystore(<<"content-length">>, 1, Headers, - {<<"content-length">>, integer_to_binary(TotalLength)}), - %% At this point we just assume TEs were all decoded. - Headers3 = lists:keydelete(<<"transfer-encoding">>, 1, Headers2), - Req#http_req{buffer=Rest, body_state=done, headers=Headers3}. - --spec body_qs(Req) -> {ok, [{binary(), binary() | true}], Req} - | {badlength, Req} when Req::req(). -body_qs(Req) -> - body_qs(Req, [ - {length, 64000}, - {read_length, 64000}, - {read_timeout, 5000}]). - --spec body_qs(Req, body_opts()) -> {ok, [{binary(), binary() | true}], Req} - | {badlength, Req} when Req::req(). -body_qs(Req, Opts) -> - case read_body(Req, Opts) of - {ok, Body, Req2} -> - {ok, cow_qs:parse_qs(Body), Req2}; - {more, _, Req2} -> - {badlength, Req2} - end. +-spec read_urlencoded_body(Req, body_opts()) -> {ok, [{binary(), binary() | true}], Req} when Req::req(). +read_urlencoded_body(Req0, Opts) -> + {ok, Body, Req} = read_body(Req0, Opts), + {ok, cow_qs:parse_qs(Body), Req}. -%% Multipart API. +%% Multipart. --spec part(Req) +-spec read_part(Req) -> {ok, cow_multipart:headers(), Req} | {done, Req} when Req::req(). -part(Req) -> - part(Req, [ - {length, 64000}, - {read_length, 64000}, - {read_timeout, 5000}]). +read_part(Req) -> + read_part(Req, #{length => 64000, period => 5000}). --spec part(Req, body_opts()) +-spec read_part(Req, body_opts()) -> {ok, cow_multipart:headers(), Req} | {done, Req} when Req::req(). -part(Req, Opts) -> +read_part(Req, Opts) -> case maps:is_key(multipart, Req) of true -> {Data, Req2} = stream_multipart(Req, Opts), - part(Data, Opts, Req2); + read_part(Data, Opts, Req2); false -> - part(init_multipart(Req), Opts) + read_part(init_multipart(Req), Opts) end. -part(Buffer, Opts, Req=#{multipart := {Boundary, _}}) -> +read_part(Buffer, Opts, Req=#{multipart := {Boundary, _}}) -> case cow_multipart:parse_headers(Buffer, Boundary) of more -> {Data, Req2} = stream_multipart(Req, Opts), - part(<< Buffer/binary, Data/binary >>, Opts, Req2); + read_part(<< Buffer/binary, Data/binary >>, Opts, Req2); {more, Buffer2} -> {Data, Req2} = stream_multipart(Req, Opts), - part(<< Buffer2/binary, Data/binary >>, Opts, Req2); + read_part(<< Buffer2/binary, Data/binary >>, Opts, Req2); {ok, Headers, Rest} -> + %% @todo We may want headers as a map. Need to check the + %% rules for multipart header parsing before taking a decision. {ok, Headers, Req#{multipart => {Boundary, Rest}}}; %% Ignore epilogue. {done, _} -> {done, Req#{multipart => done}} end. --spec part_body(Req) +-spec read_part_body(Req) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). -part_body(Req) -> - part_body(Req, []). +read_part_body(Req) -> + read_part_body(Req, #{}). --spec part_body(Req, body_opts()) +-spec read_part_body(Req, body_opts()) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). -part_body(Req, Opts) -> +read_part_body(Req, Opts) -> case maps:is_key(multipart, Req) of true -> - part_body(<<>>, Opts, Req, <<>>); + read_part_body(<<>>, Opts, Req, <<>>); false -> - part_body(init_multipart(Req), Opts) + read_part_body(init_multipart(Req), Opts) end. -part_body(Buffer, Opts, Req=#{multipart := {Boundary, _}}, Acc) -> - ChunkLen = case lists:keyfind(length, 1, Opts) of - false -> 8000000; - {_, ChunkLen0} -> ChunkLen0 - end, - case byte_size(Acc) > ChunkLen of +read_part_body(Buffer, Opts, Req=#{multipart := {Boundary, _}}, Acc) -> + Length = maps:get(length, Opts, 8000000), + case byte_size(Acc) > Length of true -> {more, Acc, Req#{multipart => {Boundary, Buffer}}}; false -> {Data, Req2} = stream_multipart(Req, Opts), case cow_multipart:parse_body(<< Buffer/binary, Data/binary >>, Boundary) of {ok, Body} -> - part_body(<<>>, Opts, Req2, << Acc/binary, Body/binary >>); + read_part_body(<<>>, Opts, Req2, << Acc/binary, Body/binary >>); {ok, Body, Rest} -> - part_body(Rest, Opts, Req2, << Acc/binary, Body/binary >>); + read_part_body(Rest, Opts, Req2, << Acc/binary, Body/binary >>); done -> {ok, Acc, Req2}; {done, Body} -> @@ -686,19 +533,27 @@ stream_multipart(Req=#{multipart := {_, <<>>}}, Opts) -> stream_multipart(Req=#{multipart := {Boundary, Buffer}}, _) -> {Buffer, Req#{multipart => {Boundary, <<>>}}}. -%% Response API. +%% Response. + +-spec set_resp_cookie(iodata(), iodata(), Req) + -> Req when Req::req(). +set_resp_cookie(Name, Value, Req) -> + set_resp_cookie(Name, Value, #{}, Req). %% The cookie name cannot contain any of the following characters: %% =,;\s\t\r\n\013\014 %% %% The cookie value cannot contain any of the following characters: %% ,; \t\r\n\013\014 +%% @todo Fix the cookie_opts() type. -spec set_resp_cookie(iodata(), iodata(), cookie_opts(), Req) -> Req when Req::req(). set_resp_cookie(Name, Value, Opts, Req) -> - Cookie = cow_cookie:setcookie(Name, Value, Opts), - %% @todo Nah, keep separate. - set_resp_header(<<"set-cookie">>, Cookie, Req). + Cookie = cow_cookie:setcookie(Name, Value, maps:to_list(Opts)), + RespCookies = maps:get(resp_cookies, Req, #{}), + Req#{resp_cookies => RespCookies#{Name => Cookie}}. + +%% @todo We could add has_resp_cookie and delete_resp_cookie now. -spec set_resp_header(binary(), iodata(), Req) -> Req when Req::req(). @@ -707,29 +562,9 @@ set_resp_header(Name, Value, Req=#{resp_headers := RespHeaders}) -> set_resp_header(Name,Value, Req) -> Req#{resp_headers => #{Name => Value}}. -%% @todo {sendfile, Offset, Bytes, Path} tuple --spec set_resp_body(iodata(), Req) -> Req when Req::req(). +-spec set_resp_body(resp_body(), Req) -> Req when Req::req(). set_resp_body(Body, Req) -> Req#{resp_body => Body}. -%set_resp_body(Body, Req) -> -% Req#http_req{resp_body=Body}. - --spec set_resp_body_fun(resp_body_fun(), Req) -> Req when Req::req(). -set_resp_body_fun(StreamFun, Req) when is_function(StreamFun) -> - Req#http_req{resp_body=StreamFun}. - -%% If the body function crashes while writing the response body or writes -%% fewer bytes than declared the behaviour is undefined. --spec set_resp_body_fun(non_neg_integer(), resp_body_fun(), Req) - -> Req when Req::req(); - (chunked, resp_chunked_fun(), Req) - -> Req when Req::req(). -set_resp_body_fun(StreamLen, StreamFun, Req) - when is_integer(StreamLen), is_function(StreamFun) -> - Req#http_req{resp_body={StreamLen, StreamFun}}; -set_resp_body_fun(chunked, StreamFun, Req) - when is_function(StreamFun) -> - Req#http_req{resp_body={chunked, StreamFun}}. -spec has_resp_header(binary(), req()) -> boolean(). has_resp_header(Name, #{resp_headers := RespHeaders}) -> @@ -738,22 +573,13 @@ has_resp_header(_, _) -> false. -spec has_resp_body(req()) -> boolean(). -has_resp_body(#{resp_body := {sendfile, Len, _}}) -> - Len > 0; +has_resp_body(#{resp_body := {sendfile, _, _, _}}) -> + true; has_resp_body(#{resp_body := RespBody}) -> iolist_size(RespBody) > 0; has_resp_body(_) -> false. -%has_resp_body(#http_req{resp_body=RespBody}) when is_function(RespBody) -> -% true; -%has_resp_body(#http_req{resp_body={chunked, _}}) -> -% true; -%has_resp_body(#http_req{resp_body={Length, _}}) -> -% Length > 0; -%has_resp_body(#http_req{resp_body=RespBody}) -> -% iolist_size(RespBody) > 0. - -spec delete_resp_header(binary(), Req) -> Req when Req::req(). delete_resp_header(Name, Req=#{resp_headers := RespHeaders}) -> @@ -770,287 +596,99 @@ reply(Status, Headers, Req=#{resp_body := Body}) -> reply(Status, Headers, Req) -> reply(Status, Headers, <<>>, Req). --spec reply(cowboy:http_status(), cowboy:http_headers(), - iodata() | resp_body_fun() | {non_neg_integer(), resp_body_fun()} - | {chunked, resp_chunked_fun()}, Req) +-spec reply(cowboy:http_status(), cowboy:http_headers(), resp_body(), Req) -> Req when Req::req(). -reply(Status, Headers, Stream = {stream, undefined, _}, Req) -> - do_stream_reply(Status, Headers, Stream, Req); -reply(Status, Headers, Stream = {stream, Len, _}, Req) -> - do_stream_reply(Status, Headers#{ - <<"content-length">> => integer_to_binary(Len) - }, Stream, Req); -reply(Status, Headers, SendFile = {sendfile, _, Len, _}, Req) -> +reply(_, _, _, #{has_sent_resp := _}) -> + error(function_clause); +reply(Status, Headers, SendFile = {sendfile, _, Len, _}, Req) + when is_integer(Status); is_binary(Status) -> do_reply(Status, Headers#{ <<"content-length">> => integer_to_binary(Len) }, SendFile, Req); -reply(Status, Headers, Body, Req) -> +reply(Status, Headers, Body, Req) + when is_integer(Status); is_binary(Status) -> do_reply(Status, Headers#{ <<"content-length">> => integer_to_binary(iolist_size(Body)) }, Body, Req). -do_stream_reply(Status, Headers, {stream, _, Fun}, Req=#{pid := Pid, streamid := StreamID}) -> - Pid ! {{Pid, StreamID}, {headers, Status, response_headers(Headers, Req)}}, - Fun(), - ok. - +%% Don't send any body for HEAD responses. While the protocol code is +%% supposed to enforce this rule, we prefer to avoid copying too much +%% data around if we can avoid it. +do_reply(Status, Headers, _, Req=#{pid := Pid, streamid := StreamID, method := <<"HEAD">>}) -> + Pid ! {{Pid, StreamID}, {response, Status, response_headers(Headers, Req), <<>>}}, + done_replying(Req, true); do_reply(Status, Headers, Body, Req=#{pid := Pid, streamid := StreamID}) -> Pid ! {{Pid, StreamID}, {response, Status, response_headers(Headers, Req), Body}}, - ok. + done_replying(Req, true). --spec send_body(iodata(), fin | nofin, req()) -> ok. -send_body(Data, IsFin, #{pid := Pid, streamid := StreamID}) -> - Pid ! {{Pid, StreamID}, {data, IsFin, Data}}, - ok. +done_replying(Req, HasSentResp) -> + maps:without([resp_cookies, resp_headers, resp_body], Req#{has_sent_resp => HasSentResp}). -response_headers(Headers, Req) -> - RespHeaders = maps:get(resp_headers, Req, #{}), - maps:merge(#{ - <<"date">> => cowboy_clock:rfc1123(), - <<"server">> => <<"Cowboy">> - }, maps:merge(RespHeaders, Headers)). - -%reply(Status, Headers, Body, Req=#http_req{ -% socket=Socket, transport=Transport, -% version=Version, connection=Connection, -% method=Method, resp_compress=Compress, -% resp_state=RespState, resp_headers=RespHeaders}) -% when RespState =:= waiting; RespState =:= waiting_stream -> -% Req3 = case Body of -% BodyFun when is_function(BodyFun) -> -% %% We stream the response body until we close the connection. -% RespConn = close, -% {RespType, Req2} = if -% true -> -% response(Status, Headers, RespHeaders, [ -% {<<"connection">>, <<"close">>}, -% {<<"date">>, cowboy_clock:rfc1123()}, -% {<<"server">>, <<"Cowboy">>}, -% {<<"transfer-encoding">>, <<"identity">>} -% ], <<>>, Req) -% end, -% if RespType =/= hook, Method =/= <<"HEAD">> -> -% BodyFun(Socket, Transport); -% true -> ok -% end, -% Req2#http_req{connection=RespConn}; -% {chunked, BodyFun} -> -% %% We stream the response body in chunks. -% {RespType, Req2} = chunked_response(Status, Headers, Req), -% if RespType =/= hook, Method =/= <<"HEAD">> -> -% ChunkFun = fun(IoData) -> chunk(IoData, Req2) end, -% BodyFun(ChunkFun), -% %% Send the last chunk if chunked encoding was used. -% if -% Version =:= 'HTTP/1.0'; RespState =:= waiting_stream -> -% Req2; -% true -> -% last_chunk(Req2) -% end; -% true -> Req2 -% end; -% {ContentLength, BodyFun} -> -% %% We stream the response body for ContentLength bytes. -% RespConn = response_connection(Headers, Connection), -% {RespType, Req2} = response(Status, Headers, RespHeaders, [ -% {<<"content-length">>, integer_to_list(ContentLength)}, -% {<<"date">>, cowboy_clock:rfc1123()}, -% {<<"server">>, <<"Cowboy">>} -% |HTTP11Headers], stream, Req), -% if RespType =/= hook, Method =/= <<"HEAD">> -> -% BodyFun(Socket, Transport); -% true -> ok -% end, -% Req2#http_req{connection=RespConn}; -% _ when Compress -> -% RespConn = response_connection(Headers, Connection), -% Req2 = reply_may_compress(Status, Headers, Body, Req, -% RespHeaders, HTTP11Headers, Method), -% Req2#http_req{connection=RespConn}; -% _ -> -% RespConn = response_connection(Headers, Connection), -% Req2 = reply_no_compress(Status, Headers, Body, Req, -% RespHeaders, HTTP11Headers, Method, iolist_size(Body)), -% Req2#http_req{connection=RespConn} -% end, -% Req3#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}. - -%reply_may_compress(Status, Headers, Body, Req, -% RespHeaders, HTTP11Headers, Method) -> -% BodySize = iolist_size(Body), -% try parse_header(<<"accept-encoding">>, Req) of -% Encodings -> -% CanGzip = (BodySize > 300) -% andalso (false =:= lists:keyfind(<<"content-encoding">>, -% 1, Headers)) -% andalso (false =:= lists:keyfind(<<"content-encoding">>, -% 1, RespHeaders)) -% andalso (false =:= lists:keyfind(<<"transfer-encoding">>, -% 1, Headers)) -% andalso (false =:= lists:keyfind(<<"transfer-encoding">>, -% 1, RespHeaders)) -% andalso (Encodings =/= undefined) -% andalso (false =/= lists:keyfind(<<"gzip">>, 1, Encodings)), -% case CanGzip of -% true -> -% GzBody = zlib:gzip(Body), -% {_, Req2} = response(Status, Headers, RespHeaders, [ -% {<<"content-length">>, integer_to_list(byte_size(GzBody))}, -% {<<"content-encoding">>, <<"gzip">>}, -% |HTTP11Headers], -% case Method of <<"HEAD">> -> <<>>; _ -> GzBody end, -% Req), -% Req2; -% false -> -% reply_no_compress(Status, Headers, Body, Req, -% RespHeaders, HTTP11Headers, Method, BodySize) -% end -% catch _:_ -> -% reply_no_compress(Status, Headers, Body, Req, -% RespHeaders, HTTP11Headers, Method, BodySize) -% end. -% -%reply_no_compress(Status, Headers, Body, Req, -% RespHeaders, HTTP11Headers, Method, BodySize) -> -% {_, Req2} = response(Status, Headers, RespHeaders, [ -% {<<"content-length">>, integer_to_list(BodySize)}, -% |HTTP11Headers], -% case Method of <<"HEAD">> -> <<>>; _ -> Body end, -% Req), -% Req2. - --spec chunked_reply(cowboy:http_status(), Req) -> Req when Req::req(). -chunked_reply(Status, Req) -> - chunked_reply(Status, #{}, Req). - --spec chunked_reply(cowboy:http_status(), cowboy:http_headers(), Req) +-spec stream_reply(cowboy:http_status(), Req) -> Req when Req::req(). +stream_reply(Status, Req) -> + stream_reply(Status, #{}, Req). + +-spec stream_reply(cowboy:http_status(), cowboy:http_headers(), Req) -> Req when Req::req(). -chunked_reply(Status, Headers, Req=#{pid := Pid, streamid := StreamID}) -> +stream_reply(_, _, #{has_sent_resp := _}) -> + error(function_clause); +stream_reply(Status, Headers=#{}, Req=#{pid := Pid, streamid := StreamID}) + when is_integer(Status); is_binary(Status) -> Pid ! {{Pid, StreamID}, {headers, Status, response_headers(Headers, Req)}}, - Req. %% @todo return ok -% ok. + done_replying(Req, headers). --spec chunk(iodata(), req()) -> ok. -chunk(_Data, #{method := <<"HEAD">>}) -> +-spec stream_body(iodata(), fin | nofin, req()) -> ok. +%% Error out if headers were not sent. +%% Don't send any body for HEAD responses. +stream_body(_, _, #{method := <<"HEAD">>, has_sent_resp := headers}) -> ok; -chunk(Data, #{pid := Pid, streamid := StreamID}) -> +%% Don't send a message if the data is empty, except for the +%% very last message with IsFin=fin. +stream_body(Data, IsFin=nofin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> case iolist_size(Data) of 0 -> ok; _ -> - Pid ! {{Pid, StreamID}, {data, nofin, Data}}, + Pid ! {{Pid, StreamID}, {data, IsFin, Data}}, ok - end. - -%% If ever made public, need to send nothing if HEAD. --spec last_chunk(Req) -> Req when Req::req(). -last_chunk(Req=#http_req{socket=Socket, transport=Transport}) -> - _ = Transport:send(Socket, <<"0\r\n\r\n">>), - Req#http_req{resp_state=done}. - --spec continue(req()) -> ok. -continue(#http_req{socket=Socket, transport=Transport, - version=Version}) -> - HTTPVer = atom_to_binary(Version, latin1), - ok = Transport:send(Socket, - << HTTPVer/binary, " ", (status(100))/binary, "\r\n\r\n" >>). - -%% Meant to be used internally for sending errors after crashes. --spec maybe_reply([{module(), atom(), arity() | [term()], _}], req()) -> ok. -maybe_reply(Stacktrace, Req) -> - receive - {cowboy_req, resp_sent} -> ok - after 0 -> - _ = do_maybe_reply(Stacktrace, Req), - ok - end. - -do_maybe_reply([{erlang, binary_to_integer, _, _}, {cow_http_hd, parse_content_length, _, _}|_], Req) -> - cowboy_req:reply(400, Req); -do_maybe_reply([{cow_http_hd, _, _, _}|_], Req) -> - cowboy_req:reply(400, Req); -do_maybe_reply(_, Req) -> - cowboy_req:reply(500, Req). + end; +stream_body(Data, IsFin, #{pid := Pid, streamid := StreamID, has_sent_resp := headers}) -> + Pid ! {{Pid, StreamID}, {data, IsFin, Data}}, + ok. --spec ensure_response(req(), cowboy:http_status()) -> ok. -%% The response has already been fully sent to the client. -ensure_response(#http_req{resp_state=done}, _) -> - ok; -%% No response has been sent but everything apparently went fine. -%% Reply with the status code found in the second argument. -ensure_response(Req=#http_req{resp_state=RespState}, Status) - when RespState =:= waiting; RespState =:= waiting_stream -> - _ = reply(Status, [], [], Req), - ok; -%% Terminate the chunked body for HTTP/1.1 only. -ensure_response(#http_req{method= <<"HEAD">>}, _) -> - ok; -ensure_response(Req=#http_req{resp_state=chunks}, _) -> - _ = last_chunk(Req), - ok; -ensure_response(#http_req{}, _) -> +-spec push(binary(), cowboy:http_headers(), req()) -> ok. +push(Path, Headers, Req) -> + push(Path, Headers, Req, #{}). + +%% @todo Optimization: don't send anything at all for HTTP/1.0 and HTTP/1.1. +%% @todo Path, Headers, Opts, everything should be in proper binary, +%% or normalized when creating the Req object. +-spec push(binary(), cowboy:http_headers(), req(), push_opts()) -> ok. +push(Path, Headers, #{pid := Pid, streamid := StreamID, + scheme := Scheme0, host := Host0, port := Port0}, Opts) -> + Method = maps:get(method, Opts, <<"GET">>), + Scheme = maps:get(scheme, Opts, Scheme0), + Host = maps:get(host, Opts, Host0), + Port = maps:get(port, Opts, Port0), + Qs = maps:get(qs, Opts, <<>>), + Pid ! {{Pid, StreamID}, {push, Method, Scheme, Host, Port, Path, Qs, Headers}}, ok. %% Internal. --spec status(cowboy:http_status()) -> binary(). -status(100) -> <<"100 Continue">>; -status(101) -> <<"101 Switching Protocols">>; -status(102) -> <<"102 Processing">>; -status(200) -> <<"200 OK">>; -status(201) -> <<"201 Created">>; -status(202) -> <<"202 Accepted">>; -status(203) -> <<"203 Non-Authoritative Information">>; -status(204) -> <<"204 No Content">>; -status(205) -> <<"205 Reset Content">>; -status(206) -> <<"206 Partial Content">>; -status(207) -> <<"207 Multi-Status">>; -status(226) -> <<"226 IM Used">>; -status(300) -> <<"300 Multiple Choices">>; -status(301) -> <<"301 Moved Permanently">>; -status(302) -> <<"302 Found">>; -status(303) -> <<"303 See Other">>; -status(304) -> <<"304 Not Modified">>; -status(305) -> <<"305 Use Proxy">>; -status(306) -> <<"306 Switch Proxy">>; -status(307) -> <<"307 Temporary Redirect">>; -status(400) -> <<"400 Bad Request">>; -status(401) -> <<"401 Unauthorized">>; -status(402) -> <<"402 Payment Required">>; -status(403) -> <<"403 Forbidden">>; -status(404) -> <<"404 Not Found">>; -status(405) -> <<"405 Method Not Allowed">>; -status(406) -> <<"406 Not Acceptable">>; -status(407) -> <<"407 Proxy Authentication Required">>; -status(408) -> <<"408 Request Timeout">>; -status(409) -> <<"409 Conflict">>; -status(410) -> <<"410 Gone">>; -status(411) -> <<"411 Length Required">>; -status(412) -> <<"412 Precondition Failed">>; -status(413) -> <<"413 Request Entity Too Large">>; -status(414) -> <<"414 Request-URI Too Long">>; -status(415) -> <<"415 Unsupported Media Type">>; -status(416) -> <<"416 Requested Range Not Satisfiable">>; -status(417) -> <<"417 Expectation Failed">>; -status(418) -> <<"418 I'm a teapot">>; -status(422) -> <<"422 Unprocessable Entity">>; -status(423) -> <<"423 Locked">>; -status(424) -> <<"424 Failed Dependency">>; -status(425) -> <<"425 Unordered Collection">>; -status(426) -> <<"426 Upgrade Required">>; -status(428) -> <<"428 Precondition Required">>; -status(429) -> <<"429 Too Many Requests">>; -status(431) -> <<"431 Request Header Fields Too Large">>; -status(500) -> <<"500 Internal Server Error">>; -status(501) -> <<"501 Not Implemented">>; -status(502) -> <<"502 Bad Gateway">>; -status(503) -> <<"503 Service Unavailable">>; -status(504) -> <<"504 Gateway Timeout">>; -status(505) -> <<"505 HTTP Version Not Supported">>; -status(506) -> <<"506 Variant Also Negotiates">>; -status(507) -> <<"507 Insufficient Storage">>; -status(510) -> <<"510 Not Extended">>; -status(511) -> <<"511 Network Authentication Required">>; -status(B) when is_binary(B) -> B. +%% @todo What about set-cookie headers set through set_resp_header or reply? +response_headers(Headers0, Req) -> + RespHeaders = maps:get(resp_headers, Req, #{}), + Headers = maps:merge(#{ + <<"date">> => cowboy_clock:rfc1123(), + <<"server">> => <<"Cowboy">> + }, maps:merge(RespHeaders, Headers0)), + %% The set-cookie header is special; we can only send one cookie per header. + %% We send the list of values for many cookies in one key of the map, + %% and let the protocols deal with it directly. + case maps:get(resp_cookies, Req, undefined) of + undefined -> Headers; + RespCookies -> Headers#{<<"set-cookie">> => maps:values(RespCookies)} + end. %% Create map, convert keys to atoms and group duplicate keys into lists. %% Keys that are not found in the user provided list are entirely skipped. diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl index 55b4e22..b760df2 100644 --- a/src/cowboy_rest.erl +++ b/src/cowboy_rest.erl @@ -654,7 +654,7 @@ variances(Req, State=#state{content_types_p=CTP, resource_exists(Req3, State2) end catch Class:Reason -> - error_terminate(Req, State, Class, Reason, variances) + error_terminate(Req, State, Class, Reason) end. variances(Req, State, Variances) -> @@ -693,7 +693,7 @@ if_match(Req, State, EtagsList) -> false -> precondition_failed(Req2, State2) end catch Class:Reason -> - error_terminate(Req, State, Class, Reason, generate_etag) + error_terminate(Req, State, Class, Reason) end. if_match_must_not_exist(Req, State) -> @@ -721,7 +721,7 @@ if_unmodified_since(Req, State, IfUnmodifiedSince) -> false -> if_none_match_exists(Req2, State2) end catch Class:Reason -> - error_terminate(Req, State, Class, Reason, last_modified) + error_terminate(Req, State, Class, Reason) end. if_none_match_exists(Req, State) -> @@ -747,7 +747,7 @@ if_none_match(Req, State, EtagsList) -> end end catch Class:Reason -> - error_terminate(Req, State, Class, Reason, generate_etag) + error_terminate(Req, State, Class, Reason) end. %% Weak Etag comparison: only check the opaque tag. @@ -790,7 +790,7 @@ if_modified_since(Req, State, IfModifiedSince) -> false -> not_modified(Req2, State2) end catch Class:Reason -> - error_terminate(Req, State, Class, Reason, last_modified) + error_terminate(Req, State, Class, Reason) end. not_modified(Req, State) -> @@ -801,10 +801,10 @@ not_modified(Req, State) -> {Req4, State3} -> respond(Req4, State3, 304) catch Class:Reason -> - error_terminate(Req, State2, Class, Reason, expires) + error_terminate(Req, State2, Class, Reason) end catch Class:Reason -> - error_terminate(Req, State, Class, Reason, generate_etag) + error_terminate(Req, State, Class, Reason) end. precondition_failed(Req, State) -> @@ -951,7 +951,7 @@ process_content_type(Req, State=#state{method=Method, exists=Exists}, Fun) -> true -> respond(Req3, State2, 201) end end catch Class:Reason = {case_clause, no_call} -> - error_terminate(Req, State, Class, Reason, Fun) + error_terminate(Req, State, Class, Reason) end. %% If PUT was used then the resource has been created at the current URL. @@ -978,7 +978,7 @@ set_resp_body_etag(Req, State) -> {Req2, State2} -> set_resp_body_last_modified(Req2, State2) catch Class:Reason -> - error_terminate(Req, State, Class, Reason, generate_etag) + error_terminate(Req, State, Class, Reason) end. %% Set the Last-Modified header if any for the response provided. @@ -995,7 +995,7 @@ set_resp_body_last_modified(Req, State) -> set_resp_body_expires(Req3, State2) end catch Class:Reason -> - error_terminate(Req, State, Class, Reason, last_modified) + error_terminate(Req, State, Class, Reason) end. %% Set the Expires header if any for the response provided. @@ -1004,7 +1004,7 @@ set_resp_body_expires(Req, State) -> {Req2, State2} -> set_resp_body(Req2, State2) catch Class:Reason -> - error_terminate(Req, State, Class, Reason, expires) + error_terminate(Req, State, Class, Reason) end. %% Set the response headers and call the callback found using @@ -1028,7 +1028,7 @@ set_resp_body(Req, State=#state{content_type_a={_, Callback}}) -> end, multiple_choices(Req3, State2) end catch Class:Reason = {case_clause, no_call} -> - error_terminate(Req, State, Class, Reason, Callback) + error_terminate(Req, State, Class, Reason) end. multiple_choices(Req, State) -> @@ -1131,7 +1131,7 @@ call(Req, State=#state{handler=Handler, handler_state=HandlerState}, try Handler:Callback(Req, HandlerState) catch Class:Reason -> - error_terminate(Req, State, Class, Reason, Callback) + error_terminate(Req, State, Class, Reason) end; false -> no_call @@ -1152,20 +1152,10 @@ next(Req, State, StatusCode) when is_integer(StatusCode) -> respond(Req, State, StatusCode) -> terminate(cowboy_req:reply(StatusCode, Req), State). --spec error_terminate(cowboy_req:req(), #state{}, atom(), any(), atom()) -> no_return(). -error_terminate(Req, #state{handler=Handler, handler_state=HandlerState}, - Class, Reason, Callback) -> - Stacktrace = erlang:get_stacktrace(), - cowboy_req:maybe_reply(Stacktrace, Req), +-spec error_terminate(cowboy_req:req(), #state{}, atom(), any()) -> no_return(). +error_terminate(Req, #state{handler=Handler, handler_state=HandlerState}, Class, Reason) -> cowboy_handler:terminate({crash, Class, Reason}, Req, HandlerState, Handler), - exit({cowboy_handler, [ - {class, Class}, - {reason, Reason}, - {mfa, {Handler, Callback, 2}}, - {stacktrace, Stacktrace}, - {req, Req}, - {state, HandlerState} - ]}). + erlang:raise(Class, Reason, erlang:get_stacktrace()). terminate(Req, #state{handler=Handler, handler_state=HandlerState}) -> Result = cowboy_handler:terminate(normal, Req, HandlerState, Handler), diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index b834c17..a7ce721 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -29,7 +29,8 @@ ref = undefined :: ranch:ref(), pid = undefined :: pid(), read_body_ref = undefined :: reference(), - read_body_length = 0 :: non_neg_integer(), + read_body_timer_ref = undefined :: reference(), + read_body_length = 0 :: non_neg_integer() | infinity, read_body_is_fin = nofin :: nofin | fin, read_body_buffer = <<>> :: binary() }). @@ -58,49 +59,48 @@ data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buf {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}}; data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length -> {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}}; -data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_buffer=Buffer}) -> +data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, + read_body_timer_ref=TRef, read_body_buffer=Buffer}) -> + ok = erlang:cancel_timer(TRef, [{async, true}, {info, false}]), Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>}, - {[], State#state{read_body_ref=undefined, read_body_buffer= <<>>}}. + {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}. %% @todo proper specs -spec info(_,_,_) -> _. info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) -> {[stop], State}; -%% @todo Transition. -%% In the future it would be better to simplify things -%% and only catch this at the stream level. -%% -%% Maybe we don't need specific error messages -%% for every single callbacks anymore? -info(_StreamID, Exit = {'EXIT', Pid, {cowboy_handler, _}}, State=#state{pid=Pid}) -> - %% No crash report; one has already been sent. - {[ - {response, 500, #{<<"content-length">> => <<"0">>}, <<>>}, - {internal_error, Exit, 'Stream process crashed.'} - ], State}; info(_StreamID, {'EXIT', Pid, {_Reason, [_, {cow_http_hd, _, _, _}|_]}}, State=#state{pid=Pid}) -> %% @todo Have an option to enable/disable this specific crash report? %%report_crash(Ref, StreamID, Pid, Reason, Stacktrace), %% @todo Headers? Details in body? More stuff in debug only? - {[{response, 400, #{}, <<>>}, stop], State}; + {[{error_response, 400, #{}, <<>>}, stop], State}; info(StreamID, Exit = {'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) -> report_crash(Ref, StreamID, Pid, Reason, Stacktrace), {[ - {response, 500, #{<<"content-length">> => <<"0">>}, <<>>}, + {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>}, {internal_error, Exit, 'Stream process crashed.'} ], State}; %% Request body, no body buffer but IsFin=fin. -info(_StreamID, {read_body, Ref, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) -> - Pid ! {request_body, Ref, fin, <<>>}, - {[], State}; +%info(_StreamID, {read_body, Ref, _, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) -> +% Pid ! {request_body, Ref, fin, <<>>}, +% {[], State}; %% Request body, body buffered large enough or complete. -info(_StreamID, {read_body, Ref, Length}, State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data}) +info(_StreamID, {read_body, Ref, Length, _}, + State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data}) when element(1, IsFin) =:= fin; byte_size(Data) >= Length -> Pid ! {request_body, Ref, IsFin, Data}, {[], State#state{read_body_buffer= <<>>}}; %% Request body, not enough to send yet. -info(_StreamID, {read_body, Ref, Length}, State) -> - {[{flow, Length}], State#state{read_body_ref=Ref, read_body_length=Length}}; +info(StreamID, {read_body, Ref, Length, Period}, State) -> + TRef = erlang:send_after(Period, self(), {{self(), StreamID}, {read_body_timeout, Ref}}), + {[{flow, Length}], State#state{read_body_ref=Ref, read_body_timer_ref=TRef, read_body_length=Length}}; +%% Request body reading timeout; send what we got. +info(_StreamID, {read_body_timeout, Ref}, State=#state{pid=Pid, read_body_ref=Ref, + read_body_is_fin=IsFin, read_body_buffer=Buffer}) -> + Pid ! {request_body, Ref, IsFin, Buffer}, + {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}; +info(_StreamID, {read_body_timeout, _}, State) -> + {[], State}; %% Response. info(_StreamID, Response = {response, _, _, _}, State) -> {[Response], State}; @@ -108,6 +108,8 @@ info(_StreamID, Headers = {headers, _, _}, State) -> {[Headers], State}; info(_StreamID, Data = {data, _, _}, State) -> {[Data], State}; +info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) -> + {[Push], State}; info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) -> {[SwitchProtocol], State}; %% Stray message. @@ -138,6 +140,11 @@ report_crash(Ref, StreamID, Pid, Reason, Stacktrace) -> %% Request process. +%% @todo This should wrap with try/catch to get the full error +%% in the stream handler. Only then can we decide what to do +%% about it. This means that we should remove any other try/catch +%% in the request process. + %% This hack is necessary because proc_lib does not propagate %% stacktraces by default. This is ugly because we end up %% having two try/catch instead of one (the one in proc_lib), diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index b8ec2e1..bc9bd31 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -32,6 +32,12 @@ | {module(), Req, any(), timeout()} | {module(), Req, any(), timeout(), hibernate} when Req::cowboy_req:req(). + +-callback websocket_init(Req, State) + -> {ok, Req, State} + when Req::cowboy_req:req(), State::any(). +-optional_callbacks([websocket_init/2]). + -callback websocket_handle({text | binary | ping | pong, binary()}, Req, State) -> {ok, Req, State} | {ok, Req, State, hibernate} @@ -70,17 +76,18 @@ -spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate) -> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). -upgrade(Req, Env, Handler, HandlerState, Timeout, Hibernate) -> - State = #state{handler=Handler, timeout=Timeout, - hibernate=Hibernate =:= hibernate}, - %% @todo We need to fail if HTTP/2. - try websocket_upgrade(State, Req) of - {ok, State2, Req2} -> - websocket_handshake(State2, Req2, HandlerState, Env) +%% @todo Immediately crash if a response has already been sent. +%% @todo Error out if HTTP/2. +upgrade(Req0, Env, Handler, HandlerState, Timeout, Hibernate) -> + try websocket_upgrade(#state{handler=Handler, timeout=Timeout, + hibernate=Hibernate =:= hibernate}, Req0) of + {ok, State, Req} -> + websocket_handshake(State, Req, HandlerState, Env) catch _:_ -> + %% @todo Probably log something here? %% @todo Test that we can have 2 /ws 400 status code in a row on the same connection. - cowboy_req:reply(400, Req), - {ok, Req, Env} + %% @todo Does this even work? + {ok, cowboy_req:reply(400, Req0), Env} end. -spec websocket_upgrade(#state{}, Req) @@ -91,7 +98,7 @@ websocket_upgrade(State, Req) -> %% @todo Should probably send a 426 if the Upgrade header is missing. [<<"websocket">>] = cowboy_req:parse_header(<<"upgrade">>, Req), Version = cowboy_req:header(<<"sec-websocket-version">>, Req), - IntVersion = list_to_integer(binary_to_list(Version)), + IntVersion = binary_to_integer(Version), true = (IntVersion =:= 7) orelse (IntVersion =:= 8) orelse (IntVersion =:= 13), Key = cowboy_req:header(<<"sec-websocket-key">>, Req), @@ -100,45 +107,44 @@ websocket_upgrade(State, Req) -> -spec websocket_extensions(#state{}, Req) -> {ok, #state{}, Req} when Req::cowboy_req:req(). -websocket_extensions(State, Req) -> +websocket_extensions(State, Req=#{ref := Ref}) -> %% @todo We want different options for this. For example %% * compress everything auto %% * compress only text auto %% * compress only binary auto %% * compress nothing auto (but still enabled it) %% * disable compression - Compress = maps:get(websocket_compress, Req, false), - Req2 = Req#{websocket_compress => false}, - case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req2)} of + Compress = maps:get(websocket_compress, ranch:get_protocol_options(Ref), false), + case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req)} of {true, Extensions} when Extensions =/= undefined -> - websocket_extensions(State, Req2, Extensions, []); + websocket_extensions(State, Req, Extensions, []); _ -> - {ok, State, Req2} + {ok, State, Req} end. websocket_extensions(State, Req, [], []) -> {ok, State, Req}; websocket_extensions(State, Req, [], [<<", ">>|RespHeader]) -> {ok, State, cowboy_req:set_resp_header(<<"sec-websocket-extensions">>, lists:reverse(RespHeader), Req)}; -websocket_extensions(State=#state{extensions=Extensions}, Req, [{<<"permessage-deflate">>, Params}|Tail], RespHeader) -> +websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid}, + [{<<"permessage-deflate">>, Params}|Tail], RespHeader) -> %% @todo Make deflate options configurable. Opts = #{level => best_compression, mem_level => 8, strategy => default}, - case cow_ws:negotiate_permessage_deflate(Params, Extensions, Opts) of + case cow_ws:negotiate_permessage_deflate(Params, Extensions, Opts#{owner => Pid}) of {ok, RespExt, Extensions2} -> - Req2 = Req#{websocket_compress => true}, websocket_extensions(State#state{extensions=Extensions2}, - Req2, Tail, [<<", ">>, RespExt|RespHeader]); + Req, Tail, [<<", ">>, RespExt|RespHeader]); ignore -> websocket_extensions(State, Req, Tail, RespHeader) end; -websocket_extensions(State=#state{extensions=Extensions}, Req, [{<<"x-webkit-deflate-frame">>, Params}|Tail], RespHeader) -> +websocket_extensions(State=#state{extensions=Extensions}, Req=#{pid := Pid}, + [{<<"x-webkit-deflate-frame">>, Params}|Tail], RespHeader) -> %% @todo Make deflate options configurable. Opts = #{level => best_compression, mem_level => 8, strategy => default}, - case cow_ws:negotiate_x_webkit_deflate_frame(Params, Extensions, Opts) of + case cow_ws:negotiate_x_webkit_deflate_frame(Params, Extensions, Opts#{owner => Pid}) of {ok, RespExt, Extensions2} -> - Req2 = cowboy_req:set_meta(websocket_compress, true, Req), websocket_extensions(State#state{extensions=Extensions2}, - Req2, Tail, [<<", ">>, RespExt|RespHeader]); + Req, Tail, [<<", ">>, RespExt|RespHeader]); ignore -> websocket_extensions(State, Req, Tail, RespHeader) end; @@ -152,13 +158,11 @@ websocket_handshake(State=#state{key=Key}, Req=#{pid := Pid, streamid := StreamID}, HandlerState, Env) -> Challenge = base64:encode(crypto:hash(sha, << Key/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" >>)), - RespHeaders = maps:get(resp_headers, Req, #{}), - Headers = maps:merge(RespHeaders, #{ - %% @todo Hmm should those be here or in cowboy_http? + Headers = cowboy_req:response_headers(#{ <<"connection">> => <<"Upgrade">>, <<"upgrade">> => <<"websocket">>, <<"sec-websocket-accept">> => Challenge - }), + }, Req), Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {Req, State, HandlerState}}}, {ok, Req, Env}. @@ -369,15 +373,7 @@ handler_call(State=#state{handler=Handler}, Req, HandlerState, websocket_close(State, Req2, HandlerState2, stop) catch Class:Reason -> _ = websocket_close(State, Req, HandlerState, {crash, Class, Reason}), - exit({cowboy_handler, [ - {class, Class}, - {reason, Reason}, - {mfa, {Handler, Callback, 3}}, - {stacktrace, erlang:get_stacktrace()}, - {msg, Message}, - {req, Req}, - {state, HandlerState} - ]}) + erlang:raise(Class, Reason, erlang:get_stacktrace()) end. -spec websocket_send(cow_ws:frame(), #state{}) -> ok | stop | {error, atom()}. |