diff options
-rw-r--r-- | doc/src/manual/gun.asciidoc | 33 | ||||
-rw-r--r-- | doc/src/manual/gun.close.asciidoc | 3 | ||||
-rw-r--r-- | doc/src/manual/gun.shutdown.asciidoc | 67 | ||||
-rw-r--r-- | doc/src/manual/gun.ws_send.asciidoc | 19 | ||||
-rw-r--r-- | src/gun.erl | 300 | ||||
-rw-r--r-- | src/gun_event.erl | 5 | ||||
-rw-r--r-- | src/gun_http.erl | 56 | ||||
-rw-r--r-- | src/gun_http2.erl | 144 | ||||
-rw-r--r-- | src/gun_ws.erl | 94 | ||||
-rw-r--r-- | test/gun_SUITE.erl | 88 | ||||
-rw-r--r-- | test/handlers/delayed_hello_h.erl | 11 | ||||
-rw-r--r-- | test/handlers/delayed_push_h.erl | 13 | ||||
-rw-r--r-- | test/handlers/ws_frozen_h.erl | 23 | ||||
-rw-r--r-- | test/handlers/ws_timeout_close_h.erl | 25 | ||||
-rw-r--r-- | test/rfc7540_SUITE.erl | 2 | ||||
-rw-r--r-- | test/shutdown_SUITE.erl | 609 | ||||
-rw-r--r-- | test/ws_SUITE.erl | 27 |
17 files changed, 1214 insertions, 305 deletions
diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc index acc1454..b9fbbc2 100644 --- a/doc/src/manual/gun.asciidoc +++ b/doc/src/manual/gun.asciidoc @@ -16,7 +16,7 @@ Connection: * link:man:gun:open(3)[gun:open(3)] - Open a connection to the given host and port * link:man:gun:open_unix(3)[gun:open_unix(3)] - Open a connection to the given Unix domain socket -// @todo * link:man:gun:shutdown(3)[gun:shutdown(3)] - Gracefully close the connection +* link:man:gun:shutdown(3)[gun:shutdown(3)] - Gracefully close the connection * link:man:gun:close(3)[gun:close(3)] - Brutally close the connection * link:man:gun:info(3)[gun:info(3)] - Obtain information about the connection @@ -144,6 +144,7 @@ Handshake timeout for tunneled TLS connections. [source,erlang] ---- http_opts() :: #{ + closing_timeout => timeout(), flow => pos_integer(), keepalive => timeout(), transform_header_name => fun((binary()) -> binary()), @@ -157,6 +158,12 @@ The default value is given next to the option name: // @todo Document content_handlers and gun_sse_h. +closing_timeout (15000):: + +Time to wait before brutally closing the connection when a +graceful shutdown was requested via a call to +link:man:gun:shutdown(3)[gun:shutdown(3)]. + flow - see below:: The initial flow control value for all HTTP/1.1 streams. @@ -188,8 +195,9 @@ HTTP version to use. [source,erlang] ---- http2_opts() :: #{ - flow => pos_integer(), - keepalive => timeout() + closing_timeout => timeout(), + flow => pos_integer(), + keepalive => timeout() } ---- @@ -199,6 +207,12 @@ The default value is given next to the option name: // @todo Document content_handlers and gun_sse_h. +closing_timeout (15000):: + +Time to wait before brutally closing the connection when a +graceful shutdown was requested either via a call to +link:man:gun:shutdown(3)[gun:shutdown(3)] or by the server. + flow - see below:: The initial flow control value for all HTTP/2 streams. @@ -364,9 +378,10 @@ The pid of the process that will receive the response messages. [source,erlang] ---- ws_opts() :: #{ - compress => boolean(), - flow => pos_integer(), - protocols => [{binary(), module()}] + closing_timeout => timeout(), + compress => boolean(), + flow => pos_integer(), + protocols => [{binary(), module()}] } ---- @@ -374,6 +389,12 @@ Configuration for the Websocket protocol. The default value is given next to the option name: +closing_timeout (15000):: + +Time to wait before brutally closing the connection when a +graceful shutdown was requested either via a call to +link:man:gun:shutdown(3)[gun:shutdown(3)] or by the server. + compress (false):: Whether to enable permessage-deflate compression. This does diff --git a/doc/src/manual/gun.close.asciidoc b/doc/src/manual/gun.close.asciidoc index 20fd1bf..cdbe05f 100644 --- a/doc/src/manual/gun.close.asciidoc +++ b/doc/src/manual/gun.close.asciidoc @@ -41,4 +41,5 @@ ok = gun:close(ConnPid). link:man:gun(3)[gun(3)], link:man:gun:open(3)[gun:open(3)], -link:man:gun:open_unix(3)[gun:open_unix(3)] +link:man:gun:open_unix(3)[gun:open_unix(3)], +link:man:gun:shutdown(3)[gun:shutdown(3)] diff --git a/doc/src/manual/gun.shutdown.asciidoc b/doc/src/manual/gun.shutdown.asciidoc new file mode 100644 index 0000000..94db39d --- /dev/null +++ b/doc/src/manual/gun.shutdown.asciidoc @@ -0,0 +1,67 @@ += gun:shutdown(3) + +== Name + +gun:shutdown - Gracefully close the connection + +== Description + +[source,erlang] +---- +shutdown(ConnPid) -> ok + +ConnPid :: pid() +---- + +Gracefully close the connection. + +Gun will wait for up to `closing_timeout` milliseconds +before brutally closing the connection. The graceful +shutdown mechanism varies between the different protocols: + +* For HTTP/1.1 there is no such mechanism and Gun will + close the connection once the current response is + received. Any pipelined requests are immediately + terminated. + +* For HTTP/2 Gun will send a GOAWAY frame and wait for + the existing streams to terminate. + +* For Websocket Gun will send a close frame and wait + for the server's close frame before closing the + connection. + +The function returns immediately. The connection may +therefore still be up for some time after this call. + +Gun will not attempt to reconnect once graceful +shutdown has been initiated. + +== Arguments + +ConnPid:: + +The pid of the Gun connection process. + +== Return value + +The atom `ok` is returned. + +== Changelog + +* *2.0*: Function introduced. + +== Examples + +.Gracefully shutdown the connection +[source,erlang] +---- +ok = gun:shutdown(ConnPid). +---- + +== See also + +link:man:gun(3)[gun(3)], +link:man:gun:open(3)[gun:open(3)], +link:man:gun:open_unix(3)[gun:open_unix(3)], +link:man:gun:close(3)[gun:close(3)] diff --git a/doc/src/manual/gun.ws_send.asciidoc b/doc/src/manual/gun.ws_send.asciidoc index fbb1025..b39f3f0 100644 --- a/doc/src/manual/gun.ws_send.asciidoc +++ b/doc/src/manual/gun.ws_send.asciidoc @@ -30,8 +30,7 @@ The pid of the Gun connection process. Frames:: -A Websocket frame. -// @todo One or more Websocket frame(s). +One or more Websocket frame(s). == Return value @@ -49,14 +48,14 @@ The atom `ok` is returned. gun:ws_send(ConnPid, {text, <<"Hello world!">>}). ---- -//.Send many frames including a close frame -//[source,erlang] -//---- -//gun:ws_send(ConnPid, [ -// {text, <<"See you later, world!">>}, -// close -//]). -//---- +.Send many frames including a close frame +[source,erlang] +---- +gun:ws_send(ConnPid, [ + {text, <<"See you later, world!">>}, + close +]). +---- == See also diff --git a/src/gun.erl b/src/gun.erl index 7b06aaf..bb659e7 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -98,6 +98,7 @@ -export([connecting/3]). -export([tls_handshake/3]). -export([connected/3]). +-export([closing/3]). -export([terminate/3]). -type req_headers() :: [{binary() | string() | atom(), iodata()}] @@ -165,19 +166,24 @@ -export_type([req_opts/0]). -type http_opts() :: #{ - keepalive => timeout(), + closing_timeout => timeout(), + flow => pos_integer(), + keepalive => timeout(), transform_header_name => fun((binary()) -> binary()), - version => 'HTTP/1.1' | 'HTTP/1.0' + version => 'HTTP/1.1' | 'HTTP/1.0' }. -export_type([http_opts/0]). -type http2_opts() :: #{ + closing_timeout => timeout(), + flow => pos_integer(), keepalive => timeout() }. -export_type([http2_opts/0]). %% @todo keepalive -type ws_opts() :: #{ + closing_timeout => timeout(), compress => boolean(), flow => pos_integer(), protocols => [{binary(), module()}] @@ -186,7 +192,7 @@ -record(state, { owner :: pid(), - owner_ref :: reference(), + status :: {up, reference()} | {down, any()} | shutdown, host :: inet:hostname() | inet:ip_address(), port :: inet:port_number(), origin_scheme :: binary(), @@ -781,7 +787,7 @@ init({Owner, Host, Port, Opts}) -> origin_port => Port, opts => Opts }, EvHandlerState0), - State = #state{owner=Owner, owner_ref=OwnerRef, + State = #state{owner=Owner, status={up, OwnerRef}, host=Host, port=Port, origin_scheme=OriginScheme, origin_host=Host, origin_port=Port, opts=Opts, transport=Transport, messages=Transport:messages(), @@ -875,11 +881,7 @@ connecting(_, {retries, Retries, LookupInfo}, State=#state{opts=Opts, }, EvHandlerState1), {next_state, not_connected, State#state{event_handler_state=EvHandlerState}, {next_event, internal, {retries, Retries, Reason}}} - end; -connecting({call, From}, {stream_info, _}, _) -> - {keep_state_and_data, {reply, From, {error, not_connected}}}; -connecting(Type, Event, State) -> - handle_common(Type, Event, ?FUNCTION_NAME, State). + end. tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> @@ -910,11 +912,7 @@ tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts, }, EvHandlerState1), {next_state, not_connected, State#state{event_handler_state=EvHandlerState}, {next_event, internal, {retries, Retries, Reason}}} - end; -tls_handshake({call, From}, {stream_info, _}, _) -> - {keep_state_and_data, {reply, From, {error, not_connected}}}; -tls_handshake(Type, Event, State) -> - handle_common(Type, Event, ?FUNCTION_NAME, State). + end. ensure_alpn(Protocols0, TransOpts) -> Protocols = [case P of @@ -937,28 +935,6 @@ connected(internal, {connected, Socket, Protocol}, Owner ! {gun_up, self(), Protocol:name()}, {keep_state, keepalive_timeout(active(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}))}; -%% Socket events. -connected(info, {OK, Socket, Data}, State0=#state{socket=Socket, messages={OK, _, _}, - protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0), - case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of - {keep_state, State} -> - {keep_state, active(State)}; - Res -> - Res - end; -connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) -> - disconnect(State, closed); -connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) -> - disconnect(State, {error, Reason}); -%% Timeouts. -%% @todo HTTP/2 requires more timeouts than just the keepalive timeout. -%% We should have a timeout function in protocols that deal with -%% received timeouts. Currently the timeout messages are ignored. -connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> - ProtoState2 = Protocol:keepalive(ProtoState), - {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})}; %% Public HTTP interface. connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, @@ -976,14 +952,6 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, Initi StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; -%% @todo Do we want to reject ReplyTo if it's not the process -%% who initiated the connection? For both data and cancel. -connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, - State=#state{protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, EvHandlerState} = Protocol:data(ProtoState, - StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow}, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> %% The protocol option has been deprecated in favor of the protocols option. @@ -1006,41 +974,6 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow end, ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow), {keep_state, State#state{protocol_state=ProtoState2}}; -%% When using gun_tls_proxy we need a separate message to know whether -%% the handshake succeeded and whether we need to switch to a different protocol. -connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent}, - State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ - socket => Socket, - protocol => NewProtocol:name() - }, EvHandlerState0), - State = State0#state{event_handler_state=EvHandlerState}, - case NewProtocol of - CurrentProtocol -> {keep_state, State}; - _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State) - end; -connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}, - State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ - error => Reason - }, EvHandlerState0), - commands([Error], State#state{event_handler_state=EvHandlerState}); -connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, State0=#state{ - protocol=Protocol, protocol_state=ProtoState}) -> - Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow), - case commands(Commands, State0) of - {keep_state, State} -> - {keep_state, active(State)}; - Res -> - Res - end; -connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{ - protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, - StreamRef, ReplyTo, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; %% Public Websocket interface. %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. %% An interface would also make sure that HTTP/1.0 can't upgrade. @@ -1067,40 +1000,148 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "Websocket is only supported over HTTP/1.1."}}, keep_state_and_data; -connected(cast, {ws_send, Owner, Frame}, State=#state{ +connected(cast, {ws_send, Owner, Frames}, State=#state{ owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {Commands, EvHandlerState} = Protocol:send(Frame, ProtoState, EvHandler, EvHandlerState0), + {Commands, EvHandlerState} = Protocol:send(Frames, ProtoState, EvHandler, EvHandlerState0), commands(Commands, State#state{event_handler_state=EvHandlerState}); connected(cast, {ws_send, ReplyTo, _}, _) -> ReplyTo ! {gun_error, self(), {badstate, "Connection needs to be upgraded to Websocket " "before the gun:ws_send/1 function can be used."}}, keep_state_and_data; -connected({call, From}, {stream_info, StreamRef}, +connected(Type, Event, State) -> + handle_common_connected(Type, Event, ?FUNCTION_NAME, State). + +%% Switch to the graceful connection close state. +closing(State=#state{protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) -> + {Commands, EvHandlerState} = Protocol:closing(Reason, ProtoState, EvHandler, EvHandlerState0), + commands(Commands, State#state{event_handler_state=EvHandlerState}). + +%% @todo Should explicitly reject ws_send in this state? +closing(state_timeout, closing_timeout, State=#state{status=Status}) -> + Reason = case Status of + shutdown -> shutdown; + {down, _} -> owner_down; + _ -> normal + end, + disconnect(State, Reason); +closing(Type, Event, State) -> + handle_common_connected(Type, Event, ?FUNCTION_NAME, State). + +%% Common events when we have a connection. +%% +%% Socket events. +handle_common_connected(info, {OK, Socket, Data}, _, State0=#state{socket=Socket, messages={OK, _, _}, + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0), + case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of + {keep_state, State} -> + {keep_state, active(State)}; + {next_state, closing, State, Actions} -> + {next_state, closing, active(State), Actions}; + Res -> + Res + end; +handle_common_connected(info, {Closed, Socket}, _, State=#state{socket=Socket, messages={_, Closed, _}}) -> + disconnect(State, closed); +handle_common_connected(info, {Error, Socket, Reason}, _, State=#state{socket=Socket, messages={_, _, Error}}) -> + disconnect(State, {error, Reason}); +%% Timeouts. +%% @todo HTTP/2 requires more timeouts than just the keepalive timeout. +%% We should have a timeout function in protocols that deal with +%% received timeouts. Currently the timeout messages are ignored. +handle_common_connected(info, keepalive, _, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> + ProtoState2 = Protocol:keepalive(ProtoState), + {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})}; +%% When using gun_tls_proxy we need a separate message to know whether +%% the handshake succeeded and whether we need to switch to a different protocol. +handle_common_connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent}, _, + State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ + socket => Socket, + protocol => NewProtocol:name() + }, EvHandlerState0), + State = State0#state{event_handler_state=EvHandlerState}, + case NewProtocol of + CurrentProtocol -> {keep_state, State}; + _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State) + end; +handle_common_connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}, _, + State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ + error => Reason + }, EvHandlerState0), + commands([Error], State#state{event_handler_state=EvHandlerState}); +%% @todo Do we want to reject ReplyTo if it's not the process +%% who initiated the connection? For both data and cancel. +handle_common_connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, _, + State=#state{protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:data(ProtoState, + StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; +handle_common_connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, _, State0=#state{ + protocol=Protocol, protocol_state=ProtoState}) -> + Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow), + case commands(Commands, State0) of + {keep_state, State} -> + {keep_state, active(State)}; + Res -> + Res + end; +handle_common_connected(cast, {cancel, ReplyTo, StreamRef}, _, State=#state{ + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, + StreamRef, ReplyTo, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; +handle_common_connected({call, From}, {stream_info, StreamRef}, _, #state{protocol=Protocol, protocol_state=ProtoState}) -> {keep_state_and_data, {reply, From, Protocol:stream_info(ProtoState, StreamRef)}}; -connected(Type, Event, State) -> - handle_common(Type, Event, ?FUNCTION_NAME, State). +handle_common_connected(Type, Event, StateName, State) -> + handle_common(Type, Event, StateName, State). %% Common events. -handle_common(cast, {shutdown, Owner}, _, #state{owner=Owner}) -> - %% @todo Graceful shutdown. - stop; +handle_common(cast, {shutdown, Owner}, StateName, State=#state{ + owner=Owner, status=Status, socket=Socket, transport=Transport, protocol=Protocol}) -> + case {Socket, Protocol} of + {undefined, _} -> + {stop, shutdown}; + {_, undefined} -> + %% @todo This is missing the disconnect event. + Transport:close(Socket), + {stop, shutdown}; + _ when StateName =:= closing, element(1, Status) =:= up -> + {keep_state, status(State, shutdown)}; + _ when StateName =:= closing -> + keep_state_and_data; + _ -> + closing(status(State, shutdown), shutdown) + end; %% We stop when the owner is down. -handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, State=#state{ - owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport, - protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {_, EvHandlerState} = case Protocol of - undefined -> {ok, EvHandlerState0}; - _ -> Protocol:close(owner_down, ProtoState, EvHandler, EvHandlerState0) - end, - _ = case Socket of - undefined -> ok; - _ -> Transport:close(Socket) - end, - owner_down(Reason, State#state{event_handler_state=EvHandlerState}); +%% @todo We need to demonitor/flush when the status is no longer up. +handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, StateName, State=#state{ + owner=Owner, status={up, OwnerRef}, socket=Socket, transport=Transport, protocol=Protocol}) -> + case Socket of + undefined -> + owner_down(Reason, State); + _ -> + case Protocol of + undefined -> + %% @todo This is missing the disconnect event. + Transport:close(Socket), + owner_down(Reason, State); + %% We are already closing so no need to initiate closing again. + _ when StateName =:= closing -> + {keep_state, status(State, {down, Reason})}; + _ -> + closing(status(State, {down, Reason}), owner_down) + end + end; handle_common({call, From}, _, _, _) -> {keep_state_and_data, {reply, From, {error, bad_call}}}; %% @todo The ReplyTo patch disabled the notowner behavior. @@ -1123,6 +1164,9 @@ commands([], State) -> {keep_state, State}; commands([close|_], State) -> disconnect(State, normal); +commands([{closing, Timeout}|_], State) -> + {next_state, closing, keepalive_cancel(State), + {state_timeout, Timeout, closing_timeout}}; commands([Error={error, _}|_], State) -> disconnect(State, Error); commands([{active, Active}|Tail], State) when is_boolean(Active) -> @@ -1176,33 +1220,37 @@ commands([{switch_protocol, Protocol, _ProtoState0}|Tail], State=#state{ commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState, event_handler_state=EvHandlerState})). -disconnect(State=#state{owner=Owner, opts=Opts, +disconnect(State0=#state{owner=Owner, status=Status, opts=Opts, socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) -> - {_, EvHandlerState1} = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0), - %% @todo Need a special state for orderly shutdown of a connection. + EvHandlerState1 = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0), _ = Transport:close(Socket), - %% We closed the socket, discard any remaining socket events. - disconnect_flush(State), - %% @todo Stop keepalive timeout, flush message. - {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState), - Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams}, - %% Trigger the disconnect event. - DisconnectEvent = #{ - reason => Reason - }, - EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState1), - Retry = maps:get(retry, Opts, 5), - case Retry of - 0 -> - {stop, {shutdown, Reason}, State#state{event_handler_state=EvHandlerState}}; - _ -> - {next_state, not_connected, - keepalive_cancel(State#state{socket=undefined, - protocol=undefined, protocol_state=undefined, - event_handler_state=EvHandlerState}), - {next_event, internal, {retries, Retry - 1, Reason}}} + EvHandlerState = EvHandler:disconnect(#{reason => Reason}, EvHandlerState1), + State = State0#state{event_handler_state=EvHandlerState}, + case Status of + {down, DownReason} -> + owner_down(DownReason, State); + shutdown -> + {stop, shutdown, State}; + {up, _} -> + %% We closed the socket, discard any remaining socket events. + disconnect_flush(State), + %% @todo Stop keepalive timeout, flush message. + {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState), + Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams}, + Retry = maps:get(retry, Opts, 5), + case Retry of + 0 when Reason =:= normal -> + {stop, normal, State}; + 0 -> + {stop, {shutdown, Reason}, State}; + _ -> + {next_state, not_connected, + keepalive_cancel(State#state{socket=undefined, + protocol=undefined, protocol_state=undefined}), + {next_event, internal, {retries, Retry - 1, Reason}}} + end end. disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) -> @@ -1220,6 +1268,12 @@ active(State=#state{socket=Socket, transport=Transport}) -> Transport:setopts(Socket, [{active, once}]), State. +status(State=#state{status={up, OwnerRef}}, NewStatus) -> + demonitor(OwnerRef, [flush]), + State#state{status=NewStatus}; +status(State, NewStatus) -> + State#state{status=NewStatus}. + keepalive_timeout(State=#state{opts=Opts, protocol=Protocol}) -> {ProtoOptsKey, Default} = case Protocol of gun_http -> {http_opts, infinity}; diff --git a/src/gun_event.erl b/src/gun_event.erl index 1e158c4..a984796 100644 --- a/src/gun_event.erl +++ b/src/gun_event.erl @@ -113,7 +113,10 @@ -type push_promise_end_event() :: #{ stream_ref := reference(), reply_to := pid(), - promised_stream_ref := reference(), + %% No stream is created if we receive the push_promise while + %% in the process of gracefully shutting down the connection. + %% The promised stream is canceled immediately. + promised_stream_ref => reference(), method := binary(), uri := binary(), headers := [{binary(), iodata()}] diff --git a/src/gun_http.erl b/src/gun_http.erl index ec268ad..309772e 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -19,6 +19,7 @@ -export([init/4]). -export([handle/4]). -export([update_flow/4]). +-export([closing/4]). -export([close/4]). -export([keepalive/1]). -export([headers/11]). @@ -71,6 +72,10 @@ check_options(Opts) -> do_check_options([]) -> ok; +do_check_options([{closing_timeout, infinity}|Opts]) -> + do_check_options(Opts); +do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 -> + do_check_options(Opts); do_check_options([Opt={content_handlers, Handlers}|Opts]) -> case gun_content_handler:check_option(Handlers) of ok -> do_check_options(Opts); @@ -460,26 +465,47 @@ update_flow(State=#http_state{streams=Streams0}, _ReplyTo, StreamRef, Inc) -> end || Tuple = #stream{ref=Ref, flow=Flow} <- Streams0], {state, State#http_state{streams=Streams}}. -%% @todo Use Reason. -close(_, State=#http_state{in=body_close, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]}, - EvHandler, EvHandlerState0) -> +%% We can immediately close the connection when there's no streams. +closing(_, #http_state{streams=[]}, _, EvHandlerState) -> + {close, EvHandlerState}; +%% Otherwise we set connection: close (even if the header was not sent) +%% and close any pipelined streams, only keeping the active stream. +closing(Reason, State=#http_state{streams=[LastStream|Tail]}, _, EvHandlerState) -> + close_streams(Tail, {closing, Reason}), + {[ + {state, State#http_state{connection=close, streams=[LastStream]}}, + closing(State) + ], EvHandlerState}. + +closing(#http_state{opts=Opts}) -> + Timeout = maps:get(closing_timeout, Opts, 15000), + {closing, Timeout}. + +close(Reason, State=#http_state{in=body_close, + streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]}, + EvHandler, EvHandlerState) -> + %% We may have more than one stream in case we somehow close abruptly. + close_streams(Tail, close_reason(Reason)), _ = send_data(<<>>, State, fin), - EvHandlerState = EvHandler:response_end(#{ + EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo - }, EvHandlerState0), - {close_streams(Tail), EvHandlerState}; -close(_, #http_state{streams=Streams}, _, EvHandlerState) -> - {close_streams(Streams), EvHandlerState}. + }, EvHandlerState); +close(Reason, #http_state{streams=Streams}, _, EvHandlerState) -> + close_streams(Streams, close_reason(Reason)), + EvHandlerState. + +close_reason(closed) -> closed; +close_reason(Reason) -> {closed, Reason}. -close_streams([]) -> +%% @todo Do we want an event for this? +close_streams([], _) -> ok; -close_streams([#stream{is_alive=false}|Tail]) -> - close_streams(Tail); -close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> - ReplyTo ! {gun_error, self(), StreamRef, {closed, - "The connection was lost."}}, - close_streams(Tail). +close_streams([#stream{is_alive=false}|Tail], Reason) -> + close_streams(Tail, Reason); +close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) -> + ReplyTo ! {gun_error, self(), StreamRef, Reason}, + close_streams(Tail, Reason). %% We don't send a keep-alive when a CONNECT request was initiated. keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}) -> diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 3b3b79b..5942037 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -19,6 +19,7 @@ -export([init/4]). -export([handle/4]). -export([update_flow/4]). +-export([closing/4]). -export([close/4]). -export([keepalive/1]). -export([headers/11]). @@ -53,6 +54,10 @@ content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), + %% Current status of the connection. We use this to ensure we are + %% not sending the GOAWAY frame more than once. + status = connected :: connected | goaway | closing, + %% HTTP/2 state machine. http2_machine :: cow_http2_machine:http2_machine(), @@ -66,6 +71,10 @@ check_options(Opts) -> do_check_options([]) -> ok; +do_check_options([{closing_timeout, infinity}|Opts]) -> + do_check_options(Opts); +do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 -> + do_check_options(Opts); do_check_options([Opt={content_handlers, Handlers}|Opts]) -> case gun_content_handler:check_option(Handlers) of ok -> do_check_options(Opts); @@ -99,7 +108,8 @@ handle(Data, State=#http2_state{buffer=Buffer}, EvHandler, EvHandlerState) -> parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}, EvHandler, EvHandlerState). -parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandlerState0) -> +parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, streams=Streams}, + EvHandler, EvHandlerState0) -> MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine), case cow_http2:parse(Data, MaxFrameSize) of {ok, Frame, Rest} -> @@ -116,7 +126,14 @@ parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandle parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}), EvHandler, EvHandlerState0); Error = {connection_error, _, _} -> - {terminate(State0, Error), EvHandlerState0}; + {connection_error(State0, Error), EvHandlerState0}; + %% If we both received and sent a GOAWAY frame and there are no streams + %% currently running, we can close the connection immediately. + more when Status =/= connected, Streams =:= [] -> + {[{state, State0#http2_state{buffer=Data, status=closing}}, close], EvHandlerState0}; + %% Otherwise we enter the closing state. + more when Status =:= goaway -> + {[{state, State0#http2_state{buffer=Data, status=closing}}, closing(State0)], EvHandlerState0}; more -> {{state, State0#http2_state{buffer=Data}}, EvHandlerState0} end. @@ -169,9 +186,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl push_promise_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, PromisedStreamID, Headers, PseudoHeaders, EvHandler, EvHandlerState); - {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} -> - {terminate(State#http2_state{http2_machine=HTTP2Machine}, - {stop, Frame, 'Server is going away.'}), + {ok, GoAway={goaway, _, _, _}, HTTP2Machine} -> + {goaway(State#http2_state{http2_machine=HTTP2Machine}, GoAway), EvHandlerState}; {send, SendData, HTTP2Machine} -> send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData, @@ -181,7 +197,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl StreamID, {stream_error, Reason, Human}), EvHandlerState}; {error, Error={connection_error, _, _}, HTTP2Machine} -> - {terminate(State#http2_state{http2_machine=HTTP2Machine}, Error), + {connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error), EvHandlerState} end. @@ -318,7 +334,8 @@ rst_stream_frame(State=#http2_state{streams=Streams0}, end. %% Pushed streams receive the same initial flow value as the parent stream. -push_promise_frame(State=#http2_state{streams=Streams}, +push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, + status=Status, http2_machine=HTTP2Machine0, streams=Streams}, StreamID, PromisedStreamID, Headers, #{ method := Method, scheme := Scheme, authority := Authority, path := Path}, @@ -326,25 +343,39 @@ push_promise_frame(State=#http2_state{streams=Streams}, #stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID), PromisedStreamRef = make_ref(), URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]), - ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, - EvHandlerState = EvHandler:push_promise_end(#{ + PushPromiseEvent0 = #{ stream_ref => StreamRef, reply_to => ReplyTo, - promised_stream_ref => PromisedStreamRef, method => Method, uri => URI, headers => Headers - }, EvHandlerState0), - NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, - reply_to=ReplyTo, flow=InitialFlow}, - {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}. + }, + PushPromiseEvent = case Status of + connected -> + ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, + PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef}; + _ -> + PushPromiseEvent0 + end, + EvHandlerState = EvHandler:push_promise_end(PushPromiseEvent, EvHandlerState0), + case Status of + connected -> + NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, + reply_to=ReplyTo, flow=InitialFlow}, + {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}; + %% We cancel the push_promise immediately when we are shutting down. + _ -> + {ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0), + Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)), + {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState} + end. ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> case cow_http2_machine:ignored_frame(HTTP2Machine0) of {ok, HTTP2Machine} -> State#http2_state{http2_machine=HTTP2Machine}; {error, Error={connection_error, _, _}, HTTP2Machine} -> - terminate(State#http2_state{http2_machine=HTTP2Machine}, Error) + connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error) end. update_flow(State=#http2_state{socket=Socket, transport=Transport, @@ -369,16 +400,72 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport, [] end. -%% @todo Use Reason. -close(_, #http2_state{streams=Streams}, _, EvHandlerState) -> - {close_streams(Streams), EvHandlerState}. +%% We may have to cancel streams even if we receive multiple +%% GOAWAY frames as the LastStreamID value may be lower than +%% the one previously received. +goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, + status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) -> + Streams = goaway_streams(Streams0, LastStreamID, {goaway, Reason, 'The connection is going away.'}, []), + State = State0#http2_state{streams=Streams}, + case Status of + connected -> + Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + no_error, <<>>)), + State#http2_state{status=goaway}; + _ -> + State + end. + +%% Cancel server-initiated streams that are above LastStreamID. +goaway_streams([], _, _, Acc) -> + Acc; +goaway_streams([Stream=#stream{id=StreamID}|Tail], LastStreamID, Reason, Acc) + when StreamID > LastStreamID, (StreamID rem 2) =:= 1 -> + close_stream(Stream, Reason), + goaway_streams(Tail, LastStreamID, Reason, Acc); +goaway_streams([Stream|Tail], LastStreamID, Reason, Acc) -> + goaway_streams(Tail, LastStreamID, Reason, [Stream|Acc]). + +%% We are already closing, do nothing. +closing(_, #http2_state{status=closing}, _, EvHandlerState) -> + {[], EvHandlerState}; +closing(Reason0, State=#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine}, _, EvHandlerState) -> + Reason = case Reason0 of + normal -> no_error; + owner_down -> no_error; + _ -> internal_error + end, + Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + Reason, <<>>)), + {[ + {state, State#http2_state{status=closing}}, + closing(State) + ], EvHandlerState}. -close_streams([]) -> +closing(#http2_state{opts=Opts}) -> + Timeout = maps:get(closing_timeout, Opts, 15000), + {closing, Timeout}. + +close(Reason, #http2_state{streams=Streams}, _, EvHandlerState) -> + close_streams(Streams, close_reason(Reason)), + EvHandlerState. + +close_reason(closed) -> closed; +close_reason(Reason) -> {closed, Reason}. + +close_streams([], _) -> ok; -close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> - ReplyTo ! {gun_error, self(), StreamRef, {closed, - "The connection was lost."}}, - close_streams(Tail). +close_streams([Stream|Tail], Reason) -> + close_stream(Stream, Reason), + close_streams(Tail, Reason). + +%% @todo Do we want an event for this? +close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> + ReplyTo ! {gun_error, self(), StreamRef, Reason}, + ok. keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> Transport:send(Socket, cow_http2:ping(0)), @@ -429,6 +516,7 @@ request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, headers => Headers }, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), + %% @todo We should not send an empty DATA frame on empty bodies. {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), @@ -586,8 +674,9 @@ down(#http2_state{streams=Streams}) -> KilledStreams = [Ref || #stream{ref=Ref} <- Streams], {KilledStreams, []}. -terminate(#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine, streams=Streams}, Reason) -> +connection_error(#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine, streams=Streams}, + {connection_error, Reason, _}) -> %% The connection is going away either at the request of the server, %% or because an error occurred in the protocol. Inform the streams. %% @todo We should not send duplicate messages to processes. @@ -597,12 +686,9 @@ terminate(#http2_state{socket=Socket, transport=Transport, _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams], Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), - terminate_reason(Reason), <<>>)), + Reason, <<>>)), close. -terminate_reason({connection_error, Reason, _}) -> Reason; -terminate_reason({stop, _, _}) -> no_error. - %% Stream functions. error_stream_closed(State, StreamRef, ReplyTo) -> diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 42cf049..49911dc 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -19,6 +19,7 @@ -export([init/9]). -export([handle/4]). -export([update_flow/4]). +-export([closing/4]). -export([close/4]). -export([send/4]). -export([down/1]). @@ -38,8 +39,10 @@ stream_ref :: reference(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), + opts = #{} :: map(), %% @todo buffer = <<>> :: binary(), in = head :: head | #payload{} | close, + out = head :: head | close, frag_state = undefined :: cow_ws:frag_state(), utf8_state = 0 :: cow_ws:utf8_state(), extensions = #{} :: cow_ws:extensions(), @@ -53,6 +56,10 @@ check_options(Opts) -> do_check_options([]) -> ok; +do_check_options([{closing_timeout, infinity}|Opts]) -> + do_check_options(Opts); +do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 -> + do_check_options(Opts); do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false -> do_check_options(Opts); do_check_options([{default_protocol, M}|Opts]) when is_atom(M) -> @@ -75,12 +82,15 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Hand Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, {ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts), {switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef, - socket=Socket, transport=Transport, extensions=Extensions, + socket=Socket, transport=Transport, opts=Opts, extensions=Extensions, flow=InitialFlow, handler=Handler, handler_state=HandlerState}}. %% Do not handle anything if we received a close frame. -handle(_, State=#ws_state{in=close}, _, EvHandlerState) -> - {{state, State}, EvHandlerState}; +%% Initiate or terminate the closing state depending on whether we sent a close yet. +handle(_, State=#ws_state{in=close, out=close}, _, EvHandlerState) -> + {[{state, State}, close], EvHandlerState}; +handle(_, State=#ws_state{in=close}, EvHandler, EvHandlerState) -> + closing(normal, State, EvHandler, EvHandlerState); %% Shortcut for common case when Data is empty after processing a frame. handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) -> maybe_active(State, EvHandlerState); @@ -119,7 +129,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, more -> maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1); error -> - close({error, badframe}, State, EvHandler, EvHandlerState1) + closing({error, badframe}, State, EvHandler, EvHandlerState1) end; handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey, close_code=CloseCode, unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState, @@ -143,7 +153,7 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}, EvHandlerState); Error = {error, _Reason} -> - close(Error, State, EvHandler, EvHandlerState) + closing(Error, State, EvHandler, EvHandlerState) end. maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) -> @@ -165,11 +175,11 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, }, EvHandlerState0), case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of ping -> - {{state, State}, EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), - handle(Rest, State, EvHandler, EvHandlerState); + {[], EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), + handle(Rest, State0, EvHandler, EvHandlerState); {ping, Payload} -> - {{state, State}, EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), - handle(Rest, State, EvHandler, EvHandlerState); + {[], EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), + handle(Rest, State0, EvHandler, EvHandlerState); pong -> handle(Rest, State0, EvHandler, EvHandlerState1); {pong, _} -> @@ -200,26 +210,30 @@ update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) -> {active, Flow > 0} ]. -close(Reason, State, EvHandler, EvHandlerState) -> - case Reason of - normal -> - send({close, 1000, <<>>}, State, EvHandler, EvHandlerState); - owner_down -> - send({close, 1001, <<>>}, State, EvHandler, EvHandlerState); - {error, badframe} -> - send({close, 1002, <<>>}, State, EvHandler, EvHandlerState); - {error, badencoding} -> - send({close, 1007, <<>>}, State, EvHandler, EvHandlerState); - %% Socket errors; do nothing. - closed -> - {ok, EvHandlerState}; - {error, _} -> - {ok, EvHandlerState} - end. +%% The user already sent the close frame; do nothing. +closing(_, State=#ws_state{out=close}, _, EvHandlerState) -> + {closing(State), EvHandlerState}; +closing(Reason, State, EvHandler, EvHandlerState) -> + Code = case Reason of + normal -> 1000; + owner_down -> 1001; + shutdown -> 1001; + {error, badframe} -> 1002; + {error, badencoding} -> 1007 + end, + send({close, Code, <<>>}, State, EvHandler, EvHandlerState). +closing(#ws_state{opts=Opts}) -> + Timeout = maps:get(closing_timeout, Opts, 15000), + {closing, Timeout}. + +close(_, _, _, EvHandlerState) -> + EvHandlerState. + +%% Send one frame. send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, - socket=Socket, transport=Transport, extensions=Extensions}, - EvHandler, EvHandlerState0) -> + socket=Socket, transport=Transport, in=In, extensions=Extensions}, + EvHandler, EvHandlerState0) when not is_list(Frame) -> WsSendFrameEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -229,10 +243,28 @@ send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0), Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)), EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1), - case Frame of - close -> {close, EvHandlerState}; - {close, _, _} -> {close, EvHandlerState}; - _ -> {{state, State}, EvHandlerState} + if + Frame =:= close; element(1, Frame) =:= close -> + {[ + {state, State#ws_state{out=close}}, + %% We can close immediately if we already received a close frame. + case In of + close -> close; + _ -> closing(State) + end + ], EvHandlerState}; + true -> + {[], EvHandlerState} + end; +%% Send many frames. +send([], _, _, EvHandlerState) -> + {[], EvHandlerState}; +send([Frame|Tail], State, EvHandler, EvHandlerState0) -> + case send(Frame, State, EvHandler, EvHandlerState0) of + {[], EvHandlerState} -> + send(Tail, State, EvHandler, EvHandlerState); + Other -> + Other end. %% Websocket has no concept of streams. diff --git a/test/gun_SUITE.erl b/test/gun_SUITE.erl index 3d3734b..0beee43 100644 --- a/test/gun_SUITE.erl +++ b/test/gun_SUITE.erl @@ -90,94 +90,6 @@ do_timeout(Opt, Timeout) -> gun:close(Pid) end. -detect_owner_down(_) -> - {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]), - {ok, {_, Port}} = inet:sockname(ListenSocket), - Self = self(), - spawn(fun() -> - {ok, ConnPid} = gun:open("localhost", Port), - Self ! {conn, ConnPid}, - gun:await_up(ConnPid), - timer:sleep(100) - end), - {ok, _} = gen_tcp:accept(ListenSocket, 5000), - Pid = receive - {conn, C} -> - C - after 1000 -> - error(timeout) - end, - Ref = monitor(process, Pid), - receive - {'DOWN', Ref, process, Pid, normal} -> - ok - after 1000 -> - true = erlang:is_process_alive(Pid), - error(timeout) - end. - -detect_owner_down_unexpected(_) -> - {ok, ListenSocket} = gen_tcp:listen(0, [binary, {active, false}]), - {ok, {_, Port}} = inet:sockname(ListenSocket), - Self = self(), - spawn(fun() -> - {ok, ConnPid} = gun:open("localhost", Port), - Self ! {conn, ConnPid}, - gun:await_up(ConnPid), - timer:sleep(100), - exit(unexpected) - end), - {ok, _} = gen_tcp:accept(ListenSocket, 5000), - Pid = receive - {conn, C} -> - C - after 1000 -> - error(timeout) - end, - Ref = monitor(process, Pid), - receive - {'DOWN', Ref, process, Pid, {shutdown, {owner_down, unexpected}}} -> - ok - after 1000 -> - true = erlang:is_process_alive(Pid), - error(timeout) - end. - -detect_owner_down_ws(_) -> - Name = name(), - {ok, _} = cowboy:start_clear(Name, [], #{env => #{ - dispatch => cowboy_router:compile([{'_', [{"/", ws_echo_h, []}]}]) - }}), - Port = ranch:get_port(Name), - Self = self(), - spawn(fun() -> - {ok, ConnPid} = gun:open("localhost", Port), - Self ! {conn, ConnPid}, - gun:await_up(ConnPid), - gun:ws_upgrade(ConnPid, "/", []), - receive - {gun_upgrade, ConnPid, _, [<<"websocket">>], _} -> - ok - after 1000 -> - error(timeout) - end - end), - Pid = receive - {conn, C} -> - C - after 1000 -> - error(timeout) - end, - Ref = monitor(process, Pid), - receive - {'DOWN', Ref, process, Pid, normal} -> - ok - after 1000 -> - true = erlang:is_process_alive(Pid), - error(timeout) - end, - cowboy:stop_listener(Name). - ignore_empty_data_http(_) -> doc("When gun:data/4 is called with nofin and empty data, it must be ignored."), {ok, OriginPid, OriginPort} = init_origin(tcp, http), diff --git a/test/handlers/delayed_hello_h.erl b/test/handlers/delayed_hello_h.erl new file mode 100644 index 0000000..68ef1ad --- /dev/null +++ b/test/handlers/delayed_hello_h.erl @@ -0,0 +1,11 @@ +%% Feel free to use, reuse and abuse the code in this file. + +-module(delayed_hello_h). + +-export([init/2]). + +init(Req, Timeout) -> + timer:sleep(Timeout), + {ok, cowboy_req:reply(200, #{ + <<"content-type">> => <<"text/plain">> + }, <<"Hello world!">>, Req), Timeout}. diff --git a/test/handlers/delayed_push_h.erl b/test/handlers/delayed_push_h.erl new file mode 100644 index 0000000..dbb8e56 --- /dev/null +++ b/test/handlers/delayed_push_h.erl @@ -0,0 +1,13 @@ +%% Feel free to use, reuse and abuse the code in this file. + +-module(delayed_push_h). + +-export([init/2]). + +init(Req, Timeout) -> + timer:sleep(Timeout), + cowboy_req:push("/", #{<<"accept">> => <<"text/plain">>}, Req), + cowboy_req:push("/empty", #{<<"accept">> => <<"text/plain">>}, Req), + {ok, cowboy_req:reply(200, #{ + <<"content-type">> => <<"text/plain">> + }, <<"Hello world!">>, Req), Timeout}. diff --git a/test/handlers/ws_frozen_h.erl b/test/handlers/ws_frozen_h.erl new file mode 100644 index 0000000..bac77c2 --- /dev/null +++ b/test/handlers/ws_frozen_h.erl @@ -0,0 +1,23 @@ +%% Feel free to use, reuse and abuse the code in this file. + +-module(ws_frozen_h). + +-export([init/2]). +-export([websocket_init/1]). +-export([websocket_handle/2]). +-export([websocket_info/2]). + +init(Req, State) -> + {cowboy_websocket, Req, State, #{ + compress => true + }}. + +websocket_init(Timeout) -> + timer:sleep(Timeout), + {ok, undefined}. + +websocket_handle(_Frame, State) -> + {[], State}. + +websocket_info(_Info, State) -> + {[], State}. diff --git a/test/handlers/ws_timeout_close_h.erl b/test/handlers/ws_timeout_close_h.erl new file mode 100644 index 0000000..6fef168 --- /dev/null +++ b/test/handlers/ws_timeout_close_h.erl @@ -0,0 +1,25 @@ +%% Feel free to use, reuse and abuse the code in this file. + +-module(ws_timeout_close_h). + +-export([init/2]). +-export([websocket_init/1]). +-export([websocket_handle/2]). +-export([websocket_info/2]). + +init(Req, State) -> + {cowboy_websocket, Req, State, #{ + compress => true + }}. + +websocket_init(Timeout) -> + _ = erlang:send_after(Timeout, self(), timeout_close), + {[], undefined}. + +websocket_handle(_Frame, State) -> + {[], State}. + +websocket_info(timeout_close, State) -> + {[{close, 3333, <<>>}], State}; +websocket_info(_Info, State) -> + {[], State}. diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl index f494c9f..507b75a 100644 --- a/test/rfc7540_SUITE.erl +++ b/test/rfc7540_SUITE.erl @@ -81,11 +81,11 @@ lingering_data_counts_toward_connection_window(_) -> {ok, _} = gen_tcp:recv(Socket, SkipLen, 1000), %% Skip the data. {ok, <<_:24, 0:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000), + %% Step 3. %% Send a HEADERS frame. {HeadersBlock, _} = cow_hpack:encode([ {<<":status">>, <<"200">>} ]), - %% Step 3. ok = Transport:send(Socket, [ cow_http2:headers(1, nofin, HeadersBlock) ]), diff --git a/test/shutdown_SUITE.erl b/test/shutdown_SUITE.erl new file mode 100644 index 0000000..e52a3ab --- /dev/null +++ b/test/shutdown_SUITE.erl @@ -0,0 +1,609 @@ +%% Copyright (c) 2019, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(shutdown_SUITE). +-compile(export_all). +-compile(nowarn_export_all). + +-import(ct_helper, [doc/1]). +-import(ct_helper, [config/2]). +-import(gun_test, [init_origin/3]). + +all() -> + [{group, shutdown}]. + +groups() -> + [{shutdown, [parallel], ct_helper:all(?MODULE)}]. + +init_per_suite(Config) -> + ProtoOpts = #{env => #{ + dispatch => cowboy_router:compile([{'_', [ + {"/", hello_h, []}, + {"/delayed", delayed_hello_h, 500}, + {"/delayed_push", delayed_push_h, 500}, + {"/empty", empty_h, []}, + {"/ws", ws_echo_h, []}, + {"/ws_frozen", ws_frozen_h, 500}, + %% This timeout determines how long the test suite will run. + {"/ws_frozen_long", ws_frozen_h, 1500}, + {"/ws_timeout_close", ws_timeout_close_h, 500} + ]}]) + }}, + {ok, _} = cowboy:start_clear(?MODULE, [], ProtoOpts), + OriginPort = ranch:get_port(?MODULE), + [{origin_port, OriginPort}|Config]. + +end_per_suite(_) -> + ok = cowboy:stop_listener(?MODULE). + +%% Tests. +%% +%% This test suite checks that the various ways to shut down +%% the connection are all working as expected for the different +%% protocols and scenarios. + +not_connected_gun_shutdown(_) -> + doc("Confirm that the Gun process shuts down gracefully " + "when calling gun:shutdown/1 while it isn't connected."), + {ok, ConnPid} = gun:open("localhost", 12345), + ConnRef = monitor(process, ConnPid), + gun:shutdown(ConnPid), + gun_is_down(ConnPid, ConnRef, shutdown). + +not_connected_owner_down(_) -> + doc("Confirm that the Gun process shuts down when the owner exits normally " + "while it isn't connected."), + do_not_connected_owner_down(normal, normal). + +not_connected_owner_down_error(_) -> + doc("Confirm that the Gun process shuts down when the owner exits with an error " + "while it isn't connected."), + do_not_connected_owner_down(unexpected, {shutdown, {owner_down, unexpected}}). + +do_not_connected_owner_down(ExitReason, DownReason) -> + Self = self(), + spawn(fun() -> + {ok, ConnPid} = gun:open("localhost", 12345), + Self ! {conn, ConnPid}, + timer:sleep(500), + exit(ExitReason) + end), + ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end, + ConnRef = monitor(process, ConnPid), + gun_is_down(ConnPid, ConnRef, DownReason). + +http1_gun_shutdown_no_streams(Config) -> + doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully " + "when calling gun:shutdown/1 with no active streams."), + do_http_gun_shutdown_no_streams(Config, http). + +do_http_gun_shutdown_no_streams(Config, Protocol) -> + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + gun:shutdown(ConnPid), + gun_is_down(ConnPid, ConnRef, shutdown). + +http1_gun_shutdown_one_stream(Config) -> + doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully " + "when calling gun:shutdown/1 with one active stream."), + do_http_gun_shutdown_one_stream(Config, http). + +do_http_gun_shutdown_one_stream(Config, Protocol) -> + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef = gun:get(ConnPid, "/delayed"), + gun:shutdown(ConnPid), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + {ok, _} = gun:await_body(ConnPid, StreamRef), + gun_is_down(ConnPid, ConnRef, shutdown). + +http1_gun_shutdown_pipelined_streams(Config) -> + doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully " + "when calling gun:shutdown/1 with one active stream and additional pipelined streams."), + Protocol = http, + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef1 = gun:get(ConnPid, "/delayed"), + StreamRef2 = gun:get(ConnPid, "/delayed"), + StreamRef3 = gun:get(ConnPid, "/delayed"), + gun:shutdown(ConnPid), + %% Pipelined streams are canceled immediately. + {error, {stream_error, {closing, shutdown}}} = gun:await(ConnPid, StreamRef2), + {error, {stream_error, {closing, shutdown}}} = gun:await(ConnPid, StreamRef3), + %% The active stream is still processed. + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1), + {ok, _} = gun:await_body(ConnPid, StreamRef1), + gun_is_down(ConnPid, ConnRef, shutdown). + +http1_gun_shutdown_timeout(Config) -> + doc("HTTP/1.1: Confirm that the Gun process shuts down when the closing_timeout " + "triggers after calling gun:shutdown/1 with one active stream."), + do_http_gun_shutdown_timeout(Config, http, http_opts). + +do_http_gun_shutdown_timeout(Config, Protocol, ProtoOpts) -> + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + ProtoOpts => #{closing_timeout => 100}, + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef = gun:get(ConnPid, "/delayed"), + gun:shutdown(ConnPid), + %% The closing timeout occurs before the server gets to send the response. + %% We get a 'closed' error instead of 'closing' as a result. + {error, {stream_error, {closed, shutdown}}} = gun:await(ConnPid, StreamRef), + gun_is_down(ConnPid, ConnRef, shutdown). + +http1_owner_down(Config) -> + doc("HTTP/1.1: Confirm that the Gun process shuts down when the owner exits normally."), + do_http_owner_down(Config, http, normal, normal). + +http1_owner_down_error(Config) -> + doc("HTTP/1.1: Confirm that the Gun process shuts down when the owner exits with an error."), + do_http_owner_down(Config, http, unexpected, {shutdown, {owner_down, unexpected}}). + +do_http_owner_down(Config, Protocol, ExitReason, DownReason) -> + Self = self(), + spawn(fun() -> + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + protocols => [Protocol] + }), + Self ! {conn, ConnPid}, + {ok, Protocol} = gun:await_up(ConnPid), + timer:sleep(500), + exit(ExitReason) + end), + ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end, + ConnRef = monitor(process, ConnPid), + gun_is_down(ConnPid, ConnRef, DownReason). + +http1_request_connection_close(Config) -> + doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully " + "when sending a request with the connection: close header and " + "retry is disabled."), + Protocol = http, + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + protocols => [Protocol], + retry => 0 + }), + {ok, Protocol} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef = gun:get(ConnPid, "/", #{ + <<"connection">> => <<"close">> + }), + %% We get the response followed by Gun shutting down. + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + {ok, _} = gun:await_body(ConnPid, StreamRef), + gun_is_down(ConnPid, ConnRef, normal). + +http1_request_connection_close_pipeline(Config) -> + doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully " + "when sending a request with the connection: close header and " + "retry is disabled. Pipelined requests get canceled."), + Protocol = http, + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + protocols => [Protocol], + retry => 0 + }), + {ok, Protocol} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef1 = gun:get(ConnPid, "/", #{ + <<"connection">> => <<"close">> + }), + StreamRef2 = gun:get(ConnPid, "/"), + StreamRef3 = gun:get(ConnPid, "/"), + %% We get the response, pipelined streams get canceled, followed by Gun shutting down. + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1), + {ok, _} = gun:await_body(ConnPid, StreamRef1), + {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef2), + {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef3), + gun_is_down(ConnPid, ConnRef, normal). + +http1_response_connection_close(_) -> + doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully " + "when receiving a response with the connection: close header and " + "retry is disabled."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{ + env => #{dispatch => cowboy_router:compile([{'_', [{"/", hello_h, []}]}])}, + max_keepalive => 1 + }), + OriginPort = ranch:get_port(?FUNCTION_NAME), + try + Protocol = http, + {ok, ConnPid} = gun:open("localhost", OriginPort, #{ + protocols => [Protocol], + retry => 0 + }), + {ok, Protocol} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef = gun:get(ConnPid, "/"), + %% We get the response followed by Gun shutting down. + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + {ok, _} = gun:await_body(ConnPid, StreamRef), + gun_is_down(ConnPid, ConnRef, normal) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +http1_response_connection_close_pipeline(_) -> + doc("HTTP/1.1: Confirm that the Gun process shuts down gracefully " + "when receiving a response with the connection: close header and " + "retry is disabled. Pipelined requests get canceled."), + {ok, _} = cowboy:start_clear(?FUNCTION_NAME, [], #{ + env => #{dispatch => cowboy_router:compile([{'_', [{"/", hello_h, []}]}])}, + max_keepalive => 1 + }), + OriginPort = ranch:get_port(?FUNCTION_NAME), + try + Protocol = http, + {ok, ConnPid} = gun:open("localhost", OriginPort, #{ + protocols => [Protocol], + retry => 0 + }), + {ok, Protocol} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef1 = gun:get(ConnPid, "/"), + StreamRef2 = gun:get(ConnPid, "/"), + StreamRef3 = gun:get(ConnPid, "/"), + %% We get the response, pipelined streams get canceled, followed by Gun shutting down. + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1), + {ok, _} = gun:await_body(ConnPid, StreamRef1), + {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef2), + {error, {stream_error, {closed, normal}}} = gun:await(ConnPid, StreamRef3), + gun_is_down(ConnPid, ConnRef, normal) + after + cowboy:stop_listener(?FUNCTION_NAME) + end. + +http10_connection_close(Config) -> + doc("HTTP/1.0: Confirm that the Gun process shuts down gracefully " + "when sending a request without a connection header and " + "retry is disabled."), + Protocol = http, + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + http_opts => #{version => 'HTTP/1.0'}, + protocols => [Protocol], + retry => 0 + }), + {ok, Protocol} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef = gun:get(ConnPid, "/"), + %% We get the response followed by Gun shutting down. + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + {ok, _} = gun:await_body(ConnPid, StreamRef), + gun_is_down(ConnPid, ConnRef, normal). + +http2_gun_shutdown_no_streams(Config) -> + doc("HTTP/2: Confirm that the Gun process shuts down gracefully " + "when calling gun:shutdown/1 with no active streams."), + do_http_gun_shutdown_no_streams(Config, http2). + +http2_gun_shutdown_one_stream(Config) -> + doc("HTTP/2: Confirm that the Gun process shuts down gracefully " + "when calling gun:shutdown/1 with one active stream."), + do_http_gun_shutdown_one_stream(Config, http2). + +http2_gun_shutdown_many_streams(Config) -> + doc("HTTP/2: Confirm that the Gun process shuts down gracefully " + "when calling gun:shutdown/1 with many active streams."), + Protocol = http2, + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef1 = gun:get(ConnPid, "/delayed"), + StreamRef2 = gun:get(ConnPid, "/delayed"), + StreamRef3 = gun:get(ConnPid, "/delayed"), + gun:shutdown(ConnPid), + %% All streams are processed. + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1), + {ok, _} = gun:await_body(ConnPid, StreamRef1), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef2), + {ok, _} = gun:await_body(ConnPid, StreamRef2), + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef3), + {ok, _} = gun:await_body(ConnPid, StreamRef3), + gun_is_down(ConnPid, ConnRef, shutdown). + +http2_gun_shutdown_timeout(Config) -> + doc("HTTP/2: Confirm that the Gun process shuts down when the closing_timeout " + "triggers after calling gun:shutdown/1 with one active stream."), + do_http_gun_shutdown_timeout(Config, http2, http2_opts). + +http2_gun_shutdown_ignore_push_promise(Config) -> + doc("HTTP/2: Confirm that the Gun process shuts down gracefully " + "when calling gun:shutdown/1 with one active stream. The " + "resource pushed by the server after we sent the GOAWAY frame " + "must be ignored."), + Protocol = http2, + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + protocols => [Protocol] + }), + {ok, Protocol} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef = gun:get(ConnPid, "/delayed_push"), + gun:shutdown(ConnPid), + %% We do not receive the push streams. Only the response. + {response, nofin, 200, _} = gun:await(ConnPid, StreamRef), + {ok, _} = gun:await_body(ConnPid, StreamRef), + gun_is_down(ConnPid, ConnRef, shutdown). + +http2_owner_down(Config) -> + doc("HTTP/2: Confirm that the Gun process shuts down when the owner exits normally."), + do_http_owner_down(Config, http2, normal, normal). + +http2_owner_down_error(Config) -> + doc("HTTP/2: Confirm that the Gun process shuts down when the owner exits with an error."), + do_http_owner_down(Config, http2, unexpected, {shutdown, {owner_down, unexpected}}). + +http2_server_goaway_no_streams(_) -> + doc("HTTP/2: Confirm that the Gun process shuts down gracefully " + "when receiving a GOAWAY frame with no active streams and " + "retry is disabled."), + {ok, _, Port} = init_origin(tcp, http2, fun(_, Socket, Transport) -> + Transport:send(Socket, cow_http2:goaway(0, no_error, <<>>)), + timer:sleep(500) + end), + Protocol = http2, + {ok, ConnPid} = gun:open("localhost", Port, #{ + protocols => [Protocol], + retry => 0 + }), + {ok, Protocol} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + gun_is_down(ConnPid, ConnRef, normal). + +http2_server_goaway_one_stream(_) -> + doc("HTTP/2: Confirm that the Gun process shuts down gracefully " + "when receiving a GOAWAY frame with one active stream and " + "retry is disabled."), + {ok, _, OriginPort} = init_origin(tcp, http2, fun(_, Socket, Transport) -> + %% Receive a HEADERS frame. + {ok, <<SkipLen:24, 1:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000), + %% Skip the header. + {ok, _} = gen_tcp:recv(Socket, SkipLen, 1000), + %% Skip the data. + {ok, <<_:24, 0:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000), + %% Send a GOAWAY frame. + Transport:send(Socket, cow_http2:goaway(1, no_error, <<>>)), + %% Wait before sending the response back and closing the connection. + timer:sleep(500), + %% Send a HEADERS frame. + {HeadersBlock, _} = cow_hpack:encode([ + {<<":status">>, <<"200">>} + ]), + ok = Transport:send(Socket, [ + cow_http2:headers(1, fin, HeadersBlock) + ]), + timer:sleep(500) + end), + Protocol = http2, + {ok, ConnPid} = gun:open("localhost", OriginPort, #{ + protocols => [Protocol], + retry => 0 + }), + {ok, Protocol} = gun:await_up(ConnPid), + timer:sleep(100), %% Give enough time for the handshake to fully complete. + StreamRef = gun:get(ConnPid, "/"), + ConnRef = monitor(process, ConnPid), + {response, fin, 200, _} = gun:await(ConnPid, StreamRef), + gun_is_down(ConnPid, ConnRef, normal). + +http2_server_goaway_many_streams(_) -> + doc("HTTP/2: Confirm that the Gun process shuts down gracefully " + "when receiving a GOAWAY frame with many active streams and " + "retry is disabled."), + {ok, _, OriginPort} = init_origin(tcp, http2, fun(_, Socket, Transport) -> + %% Stream 1. + %% Receive a HEADERS frame. + {ok, <<SkipLen1:24, 1:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000), + %% Skip the header. + {ok, _} = gen_tcp:recv(Socket, SkipLen1, 1000), + %% Skip the data. + {ok, <<_:24, 0:8, _:8, 1:32>>} = Transport:recv(Socket, 9, 1000), + %% Stream 2. + %% Receive a HEADERS frame. + {ok, <<SkipLen2:24, 1:8, _:8, 3:32>>} = Transport:recv(Socket, 9, 1000), + %% Skip the header. + {ok, _} = gen_tcp:recv(Socket, SkipLen2, 1000), + %% Skip the data. + {ok, <<_:24, 0:8, _:8, 3:32>>} = Transport:recv(Socket, 9, 1000), + %% Stream 3. + %% Receive a HEADERS frame. + {ok, <<SkipLen3:24, 1:8, _:8, 5:32>>} = Transport:recv(Socket, 9, 1000), + %% Skip the header. + {ok, _} = gen_tcp:recv(Socket, SkipLen3, 1000), + %% Skip the data. + {ok, <<_:24, 0:8, _:8, 5:32>>} = Transport:recv(Socket, 9, 1000), + %% Send a GOAWAY frame. + Transport:send(Socket, cow_http2:goaway(5, no_error, <<>>)), + %% Wait before sending the responses back and closing the connection. + timer:sleep(500), + %% Send a HEADERS frame. + {HeadersBlock1, State0} = cow_hpack:encode([ + {<<":status">>, <<"200">>} + ]), + ok = Transport:send(Socket, [ + cow_http2:headers(1, fin, HeadersBlock1) + ]), + {HeadersBlock2, State} = cow_hpack:encode([ + {<<":status">>, <<"200">>} + ], State0), + ok = Transport:send(Socket, [ + cow_http2:headers(3, fin, HeadersBlock2) + ]), + {HeadersBlock3, _} = cow_hpack:encode([ + {<<":status">>, <<"200">>} + ], State), + ok = Transport:send(Socket, [ + cow_http2:headers(5, fin, HeadersBlock3) + ]), + timer:sleep(500) + end), + Protocol = http2, + {ok, ConnPid} = gun:open("localhost", OriginPort, #{ + protocols => [Protocol], + retry => 0 + }), + {ok, Protocol} = gun:await_up(ConnPid), + timer:sleep(100), %% Give enough time for the handshake to fully complete. + StreamRef1 = gun:get(ConnPid, "/"), + StreamRef2 = gun:get(ConnPid, "/"), + StreamRef3 = gun:get(ConnPid, "/"), + ConnRef = monitor(process, ConnPid), + {response, fin, 200, _} = gun:await(ConnPid, StreamRef1), + {response, fin, 200, _} = gun:await(ConnPid, StreamRef2), + {response, fin, 200, _} = gun:await(ConnPid, StreamRef3), + gun_is_down(ConnPid, ConnRef, normal). + +ws_gun_shutdown(Config) -> + doc("Websocket: Confirm that the Gun process shuts down gracefully " + "when calling gun:shutdown/1."), + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)), + {ok, http} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/ws", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + gun:shutdown(ConnPid), + gun_is_down(ConnPid, ConnRef, shutdown). + +ws_gun_shutdown_timeout(Config) -> + doc("Websocket: Confirm that the Gun process shuts down when " + "the closing_timeout triggers after calling gun:shutdown/1."), + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + ws_opts => #{closing_timeout => 100} + }), + {ok, http} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/ws_frozen_long", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + gun:shutdown(ConnPid), + gun_is_down(ConnPid, ConnRef, shutdown). + +ws_owner_down(Config) -> + doc("Websocket: Confirm that the Gun process shuts down when the owner exits normally."), + do_ws_owner_down(Config, normal, normal). + +ws_owner_down_error(Config) -> + doc("Websocket: Confirm that the Gun process shuts down when the owner exits with an error."), + do_ws_owner_down(Config, unexpected, {shutdown, {owner_down, unexpected}}). + +do_ws_owner_down(Config, ExitReason, DownReason) -> + Self = self(), + spawn(fun() -> + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)), + Self ! {conn, ConnPid}, + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/ws", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + timer:sleep(500), + exit(ExitReason) + end), + ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end, + ConnRef = monitor(process, ConnPid), + gun_is_down(ConnPid, ConnRef, DownReason). + +ws_gun_send_close_frame(Config) -> + doc("Websocket: Confirm that the Gun process shuts down gracefully " + "when sending a close frame, with retry disabled."), + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + retry => 0 + }), + {ok, http} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/ws", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + %% We send a close frame. We expect the same frame back + %% before the connection is closed. + Frame = {close, 3333, <<>>}, + gun:ws_send(ConnPid, Frame), + {ws, Frame} = gun:await(ConnPid, StreamRef), + gun_is_down(ConnPid, ConnRef, normal). + +ws_gun_receive_close_frame(Config) -> + doc("Websocket: Confirm that the Gun process shuts down gracefully " + "when receiving a close frame, with retry disabled."), + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config), #{ + retry => 0 + }), + {ok, http} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/ws_timeout_close", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + %% We expect a close frame before the connection is closed. + {ws, {close, 3333, <<>>}} = gun:await(ConnPid, StreamRef), + gun_is_down(ConnPid, ConnRef, normal). + +closing_gun_shutdown(Config) -> + doc("Confirm that the Gun process shuts down gracefully " + "when calling gun:shutdown/1 while Gun is closing a connection."), + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)), + {ok, http} = gun:await_up(ConnPid), + ConnRef = monitor(process, ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/ws_frozen", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + %% We send a close frame then immediately call gun:shutdown/1. + %% We expect Gun to go down without retrying to reconnect. + Frame = {close, 3333, <<>>}, + gun:ws_send(ConnPid, Frame), + gun:shutdown(ConnPid), + {ws, Frame} = gun:await(ConnPid, StreamRef), + gun_is_down(ConnPid, ConnRef, shutdown). + +closing_owner_down(Config) -> + doc("Confirm that the Gun process shuts down gracefully " + "when the owner exits normally while Gun is closing a connection."), + do_closing_owner_down(Config, normal, normal). + +closing_owner_down_error(Config) -> + doc("Confirm that the Gun process shuts down gracefully " + "when the owner exits with an error while Gun is closing a connection."), + do_closing_owner_down(Config, unexpected, {shutdown, {owner_down, unexpected}}). + +do_closing_owner_down(Config, ExitReason, DownReason) -> + Self = self(), + spawn(fun() -> + {ok, ConnPid} = gun:open("localhost", config(origin_port, Config)), + Self ! {conn, ConnPid}, + {ok, http} = gun:await_up(ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/ws_frozen", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + gun:ws_send(ConnPid, {close, 3333, <<>>}), + timer:sleep(100), + exit(ExitReason) + end), + ConnPid = receive {conn, C} -> C after 1000 -> error(timeout) end, + ConnRef = monitor(process, ConnPid), + gun_is_down(ConnPid, ConnRef, DownReason). + +%% Internal. + +gun_is_down(ConnPid, ConnRef, Expected) -> + receive + {'DOWN', ConnRef, process, ConnPid, Reason} -> + Expected = Reason, + ok + after 1000 -> + true = erlang:is_process_alive(ConnPid), + error(timeout) + end. diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index 5cc50ec..1abf046 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -68,3 +68,30 @@ reject_upgrade(Config) -> after 1000 -> error(timeout) end. + +send_many(Config) -> + doc("Ensure we can send a list of frames in one gun:ws_send call."), + {ok, ConnPid} = gun:open("localhost", config(port, Config)), + {ok, _} = gun:await_up(ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + Frame1 = {text, <<"Hello!">>}, + Frame2 = {binary, <<"World!">>}, + gun:ws_send(ConnPid, [Frame1, Frame2]), + {ws, Frame1} = gun:await(ConnPid, StreamRef), + {ws, Frame2} = gun:await(ConnPid, StreamRef), + gun:close(ConnPid). + +send_many_close(Config) -> + doc("Ensure we can send a list of frames in one gun:ws_send call, including a close frame."), + {ok, ConnPid} = gun:open("localhost", config(port, Config)), + {ok, _} = gun:await_up(ConnPid), + StreamRef = gun:ws_upgrade(ConnPid, "/", []), + {upgrade, [<<"websocket">>], _} = gun:await(ConnPid, StreamRef), + Frame1 = {text, <<"Hello!">>}, + Frame2 = {binary, <<"World!">>}, + gun:ws_send(ConnPid, [Frame1, Frame2, close]), + {ws, Frame1} = gun:await(ConnPid, StreamRef), + {ws, Frame2} = gun:await(ConnPid, StreamRef), + {ws, close} = gun:await(ConnPid, StreamRef), + gun:close(ConnPid). |