diff options
-rw-r--r-- | doc/src/manual/cowboy_stream.asciidoc | 9 | ||||
-rw-r--r-- | src/cowboy_http.erl | 163 | ||||
-rw-r--r-- | src/cowboy_http2.erl | 142 | ||||
-rw-r--r-- | test/handlers/stream_handler_h.erl | 8 | ||||
-rw-r--r-- | test/stream_handler_SUITE.erl | 2 |
5 files changed, 197 insertions, 127 deletions
diff --git a/doc/src/manual/cowboy_stream.asciidoc b/doc/src/manual/cowboy_stream.asciidoc index 148c23e..bdc56f2 100644 --- a/doc/src/manual/cowboy_stream.asciidoc +++ b/doc/src/manual/cowboy_stream.asciidoc @@ -84,6 +84,13 @@ the `early_error/5` callback must return a response command. // @todo The logger option and the {log, Level, Format, Args} // options need to be documented and tested. +The order in which the commands are given matters. For example, +when sending a response and at the same time creating a new child +process, the first command should be the `spawn` and the second the +`response`. The reason for that is that the sending of the response +may result in a socket error which leads to the termination of +the connection before the rest of the commands are executed. + The following commands are defined: [[inform_command]] @@ -236,6 +243,8 @@ will end successfully as far as the client is concerned. To indicate that an error occurred, either use `error_response` before stopping, or use `internal_error`. +No other command can be executed after the `stop` command. + === internal_error Stop the stream with an error. diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index c9bceed..02051cd 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -157,9 +157,11 @@ -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info(), cowboy:opts()) -> ok. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> - Peer0 = Transport:peername(Socket), - Sock0 = Transport:sockname(Socket), - Cert1 = case Transport:name() of + {ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket), + 'A socket error occurred when retrieving the peer name.'), + {ok, Sock} = maybe_socket_error(undefined, Transport:sockname(Socket), + 'A socket error occurred when retrieving the sock name.'), + CertResult = case Transport:name() of ssl -> case ssl:peercert(Socket) of {error, no_peercert} -> @@ -170,36 +172,29 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> _ -> {ok, undefined} end, - case {Peer0, Sock0, Cert1} of - {{ok, Peer}, {ok, Sock}, {ok, Cert}} -> - State = #state{ - parent=Parent, ref=Ref, socket=Socket, - transport=Transport, proxy_header=ProxyHeader, opts=Opts, - peer=Peer, sock=Sock, cert=Cert, - last_streamid=maps:get(max_keepalive, Opts, 1000)}, - setopts_active(State), - loop(set_timeout(State, request_timeout)); - {{error, Reason}, _, _} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the peer name.'}); - {_, {error, Reason}, _} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the sock name.'}); - {_, _, {error, Reason}} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the client TLS certificate.'}) - end. + {ok, Cert} = maybe_socket_error(undefined, CertResult, + 'A socket error occurred when retrieving the client TLS certificate.'), + State = #state{ + parent=Parent, ref=Ref, socket=Socket, + transport=Transport, proxy_header=ProxyHeader, opts=Opts, + peer=Peer, sock=Sock, cert=Cert, + last_streamid=maps:get(max_keepalive, Opts, 1000)}, + safe_setopts_active(State), + loop(set_timeout(State, request_timeout)). setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> N = maps:get(active_n, Opts, 100), Transport:setopts(Socket, [{active, N}]). +safe_setopts_active(State) -> + ok = maybe_socket_error(State, setopts_active(State)). + active(State) -> - setopts_active(State), + safe_setopts_active(State), State#state{active=true}. passive(State=#state{socket=Socket, transport=Transport}) -> - Transport:setopts(Socket, [{active, false}]), + ok = maybe_socket_error(State, Transport:setopts(Socket, [{active, false}])), Messages = Transport:messages(), flush_passive(Socket, Messages), State#state{active=false}. @@ -234,7 +229,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, {Passive, Socket} when Passive =:= element(4, Messages); %% Hardcoded for compatibility with Ranch 1.x. Passive =:= tcp_passive; Passive =:= ssl_passive -> - setopts_active(State), + safe_setopts_active(State), loop(State); %% Timeouts. {timeout, Ref, {shutdown, Pid}} -> @@ -953,6 +948,11 @@ info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) -> end. %% Commands. +%% +%% The order in which the commands are given matters. Cowboy may +%% stop processing commands after the 'stop' command or when an +%% error occurred, such as a socket error. Critical commands such +%% as 'spawn' should always be given first. commands(State, _, []) -> State; @@ -1013,8 +1013,8 @@ commands(State=#state{socket=Socket, transport=Transport, out_state=wait, stream #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams), _ = case Version of 'HTTP/1.1' -> - Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', - headers_to_list(Headers))); + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)))); %% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2) 'HTTP/1.0' -> ok @@ -1037,10 +1037,10 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea %% @todo 204 and 304 responses must not include a response body. (RFC7230 3.3.1, RFC7230 3.3.2) case Body of {sendfile, _, _, _} -> - Transport:send(Socket, Response), + ok = maybe_socket_error(State, Transport:send(Socket, Response)), sendfile(State, Body); _ -> - Transport:send(Socket, [Response, Body]) + ok = maybe_socket_error(State, Transport:send(Socket, [Response, Body])) end, commands(State, StreamID, Tail); %% Send response headers and initiate chunked encoding or streaming. @@ -1079,7 +1079,8 @@ commands(State0=#state{socket=Socket, transport=Transport, _ -> maps:remove(<<"trailer">>, Headers1) end, {State, Headers} = connection(State1, Headers2, StreamID, Version), - Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))), + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)))), commands(State, StreamID, Tail); %% Send a response body chunk. %% @todo We need to kill the stream if it tries to send data before headers. @@ -1098,27 +1099,33 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out Stream0=#stream{method= <<"HEAD">>} -> Stream0; Stream0 when Size =:= 0, IsFin =:= fin, OutState =:= chunked -> - Transport:send(Socket, <<"0\r\n\r\n">>), + ok = maybe_socket_error(State0, + Transport:send(Socket, <<"0\r\n\r\n">>)), Stream0; Stream0 when Size =:= 0 -> Stream0; Stream0 when is_tuple(Data), OutState =:= chunked -> - Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>]), + ok = maybe_socket_error(State0, + Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>])), sendfile(State0, Data), - Transport:send(Socket, - case IsFin of - fin -> <<"\r\n0\r\n\r\n">>; - nofin -> <<"\r\n">> - end), + ok = maybe_socket_error(State0, + Transport:send(Socket, + case IsFin of + fin -> <<"\r\n0\r\n\r\n">>; + nofin -> <<"\r\n">> + end) + ), Stream0; Stream0 when OutState =:= chunked -> - Transport:send(Socket, [ - integer_to_binary(Size, 16), <<"\r\n">>, Data, - case IsFin of - fin -> <<"\r\n0\r\n\r\n">>; - nofin -> <<"\r\n">> - end - ]), + ok = maybe_socket_error(State0, + Transport:send(Socket, [ + integer_to_binary(Size, 16), <<"\r\n">>, Data, + case IsFin of + fin -> <<"\r\n0\r\n\r\n">>; + nofin -> <<"\r\n">> + end + ]) + ), Stream0; Stream0 when OutState =:= streaming -> #stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize} = Stream0, @@ -1130,7 +1137,7 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out is_tuple(Data) -> sendfile(State0, Data); true -> - Transport:send(Socket, Data) + ok = maybe_socket_error(State0, Transport:send(Socket, Data)) end, Stream0#stream{local_sent_size=SentSize} end, @@ -1144,13 +1151,16 @@ commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_s StreamID, [{trailers, Trailers}|Tail]) -> case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of trailers -> - Transport:send(Socket, [ - <<"0\r\n">>, - cow_http:headers(maps:to_list(Trailers)), - <<"\r\n">> - ]); + ok = maybe_socket_error(State, + Transport:send(Socket, [ + <<"0\r\n">>, + cow_http:headers(maps:to_list(Trailers)), + <<"\r\n">> + ]) + ); no_trailers -> - Transport:send(Socket, <<"0\r\n\r\n">>); + ok = maybe_socket_error(State, + Transport:send(Socket, <<"0\r\n\r\n">>)); not_chunked -> ok end, @@ -1238,10 +1248,12 @@ sendfile(State=#state{socket=Socket, transport=Transport, opts=Opts}, {sendfile, Offset, Bytes, Path}) -> try %% When sendfile is disabled we explicitly use the fallback. - _ = case maps:get(sendfile, Opts, true) of - true -> Transport:sendfile(Socket, Path, Offset, Bytes); - false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) - end, + {ok, _} = maybe_socket_error(State, + case maps:get(sendfile, Opts, true) of + true -> Transport:sendfile(Socket, Path, Offset, Bytes); + false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) + end + ), ok catch _:_ -> terminate(State, {socket_error, sendfile_crash, @@ -1420,28 +1432,31 @@ error_terminate(StatusCode, State=#state{ref=Ref, peer=Peer, in_state=StreamStat early_error(StatusCode, State, Reason, PartialReq) -> early_error(StatusCode, State, Reason, PartialReq, #{}). -early_error(StatusCode0, #state{socket=Socket, transport=Transport, +early_error(StatusCode0, State=#state{socket=Socket, transport=Transport, opts=Opts, in_streamid=StreamID}, Reason, PartialReq, RespHeaders0) -> RespHeaders1 = RespHeaders0#{<<"content-length">> => <<"0">>}, Resp = {response, StatusCode0, RespHeaders1, <<>>}, try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of {response, StatusCode, RespHeaders, RespBody} -> - Transport:send(Socket, [ - cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)), - %% @todo We shouldn't send the body when the method is HEAD. - %% @todo Technically we allow the sendfile tuple. - RespBody - ]) + ok = maybe_socket_error(State, + Transport:send(Socket, [ + cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)), + %% @todo We shouldn't send the body when the method is HEAD. + %% @todo Technically we allow the sendfile tuple. + RespBody + ]) + ) catch Class:Exception:Stacktrace -> cowboy:log(cowboy_stream:make_error_log(early_error, [StreamID, Reason, PartialReq, Resp, Opts], Class, Exception, Stacktrace), Opts), %% We still need to send an error response, so send what we initially %% wanted to send. It's better than nothing. - Transport:send(Socket, cow_http:response(StatusCode0, - 'HTTP/1.1', maps:to_list(RespHeaders1))) - end, - ok. + ok = maybe_socket_error(State, + Transport:send(Socket, cow_http:response(StatusCode0, + 'HTTP/1.1', maps:to_list(RespHeaders1))) + ) + end. initiate_closing(State=#state{streams=[]}, Reason) -> terminate(State, Reason); @@ -1450,6 +1465,19 @@ initiate_closing(State=#state{streams=[_Stream|Streams], terminate_all_streams(State, Streams, Reason), State#state{last_streamid=OutStreamID}. +%% Function replicated in cowboy_http2. +maybe_socket_error(State, {error, closed}) -> + terminate(State, {socket_error, closed, 'The socket has been closed.'}); +maybe_socket_error(State, Reason) -> + maybe_socket_error(State, Reason, 'An error has occurred on the socket.'). + +maybe_socket_error(_, Result = ok, _) -> + Result; +maybe_socket_error(_, Result = {ok, _}, _) -> + Result; +maybe_socket_error(State, {error, Reason}, Human) -> + terminate(State, {socket_error, Reason, Human}). + -spec terminate(_, _) -> no_return(). terminate(undefined, Reason) -> exit({shutdown, Reason}); @@ -1484,6 +1512,9 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) -> terminate_linger_before_loop(State, TimerRef, Messages) -> %% We may already be in active mode when we do this %% but it's OK because we are shutting down anyway. + %% + %% We specially handle the socket error to terminate + %% when an error occurs. case setopts_active(State) of ok -> terminate_linger_loop(State, TimerRef, Messages); diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 9ad16bd..ebd02bc 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -134,9 +134,11 @@ -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> - Peer0 = Transport:peername(Socket), - Sock0 = Transport:sockname(Socket), - Cert1 = case Transport:name() of + {ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket), + 'A socket error occurred when retrieving the peer name.'), + {ok, Sock} = maybe_socket_error(undefined, Transport:sockname(Socket), + 'A socket error occurred when retrieving the sock name.'), + CertResult = case Transport:name() of ssl -> case ssl:peercert(Socket) of {error, no_peercert} -> @@ -147,19 +149,9 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> _ -> {ok, undefined} end, - case {Peer0, Sock0, Cert1} of - {{ok, Peer}, {ok, Sock}, {ok, Cert}} -> - init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, <<>>); - {{error, Reason}, _, _} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the peer name.'}); - {_, {error, Reason}, _} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the sock name.'}); - {_, _, {error, Reason}} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the client TLS certificate.'}) - end. + {ok, Cert} = maybe_socket_error(undefined, CertResult, + 'A socket error occurred when retrieving the client TLS certificate.'), + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, <<>>). -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts(), @@ -167,12 +159,13 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> binary() | undefined, binary()) -> ok. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) -> {ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts), + %% Send the preface before doing all the init in case we get a socket error. + ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)), State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, http2_status=sequence, http2_machine=HTTP2Machine})), - Transport:send(Socket, Preface), - setopts_active(State), + safe_setopts_active(State), case Buffer of <<>> -> loop(State, Buffer); _ -> parse(State, Buffer) @@ -228,8 +221,10 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer <<"upgrade">> => <<"h2c">> }, ?MODULE, undefined}), %% @todo undefined or #{}? State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence})), - Transport:send(Socket, Preface), - setopts_active(State), + %% In the case of HTTP/1.1 Upgrade we cannot send the Preface + %% until we send the 101 response. + ok = maybe_socket_error(State, Transport:send(Socket, Preface)), + safe_setopts_active(State), case Buffer of <<>> -> loop(State, Buffer); _ -> parse(State, Buffer) @@ -242,6 +237,9 @@ setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> N = maps:get(active_n, Opts, 100), Transport:setopts(Socket, [{active, N}]). +safe_setopts_active(State) -> + ok = maybe_socket_error(State, setopts_active(State)). + loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, timer=TimerRef, children=Children}, Buffer) -> Messages = Transport:messages(), @@ -261,7 +259,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, {Passive, Socket} when Passive =:= element(4, Messages); %% Hardcoded for compatibility with Ranch 1.x. Passive =:= tcp_passive; Passive =:= ssl_passive -> - setopts_active(State), + safe_setopts_active(State), loop(State, Buffer); %% System messages. {'EXIT', Parent, shutdown} -> @@ -413,8 +411,10 @@ maybe_ack(State=#state{http2_status=settings}, Frame) -> maybe_ack(State#state{http2_status=connected}, Frame); maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) -> case Frame of - {settings, _} -> Transport:send(Socket, cow_http2:settings_ack()); - {ping, Opaque} -> Transport:send(Socket, cow_http2:ping_ack(Opaque)); + {settings, _} -> + ok = maybe_socket_error(State, Transport:send(Socket, cow_http2:settings_ack())); + {ping, Opaque} -> + ok = maybe_socket_error(State, Transport:send(Socket, cow_http2:ping_ack(Opaque))); _ -> ok end, State. @@ -734,10 +734,10 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP2Machine0, PseudoHeaders, Headers) of {ok, PromisedStreamID, HeaderBlock, HTTP2Machine} -> - Transport:send(Socket, cow_http2:push_promise( - StreamID, PromisedStreamID, HeaderBlock)), - headers_frame(State0#state{http2_machine=HTTP2Machine}, - PromisedStreamID, fin, Headers, PseudoHeaders, 0); + State1 = State0#state{http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State1, Transport:send(Socket, + cow_http2:push_promise(StreamID, PromisedStreamID, HeaderBlock))), + headers_frame(State1, PromisedStreamID, fin, Headers, PseudoHeaders, 0); {error, no_push} -> State0 end, @@ -763,7 +763,8 @@ commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) -> commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade}, StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) -> %% @todo This 101 response needs to be passed through stream handlers. - Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))), + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers)))), commands(State, StreamID, Tail); %% Use a different protocol within the stream (CONNECT :protocol). %% @todo Make sure we error out when the feature is disabled. @@ -784,7 +785,7 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) -> %% Tentatively update the window after the flow was updated. -update_window(State=#state{socket=Socket, transport=Transport, +update_window(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) -> #{StreamID := #stream{flow=StreamFlow}} = Streams, {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of @@ -795,11 +796,12 @@ update_window(State=#state{socket=Socket, transport=Transport, ok -> {<<>>, HTTP2Machine2}; {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} end, + State = State0#state{http2_machine=HTTP2Machine}, case {Data1, Data2} of {<<>>, <<>>} -> ok; - _ -> Transport:send(Socket, [Data1, Data2]) + _ -> ok = maybe_socket_error(State, Transport:send(Socket, [Data1, Data2])) end, - State#state{http2_machine=HTTP2Machine}. + State. %% Send the response, trailers or data. @@ -823,14 +825,16 @@ send_response(State0=#state{http2_machine=HTTP2Machine0}, StreamID, StatusCode, [cow_http2:headers(StreamID, nofin, HeaderBlock)]) end. -send_headers(State=#state{socket=Socket, transport=Transport, +send_headers(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, IsFin0, StatusCode, Headers) -> {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, IsFin0, #{status => cow_http:status_to_integer(StatusCode)}, headers_to_list(Headers)), - Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - State#state{http2_machine=HTTP2Machine}. + State = State0#state{http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:headers(StreamID, IsFin, HeaderBlock))), + State. %% The set-cookie header is special; we can only send one cookie per header. headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) -> @@ -847,13 +851,14 @@ maybe_send_data(State0=#state{socket=Socket, transport=Transport, end, case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of {ok, HTTP2Machine} -> + State1 = State0#state{http2_machine=HTTP2Machine}, %% If we have prefix data (like a HEADERS frame) we need to send it %% even if we do not send any DATA frames. case Prefix of [] -> ok; - _ -> Transport:send(Socket, Prefix) + _ -> ok = maybe_socket_error(State1, Transport:send(Socket, Prefix)) end, - maybe_send_data_alarm(State0#state{http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID); + maybe_send_data_alarm(State1, HTTP2Machine0, StreamID); {send, SendData, HTTP2Machine} -> State = #state{http2_status=Status, streams=Streams} = send_data(State0#state{http2_machine=HTTP2Machine}, SendData, Prefix), @@ -871,12 +876,15 @@ send_data(State0=#state{socket=Socket, transport=Transport, opts=Opts}, SendData _ = [case Data of {sendfile, Offset, Bytes, Path} -> %% When sendfile is disabled we explicitly use the fallback. - _ = case maps:get(sendfile, Opts, true) of - true -> Transport:sendfile(Socket, Path, Offset, Bytes); - false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) - end; + {ok, _} = maybe_socket_error(State, + case maps:get(sendfile, Opts, true) of + true -> Transport:sendfile(Socket, Path, Offset, Bytes); + false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) + end + ), + ok; _ -> - Transport:send(Socket, Data) + ok = maybe_socket_error(State, Transport:send(Socket, Data)) end || Data <- Acc], send_data_terminate(State, SendData). @@ -980,17 +988,17 @@ goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Mach when Status =:= connected; Status =:= closing_initiated; Status =:= closing -> Streams = goaway_streams(State0, maps:to_list(Streams0), LastStreamID, {stop, {goaway, Reason}, 'The connection is going away.'}, []), - State = State0#state{streams=maps:from_list(Streams)}, + State1 = State0#state{streams=maps:from_list(Streams)}, if Status =:= connected; Status =:= closing_initiated -> {OurLastStreamID, HTTP2Machine} = cow_http2_machine:set_last_streamid(HTTP2Machine0), - Transport:send(Socket, cow_http2:goaway( - OurLastStreamID, no_error, <<>>)), - State#state{http2_status=closing, - http2_machine=HTTP2Machine}; + State = State1#state{http2_status=closing, http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:goaway(OurLastStreamID, no_error, <<>>))), + State; true -> - State + State1 end; %% We terminate the connection immediately if it hasn't fully been initialized. goaway(State, {goaway, _, Reason, _}) -> @@ -1016,7 +1024,8 @@ goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) -> -spec initiate_closing(#state{}, _) -> #state{}. initiate_closing(State=#state{http2_status=connected, socket=Socket, transport=Transport, opts=Opts}, Reason) -> - Transport:send(Socket, cow_http2:goaway(16#7fffffff, no_error, <<>>)), + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:goaway(16#7fffffff, no_error, <<>>))), Timeout = maps:get(goaway_initial_timeout, Opts, 1000), Message = {goaway_initial_timeout, Reason}, set_timeout(State#state{http2_status=closing_initiated}, Timeout, Message); @@ -1032,14 +1041,16 @@ initiate_closing(State, Reason) -> -spec closing(#state{}, Reason :: term()) -> #state{}. closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} -> terminate(State, Reason); -closing(State=#state{http2_status=closing_initiated, +closing(State0=#state{http2_status=closing_initiated, http2_machine=HTTP2Machine0, socket=Socket, transport=Transport}, Reason) -> %% Stop accepting new streams. {LastStreamID, HTTP2Machine} = cow_http2_machine:set_last_streamid(HTTP2Machine0), - Transport:send(Socket, cow_http2:goaway(LastStreamID, no_error, <<>>)), - closing(State#state{http2_status=closing, http2_machine=HTTP2Machine}, Reason); + State = State0#state{http2_status=closing, http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:goaway(LastStreamID, no_error, <<>>))), + closing(State, Reason); closing(State=#state{http2_status=closing, opts=Opts}, Reason) -> %% If client sent GOAWAY, we may already be in 'closing' but without the %% goaway complete timeout set. @@ -1050,6 +1061,19 @@ closing(State=#state{http2_status=closing, opts=Opts}, Reason) -> stop_reason({stop, Reason, _}) -> Reason; stop_reason(Reason) -> Reason. +%% Function copied from cowboy_http. +maybe_socket_error(State, {error, closed}) -> + terminate(State, {socket_error, closed, 'The socket has been closed.'}); +maybe_socket_error(State, Reason) -> + maybe_socket_error(State, Reason, 'An error has occurred on the socket.'). + +maybe_socket_error(_, Result = ok, _) -> + Result; +maybe_socket_error(_, Result = {ok, _}, _) -> + Result; +maybe_socket_error(State, {error, Reason}, Human) -> + terminate(State, {socket_error, Reason, Human}). + -spec terminate(#state{}, _) -> no_return(). terminate(undefined, Reason) -> exit({shutdown, Reason}); @@ -1060,7 +1084,8 @@ terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, %% as debug data in the GOAWAY frame here. Perhaps more. if Status =:= connected; Status =:= closing_initiated -> - Transport:send(Socket, cow_http2:goaway( + %% We are terminating so it's OK if we can't send the GOAWAY anymore. + _ = Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), terminate_reason(Reason), <<>>)); %% We already sent the GOAWAY frame. @@ -1071,8 +1096,8 @@ terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, cowboy_children:terminate(Children), terminate_linger(State), exit({shutdown, Reason}); -terminate(#state{socket=Socket, transport=Transport}, Reason) -> - Transport:close(Socket), +%% We are not fully connected so we can just terminate the connection. +terminate(_State, Reason) -> exit({shutdown, Reason}). terminate_reason({connection_error, Reason, _}) -> Reason; @@ -1106,6 +1131,9 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) -> terminate_linger_before_loop(State, TimerRef, Messages) -> %% We may already be in active mode when we do this %% but it's OK because we are shutting down anyway. + %% + %% We specially handle the socket error to terminate + %% when an error occurs. case setopts_active(State) of ok -> terminate_linger_loop(State, TimerRef, Messages); @@ -1136,7 +1164,8 @@ reset_stream(State0=#state{socket=Socket, transport=Transport, {internal_error, _, _} -> internal_error; {stream_error, Reason0, _} -> Reason0 end, - Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), + ok = maybe_socket_error(State0, Transport:send(Socket, + cow_http2:rst_stream(StreamID, Reason))), State1 = case cow_http2_machine:reset_stream(StreamID, HTTP2Machine0) of {ok, HTTP2Machine} -> terminate_stream(State0#state{http2_machine=HTTP2Machine}, StreamID, Error); @@ -1208,7 +1237,8 @@ terminate_stream(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID) -> State = case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine0) of {ok, fin, _} -> - Transport:send(Socket, cow_http2:rst_stream(StreamID, no_error)), + ok = maybe_socket_error(State0, Transport:send(Socket, + cow_http2:rst_stream(StreamID, no_error))), {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), State0#state{http2_machine=HTTP2Machine}; {error, closed} -> diff --git a/test/handlers/stream_handler_h.erl b/test/handlers/stream_handler_h.erl index 370d15a..7a1e5ec 100644 --- a/test/handlers/stream_handler_h.erl +++ b/test/handlers/stream_handler_h.erl @@ -44,16 +44,16 @@ init_commands(_, _, #state{test=set_options_ignore_unknown}) -> ]; init_commands(_, _, State=#state{test=shutdown_on_stream_stop}) -> Spawn = init_process(false, State), - [{headers, 200, #{}}, {spawn, Spawn, 5000}, stop]; + [{spawn, Spawn, 5000}, {headers, 200, #{}}, stop]; init_commands(_, _, State=#state{test=shutdown_on_socket_close}) -> Spawn = init_process(false, State), - [{headers, 200, #{}}, {spawn, Spawn, 5000}]; + [{spawn, Spawn, 5000}, {headers, 200, #{}}]; init_commands(_, _, State=#state{test=shutdown_timeout_on_stream_stop}) -> Spawn = init_process(true, State), - [{headers, 200, #{}}, {spawn, Spawn, 2000}, stop]; + [{spawn, Spawn, 2000}, {headers, 200, #{}}, stop]; init_commands(_, _, State=#state{test=shutdown_timeout_on_socket_close}) -> Spawn = init_process(true, State), - [{headers, 200, #{}}, {spawn, Spawn, 2000}]; + [{spawn, Spawn, 2000}, {headers, 200, #{}}]; init_commands(_, _, State=#state{test=switch_protocol_after_headers}) -> [{headers, 200, #{}}, {switch_protocol, #{}, ?MODULE, State}]; init_commands(_, _, State=#state{test=switch_protocol_after_headers_data}) -> diff --git a/test/stream_handler_SUITE.erl b/test/stream_handler_SUITE.erl index 46a05b2..0643d3d 100644 --- a/test/stream_handler_SUITE.erl +++ b/test/stream_handler_SUITE.erl @@ -410,7 +410,7 @@ shutdown_timeout_on_socket_close(Config) -> receive {Self, Pid, terminate, _, _, _} -> ok after 1000 -> error(timeout) end, %% We should NOT receive a DOWN message immediately. receive {'DOWN', MRef, process, Spawn, killed} -> error(killed) after 1500 -> ok end, - %% We should received it now. + %% We should receive it now. receive {'DOWN', MRef, process, Spawn, killed} -> ok after 1000 -> error(timeout) end, ok. |