diff options
-rw-r--r-- | src/gun.erl | 21 | ||||
-rw-r--r-- | src/gun_http.erl | 22 | ||||
-rw-r--r-- | test/shutdown_SUITE.erl | 73 |
3 files changed, 104 insertions, 12 deletions
diff --git a/src/gun.erl b/src/gun.erl index ac643e0..ddb5007 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -1332,6 +1332,27 @@ closing(state_timeout, closing_timeout, State=#state{status=Status}) -> _ -> normal end, disconnect(State, Reason); +%% When reconnect is disabled, fail HTTP/Websocket operations immediately. +closing(cast, {headers, ReplyTo, StreamRef, _Method, _Path, _Headers, _InitialFlow}, + State=#state{opts=#{retry := 0}}) -> + ReplyTo ! {gun_error, self(), StreamRef, closing}, + {keep_state, State}; +closing(cast, {request, ReplyTo, StreamRef, _Method, _Path, _Headers, _Body, _InitialFlow}, + State=#state{opts=#{retry := 0}}) -> + ReplyTo ! {gun_error, self(), StreamRef, closing}, + {keep_state, State}; +closing(cast, {connect, ReplyTo, StreamRef, _Destination, _Headers, _InitialFlow}, + State=#state{opts=#{retry := 0}}) -> + ReplyTo ! {gun_error, self(), StreamRef, closing}, + {keep_state, State}; +closing(cast, {ws_upgrade, ReplyTo, StreamRef, _Path, _Headers}, + State=#state{opts=#{retry := 0}}) -> + ReplyTo ! {gun_error, self(), StreamRef, closing}, + {keep_state, State}; +closing(cast, {ws_upgrade, ReplyTo, StreamRef, _Path, _Headers, _WsOpts}, + State=#state{opts=#{retry := 0}}) -> + ReplyTo ! {gun_error, self(), StreamRef, closing}, + {keep_state, State}; closing(Type, Event, State) -> handle_common_connected(Type, Event, ?FUNCTION_NAME, State). diff --git a/src/gun_http.erl b/src/gun_http.erl index 8cbeada..9c52f2d 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -418,7 +418,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec Status, Headers, Handlers0), EvHandlerState1} end end, - EvHandlerState = case IsFin of + EvHandlerState3 = case IsFin of nofin -> EvHandlerState2; fin -> @@ -436,17 +436,31 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec %% We always reset in_state even if not chunked. if IsFin =:= fin, Conn2 =:= close -> - {close, CookieStore, EvHandlerState}; + {close, CookieStore, EvHandlerState3}; IsFin =:= fin -> handle(Rest, end_stream(State#http_state{in=In, in_state={0, 0}, connection=Conn2, streams=[Stream#stream{handler_state=Handlers}|Tail]}), - CookieStore, EvHandler, EvHandlerState); + CookieStore, EvHandler, EvHandlerState3); + Conn2 =:= close -> + close_streams(State, Tail, closing), + {CommandOrCommands, CookieStore1, EvHandlerState4} = + handle(Rest, State#http_state{in=In, + in_state={0, 0}, connection=Conn2, + streams=[Stream#stream{handler_state=Handlers}]}, + CookieStore, EvHandler, EvHandlerState3), + Commands = if + is_list(CommandOrCommands) -> + CommandOrCommands ++ [closing(State)]; + true -> + [CommandOrCommands, closing(State)] + end, + {Commands, CookieStore1, EvHandlerState4}; true -> handle(Rest, State#http_state{in=In, in_state={0, 0}, connection=Conn2, streams=[Stream#stream{handler_state=Handlers}|Tail]}, - CookieStore, EvHandler, EvHandlerState) + CookieStore, EvHandler, EvHandlerState3) end. %% The state must be first in order to retrieve it when the stream ended. diff --git a/test/shutdown_SUITE.erl b/test/shutdown_SUITE.erl index 06fd81f..891aed8 100644 --- a/test/shutdown_SUITE.erl +++ b/test/shutdown_SUITE.erl @@ -218,9 +218,9 @@ http1_request_connection_close_pipeline(Config) -> StreamRef3 = gun:get(ConnPid, "/"), %% We get the response, pipelined streams get canceled, followed by Gun shutting down. {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1), + {error, {stream_error, closing}} = gun:await(ConnPid, StreamRef2), + {error, {stream_error, closing}} = gun:await(ConnPid, StreamRef3), {ok, _} = gun:await_body(ConnPid, StreamRef1), - {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef2), - {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef3), gun_is_down(ConnPid, ConnRef, normal). http1_response_connection_close(_) -> @@ -272,8 +272,8 @@ http1_response_connection_close_pipeline(_) -> %% We get the response, pipelined streams get canceled, followed by Gun shutting down. {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1), {ok, _} = gun:await_body(ConnPid, StreamRef1), - {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef2), - {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef3), + {error, {stream_error, closing}} = gun:await(ConnPid, StreamRef2), + {error, {stream_error, closing}} = gun:await(ConnPid, StreamRef3), gun_is_down(ConnPid, ConnRef, normal) after cowboy:stop_listener(?FUNCTION_NAME) @@ -297,6 +297,47 @@ http10_connection_close(Config) -> {ok, _} = gun:await_body(ConnPid, StreamRef), gun_is_down(ConnPid, ConnRef, normal). +http1_response_connection_close_delayed_body(_) -> + doc("HTTP/1.1: Confirm that requests initiated when Gun has received a " + "connection: close response header fail immediately if retry " + "is disabled, without waiting for the response body."), + ServerFun = fun(_Parent, ClientSocket, gen_tcp) -> + try + {ok, Req} = gen_tcp:recv(ClientSocket, 0, 5000), + <<"GET / HTTP/1.1\r\n", _/binary>> = Req, + ok = gen_tcp:send(ClientSocket, <<"HTTP/1.1 200 OK\r\n" + "Connection: close\r\n" + "Content-Length: 12\r\n\r\nHello">>), + timer:sleep(500), + ok = gen_tcp:send(ClientSocket, " world!") + after + gen_tcp:close(ClientSocket) + end + end, + {ok, ServerPid, OriginPort} = gun_test:init_origin(tcp, http, ServerFun), + %% Client connects. + {ok, ConnPid} = gun:open("localhost", OriginPort, #{ + protocols => [http], + retry => 0 + }), + {ok, _Protocol} = gun:await_up(ConnPid), + receive {ServerPid, handshake_completed} -> ok end, + ConnRef = monitor(process, ConnPid), + StreamRef1 = gun:get(ConnPid, "/"), + StreamRef2 = gun:get(ConnPid, "/"), + %% We get the response headers with connection: close. + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1), + %% Pipelined request fails immediately. + {gun_error, ConnPid, StreamRef2, closing} = receive E2 -> E2 end, + {gun_data, ConnPid, StreamRef1, nofin, <<"Hello">>} = + receive PartialBody -> PartialBody end, + %% Request initiated when Gun is in closing state fails immediately. + StreamRef3 = gun:get(ConnPid, "/"), + {gun_error, ConnPid, StreamRef3, closing} = receive E3 -> E3 end, + {gun_data, ConnPid, StreamRef1, fin, <<" world!">>} = + receive RestBody -> RestBody end, + gun_is_down(ConnPid, ConnRef, normal). + http2_gun_shutdown_no_streams(Config) -> doc("HTTP/2: Confirm that the Gun process shuts down gracefully " "when calling gun:shutdown/1 with no active streams."), @@ -433,11 +474,20 @@ http2_server_goaway_many_streams(_) -> {ok, <<SkipLen3:24, 1:8, _:8, 5:32>>} = Transport:recv(Socket, 9, 1000), %% Skip the header. {ok, _} = gen_tcp:recv(Socket, SkipLen3, 1000), - %% Send a GOAWAY frame. + %% Stream 4. + %% Receive a HEADERS frame, but simulate that it is still + %% in-flight when the GOAWAY frame is sent. + {ok, <<SkipLen4:24, 1:8, _:8, 7:32>>} = Transport:recv(Socket, 9, 1000), + %% Skip the header. + {ok, _} = gen_tcp:recv(Socket, SkipLen4, 1000), + %% Send a GOAWAY frame. Simulate that GOAWAY was sent before + %% receiving stream 4 by including last stream ID of stream 3. Transport:send(Socket, cow_http2:goaway(5, no_error, <<>>)), - %% Wait before sending the responses back and closing the connection. + %% Gun replies with GOAWAY. + {ok, <<SkipLen5:24, 7:8, _:8, 0:32>>} = Transport:recv(Socket, 9, 1000), + {ok, _SkippedPayload} = gen_tcp:recv(Socket, SkipLen5, 1000), timer:sleep(500), - %% Send a HEADERS frame. + %% Send replies for streams 1-3. {HeadersBlock1, State0} = cow_hpack:encode([ {<<":status">>, <<"200">>} ]), @@ -456,7 +506,8 @@ http2_server_goaway_many_streams(_) -> ok = Transport:send(Socket, [ cow_http2:headers(5, fin, HeadersBlock3) ]), - timer:sleep(500) + %% Gun closes the connection. + {error, closed} = gen_tcp:recv(Socket, 9) end), Protocol = http2, {ok, ConnPid} = gun:open("localhost", OriginPort, #{ @@ -468,7 +519,13 @@ http2_server_goaway_many_streams(_) -> StreamRef1 = gun:get(ConnPid, "/"), StreamRef2 = gun:get(ConnPid, "/"), StreamRef3 = gun:get(ConnPid, "/"), + StreamRef4 = gun:get(ConnPid, "/"), ConnRef = monitor(process, ConnPid), + %% GOAWAY received. Stream 4 is cancelled. + {gun_error, ConnPid, StreamRef4, Reason4} = receive E4 -> E4 end, + {goaway, no_error, _} = Reason4, + StreamRef5 = gun:get(ConnPid, "/"), + {gun_error, ConnPid, StreamRef5, closing} = receive E5 -> E5 end, {response, fin, 200, _} = gun:await(ConnPid, StreamRef1), {response, fin, 200, _} = gun:await(ConnPid, StreamRef2), {response, fin, 200, _} = gun:await(ConnPid, StreamRef3), |