diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/gun.erl | 300 | ||||
-rw-r--r-- | src/gun_event.erl | 5 | ||||
-rw-r--r-- | src/gun_http.erl | 56 | ||||
-rw-r--r-- | src/gun_http2.erl | 144 | ||||
-rw-r--r-- | src/gun_ws.erl | 94 |
5 files changed, 400 insertions, 199 deletions
diff --git a/src/gun.erl b/src/gun.erl index 7b06aaf..bb659e7 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -98,6 +98,7 @@ -export([connecting/3]). -export([tls_handshake/3]). -export([connected/3]). +-export([closing/3]). -export([terminate/3]). -type req_headers() :: [{binary() | string() | atom(), iodata()}] @@ -165,19 +166,24 @@ -export_type([req_opts/0]). -type http_opts() :: #{ - keepalive => timeout(), + closing_timeout => timeout(), + flow => pos_integer(), + keepalive => timeout(), transform_header_name => fun((binary()) -> binary()), - version => 'HTTP/1.1' | 'HTTP/1.0' + version => 'HTTP/1.1' | 'HTTP/1.0' }. -export_type([http_opts/0]). -type http2_opts() :: #{ + closing_timeout => timeout(), + flow => pos_integer(), keepalive => timeout() }. -export_type([http2_opts/0]). %% @todo keepalive -type ws_opts() :: #{ + closing_timeout => timeout(), compress => boolean(), flow => pos_integer(), protocols => [{binary(), module()}] @@ -186,7 +192,7 @@ -record(state, { owner :: pid(), - owner_ref :: reference(), + status :: {up, reference()} | {down, any()} | shutdown, host :: inet:hostname() | inet:ip_address(), port :: inet:port_number(), origin_scheme :: binary(), @@ -781,7 +787,7 @@ init({Owner, Host, Port, Opts}) -> origin_port => Port, opts => Opts }, EvHandlerState0), - State = #state{owner=Owner, owner_ref=OwnerRef, + State = #state{owner=Owner, status={up, OwnerRef}, host=Host, port=Port, origin_scheme=OriginScheme, origin_host=Host, origin_port=Port, opts=Opts, transport=Transport, messages=Transport:messages(), @@ -875,11 +881,7 @@ connecting(_, {retries, Retries, LookupInfo}, State=#state{opts=Opts, }, EvHandlerState1), {next_state, not_connected, State#state{event_handler_state=EvHandlerState}, {next_event, internal, {retries, Retries, Reason}}} - end; -connecting({call, From}, {stream_info, _}, _) -> - {keep_state_and_data, {reply, From, {error, not_connected}}}; -connecting(Type, Event, State) -> - handle_common(Type, Event, ?FUNCTION_NAME, State). + end. tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> @@ -910,11 +912,7 @@ tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts, }, EvHandlerState1), {next_state, not_connected, State#state{event_handler_state=EvHandlerState}, {next_event, internal, {retries, Retries, Reason}}} - end; -tls_handshake({call, From}, {stream_info, _}, _) -> - {keep_state_and_data, {reply, From, {error, not_connected}}}; -tls_handshake(Type, Event, State) -> - handle_common(Type, Event, ?FUNCTION_NAME, State). + end. ensure_alpn(Protocols0, TransOpts) -> Protocols = [case P of @@ -937,28 +935,6 @@ connected(internal, {connected, Socket, Protocol}, Owner ! {gun_up, self(), Protocol:name()}, {keep_state, keepalive_timeout(active(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}))}; -%% Socket events. -connected(info, {OK, Socket, Data}, State0=#state{socket=Socket, messages={OK, _, _}, - protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0), - case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of - {keep_state, State} -> - {keep_state, active(State)}; - Res -> - Res - end; -connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) -> - disconnect(State, closed); -connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) -> - disconnect(State, {error, Reason}); -%% Timeouts. -%% @todo HTTP/2 requires more timeouts than just the keepalive timeout. -%% We should have a timeout function in protocols that deal with -%% received timeouts. Currently the timeout messages are ignored. -connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> - ProtoState2 = Protocol:keepalive(ProtoState), - {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})}; %% Public HTTP interface. connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, @@ -976,14 +952,6 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, Initi StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; -%% @todo Do we want to reject ReplyTo if it's not the process -%% who initiated the connection? For both data and cancel. -connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, - State=#state{protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, EvHandlerState} = Protocol:data(ProtoState, - StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow}, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> %% The protocol option has been deprecated in favor of the protocols option. @@ -1006,41 +974,6 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow end, ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow), {keep_state, State#state{protocol_state=ProtoState2}}; -%% When using gun_tls_proxy we need a separate message to know whether -%% the handshake succeeded and whether we need to switch to a different protocol. -connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent}, - State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ - socket => Socket, - protocol => NewProtocol:name() - }, EvHandlerState0), - State = State0#state{event_handler_state=EvHandlerState}, - case NewProtocol of - CurrentProtocol -> {keep_state, State}; - _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State) - end; -connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}, - State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ - error => Reason - }, EvHandlerState0), - commands([Error], State#state{event_handler_state=EvHandlerState}); -connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, State0=#state{ - protocol=Protocol, protocol_state=ProtoState}) -> - Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow), - case commands(Commands, State0) of - {keep_state, State} -> - {keep_state, active(State)}; - Res -> - Res - end; -connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{ - protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, - StreamRef, ReplyTo, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; %% Public Websocket interface. %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. %% An interface would also make sure that HTTP/1.0 can't upgrade. @@ -1067,40 +1000,148 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "Websocket is only supported over HTTP/1.1."}}, keep_state_and_data; -connected(cast, {ws_send, Owner, Frame}, State=#state{ +connected(cast, {ws_send, Owner, Frames}, State=#state{ owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {Commands, EvHandlerState} = Protocol:send(Frame, ProtoState, EvHandler, EvHandlerState0), + {Commands, EvHandlerState} = Protocol:send(Frames, ProtoState, EvHandler, EvHandlerState0), commands(Commands, State#state{event_handler_state=EvHandlerState}); connected(cast, {ws_send, ReplyTo, _}, _) -> ReplyTo ! {gun_error, self(), {badstate, "Connection needs to be upgraded to Websocket " "before the gun:ws_send/1 function can be used."}}, keep_state_and_data; -connected({call, From}, {stream_info, StreamRef}, +connected(Type, Event, State) -> + handle_common_connected(Type, Event, ?FUNCTION_NAME, State). + +%% Switch to the graceful connection close state. +closing(State=#state{protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) -> + {Commands, EvHandlerState} = Protocol:closing(Reason, ProtoState, EvHandler, EvHandlerState0), + commands(Commands, State#state{event_handler_state=EvHandlerState}). + +%% @todo Should explicitly reject ws_send in this state? +closing(state_timeout, closing_timeout, State=#state{status=Status}) -> + Reason = case Status of + shutdown -> shutdown; + {down, _} -> owner_down; + _ -> normal + end, + disconnect(State, Reason); +closing(Type, Event, State) -> + handle_common_connected(Type, Event, ?FUNCTION_NAME, State). + +%% Common events when we have a connection. +%% +%% Socket events. +handle_common_connected(info, {OK, Socket, Data}, _, State0=#state{socket=Socket, messages={OK, _, _}, + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0), + case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of + {keep_state, State} -> + {keep_state, active(State)}; + {next_state, closing, State, Actions} -> + {next_state, closing, active(State), Actions}; + Res -> + Res + end; +handle_common_connected(info, {Closed, Socket}, _, State=#state{socket=Socket, messages={_, Closed, _}}) -> + disconnect(State, closed); +handle_common_connected(info, {Error, Socket, Reason}, _, State=#state{socket=Socket, messages={_, _, Error}}) -> + disconnect(State, {error, Reason}); +%% Timeouts. +%% @todo HTTP/2 requires more timeouts than just the keepalive timeout. +%% We should have a timeout function in protocols that deal with +%% received timeouts. Currently the timeout messages are ignored. +handle_common_connected(info, keepalive, _, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> + ProtoState2 = Protocol:keepalive(ProtoState), + {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})}; +%% When using gun_tls_proxy we need a separate message to know whether +%% the handshake succeeded and whether we need to switch to a different protocol. +handle_common_connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent}, _, + State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ + socket => Socket, + protocol => NewProtocol:name() + }, EvHandlerState0), + State = State0#state{event_handler_state=EvHandlerState}, + case NewProtocol of + CurrentProtocol -> {keep_state, State}; + _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State) + end; +handle_common_connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}, _, + State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ + error => Reason + }, EvHandlerState0), + commands([Error], State#state{event_handler_state=EvHandlerState}); +%% @todo Do we want to reject ReplyTo if it's not the process +%% who initiated the connection? For both data and cancel. +handle_common_connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, _, + State=#state{protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:data(ProtoState, + StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; +handle_common_connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, _, State0=#state{ + protocol=Protocol, protocol_state=ProtoState}) -> + Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow), + case commands(Commands, State0) of + {keep_state, State} -> + {keep_state, active(State)}; + Res -> + Res + end; +handle_common_connected(cast, {cancel, ReplyTo, StreamRef}, _, State=#state{ + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, + StreamRef, ReplyTo, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; +handle_common_connected({call, From}, {stream_info, StreamRef}, _, #state{protocol=Protocol, protocol_state=ProtoState}) -> {keep_state_and_data, {reply, From, Protocol:stream_info(ProtoState, StreamRef)}}; -connected(Type, Event, State) -> - handle_common(Type, Event, ?FUNCTION_NAME, State). +handle_common_connected(Type, Event, StateName, State) -> + handle_common(Type, Event, StateName, State). %% Common events. -handle_common(cast, {shutdown, Owner}, _, #state{owner=Owner}) -> - %% @todo Graceful shutdown. - stop; +handle_common(cast, {shutdown, Owner}, StateName, State=#state{ + owner=Owner, status=Status, socket=Socket, transport=Transport, protocol=Protocol}) -> + case {Socket, Protocol} of + {undefined, _} -> + {stop, shutdown}; + {_, undefined} -> + %% @todo This is missing the disconnect event. + Transport:close(Socket), + {stop, shutdown}; + _ when StateName =:= closing, element(1, Status) =:= up -> + {keep_state, status(State, shutdown)}; + _ when StateName =:= closing -> + keep_state_and_data; + _ -> + closing(status(State, shutdown), shutdown) + end; %% We stop when the owner is down. -handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, State=#state{ - owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport, - protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {_, EvHandlerState} = case Protocol of - undefined -> {ok, EvHandlerState0}; - _ -> Protocol:close(owner_down, ProtoState, EvHandler, EvHandlerState0) - end, - _ = case Socket of - undefined -> ok; - _ -> Transport:close(Socket) - end, - owner_down(Reason, State#state{event_handler_state=EvHandlerState}); +%% @todo We need to demonitor/flush when the status is no longer up. +handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, StateName, State=#state{ + owner=Owner, status={up, OwnerRef}, socket=Socket, transport=Transport, protocol=Protocol}) -> + case Socket of + undefined -> + owner_down(Reason, State); + _ -> + case Protocol of + undefined -> + %% @todo This is missing the disconnect event. + Transport:close(Socket), + owner_down(Reason, State); + %% We are already closing so no need to initiate closing again. + _ when StateName =:= closing -> + {keep_state, status(State, {down, Reason})}; + _ -> + closing(status(State, {down, Reason}), owner_down) + end + end; handle_common({call, From}, _, _, _) -> {keep_state_and_data, {reply, From, {error, bad_call}}}; %% @todo The ReplyTo patch disabled the notowner behavior. @@ -1123,6 +1164,9 @@ commands([], State) -> {keep_state, State}; commands([close|_], State) -> disconnect(State, normal); +commands([{closing, Timeout}|_], State) -> + {next_state, closing, keepalive_cancel(State), + {state_timeout, Timeout, closing_timeout}}; commands([Error={error, _}|_], State) -> disconnect(State, Error); commands([{active, Active}|Tail], State) when is_boolean(Active) -> @@ -1176,33 +1220,37 @@ commands([{switch_protocol, Protocol, _ProtoState0}|Tail], State=#state{ commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState, event_handler_state=EvHandlerState})). -disconnect(State=#state{owner=Owner, opts=Opts, +disconnect(State0=#state{owner=Owner, status=Status, opts=Opts, socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) -> - {_, EvHandlerState1} = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0), - %% @todo Need a special state for orderly shutdown of a connection. + EvHandlerState1 = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0), _ = Transport:close(Socket), - %% We closed the socket, discard any remaining socket events. - disconnect_flush(State), - %% @todo Stop keepalive timeout, flush message. - {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState), - Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams}, - %% Trigger the disconnect event. - DisconnectEvent = #{ - reason => Reason - }, - EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState1), - Retry = maps:get(retry, Opts, 5), - case Retry of - 0 -> - {stop, {shutdown, Reason}, State#state{event_handler_state=EvHandlerState}}; - _ -> - {next_state, not_connected, - keepalive_cancel(State#state{socket=undefined, - protocol=undefined, protocol_state=undefined, - event_handler_state=EvHandlerState}), - {next_event, internal, {retries, Retry - 1, Reason}}} + EvHandlerState = EvHandler:disconnect(#{reason => Reason}, EvHandlerState1), + State = State0#state{event_handler_state=EvHandlerState}, + case Status of + {down, DownReason} -> + owner_down(DownReason, State); + shutdown -> + {stop, shutdown, State}; + {up, _} -> + %% We closed the socket, discard any remaining socket events. + disconnect_flush(State), + %% @todo Stop keepalive timeout, flush message. + {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState), + Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams}, + Retry = maps:get(retry, Opts, 5), + case Retry of + 0 when Reason =:= normal -> + {stop, normal, State}; + 0 -> + {stop, {shutdown, Reason}, State}; + _ -> + {next_state, not_connected, + keepalive_cancel(State#state{socket=undefined, + protocol=undefined, protocol_state=undefined}), + {next_event, internal, {retries, Retry - 1, Reason}}} + end end. disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) -> @@ -1220,6 +1268,12 @@ active(State=#state{socket=Socket, transport=Transport}) -> Transport:setopts(Socket, [{active, once}]), State. +status(State=#state{status={up, OwnerRef}}, NewStatus) -> + demonitor(OwnerRef, [flush]), + State#state{status=NewStatus}; +status(State, NewStatus) -> + State#state{status=NewStatus}. + keepalive_timeout(State=#state{opts=Opts, protocol=Protocol}) -> {ProtoOptsKey, Default} = case Protocol of gun_http -> {http_opts, infinity}; diff --git a/src/gun_event.erl b/src/gun_event.erl index 1e158c4..a984796 100644 --- a/src/gun_event.erl +++ b/src/gun_event.erl @@ -113,7 +113,10 @@ -type push_promise_end_event() :: #{ stream_ref := reference(), reply_to := pid(), - promised_stream_ref := reference(), + %% No stream is created if we receive the push_promise while + %% in the process of gracefully shutting down the connection. + %% The promised stream is canceled immediately. + promised_stream_ref => reference(), method := binary(), uri := binary(), headers := [{binary(), iodata()}] diff --git a/src/gun_http.erl b/src/gun_http.erl index ec268ad..309772e 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -19,6 +19,7 @@ -export([init/4]). -export([handle/4]). -export([update_flow/4]). +-export([closing/4]). -export([close/4]). -export([keepalive/1]). -export([headers/11]). @@ -71,6 +72,10 @@ check_options(Opts) -> do_check_options([]) -> ok; +do_check_options([{closing_timeout, infinity}|Opts]) -> + do_check_options(Opts); +do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 -> + do_check_options(Opts); do_check_options([Opt={content_handlers, Handlers}|Opts]) -> case gun_content_handler:check_option(Handlers) of ok -> do_check_options(Opts); @@ -460,26 +465,47 @@ update_flow(State=#http_state{streams=Streams0}, _ReplyTo, StreamRef, Inc) -> end || Tuple = #stream{ref=Ref, flow=Flow} <- Streams0], {state, State#http_state{streams=Streams}}. -%% @todo Use Reason. -close(_, State=#http_state{in=body_close, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]}, - EvHandler, EvHandlerState0) -> +%% We can immediately close the connection when there's no streams. +closing(_, #http_state{streams=[]}, _, EvHandlerState) -> + {close, EvHandlerState}; +%% Otherwise we set connection: close (even if the header was not sent) +%% and close any pipelined streams, only keeping the active stream. +closing(Reason, State=#http_state{streams=[LastStream|Tail]}, _, EvHandlerState) -> + close_streams(Tail, {closing, Reason}), + {[ + {state, State#http_state{connection=close, streams=[LastStream]}}, + closing(State) + ], EvHandlerState}. + +closing(#http_state{opts=Opts}) -> + Timeout = maps:get(closing_timeout, Opts, 15000), + {closing, Timeout}. + +close(Reason, State=#http_state{in=body_close, + streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]}, + EvHandler, EvHandlerState) -> + %% We may have more than one stream in case we somehow close abruptly. + close_streams(Tail, close_reason(Reason)), _ = send_data(<<>>, State, fin), - EvHandlerState = EvHandler:response_end(#{ + EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo - }, EvHandlerState0), - {close_streams(Tail), EvHandlerState}; -close(_, #http_state{streams=Streams}, _, EvHandlerState) -> - {close_streams(Streams), EvHandlerState}. + }, EvHandlerState); +close(Reason, #http_state{streams=Streams}, _, EvHandlerState) -> + close_streams(Streams, close_reason(Reason)), + EvHandlerState. + +close_reason(closed) -> closed; +close_reason(Reason) -> {closed, Reason}. -close_streams([]) -> +%% @todo Do we want an event for this? +close_streams([], _) -> ok; -close_streams([#stream{is_alive=false}|Tail]) -> - close_streams(Tail); -close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> - ReplyTo ! {gun_error, self(), StreamRef, {closed, - "The connection was lost."}}, - close_streams(Tail). +close_streams([#stream{is_alive=false}|Tail], Reason) -> + close_streams(Tail, Reason); +close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) -> + ReplyTo ! {gun_error, self(), StreamRef, Reason}, + close_streams(Tail, Reason). %% We don't send a keep-alive when a CONNECT request was initiated. keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}) -> diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 3b3b79b..5942037 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -19,6 +19,7 @@ -export([init/4]). -export([handle/4]). -export([update_flow/4]). +-export([closing/4]). -export([close/4]). -export([keepalive/1]). -export([headers/11]). @@ -53,6 +54,10 @@ content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), + %% Current status of the connection. We use this to ensure we are + %% not sending the GOAWAY frame more than once. + status = connected :: connected | goaway | closing, + %% HTTP/2 state machine. http2_machine :: cow_http2_machine:http2_machine(), @@ -66,6 +71,10 @@ check_options(Opts) -> do_check_options([]) -> ok; +do_check_options([{closing_timeout, infinity}|Opts]) -> + do_check_options(Opts); +do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 -> + do_check_options(Opts); do_check_options([Opt={content_handlers, Handlers}|Opts]) -> case gun_content_handler:check_option(Handlers) of ok -> do_check_options(Opts); @@ -99,7 +108,8 @@ handle(Data, State=#http2_state{buffer=Buffer}, EvHandler, EvHandlerState) -> parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}, EvHandler, EvHandlerState). -parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandlerState0) -> +parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, streams=Streams}, + EvHandler, EvHandlerState0) -> MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine), case cow_http2:parse(Data, MaxFrameSize) of {ok, Frame, Rest} -> @@ -116,7 +126,14 @@ parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandle parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}), EvHandler, EvHandlerState0); Error = {connection_error, _, _} -> - {terminate(State0, Error), EvHandlerState0}; + {connection_error(State0, Error), EvHandlerState0}; + %% If we both received and sent a GOAWAY frame and there are no streams + %% currently running, we can close the connection immediately. + more when Status =/= connected, Streams =:= [] -> + {[{state, State0#http2_state{buffer=Data, status=closing}}, close], EvHandlerState0}; + %% Otherwise we enter the closing state. + more when Status =:= goaway -> + {[{state, State0#http2_state{buffer=Data, status=closing}}, closing(State0)], EvHandlerState0}; more -> {{state, State0#http2_state{buffer=Data}}, EvHandlerState0} end. @@ -169,9 +186,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl push_promise_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, PromisedStreamID, Headers, PseudoHeaders, EvHandler, EvHandlerState); - {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} -> - {terminate(State#http2_state{http2_machine=HTTP2Machine}, - {stop, Frame, 'Server is going away.'}), + {ok, GoAway={goaway, _, _, _}, HTTP2Machine} -> + {goaway(State#http2_state{http2_machine=HTTP2Machine}, GoAway), EvHandlerState}; {send, SendData, HTTP2Machine} -> send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData, @@ -181,7 +197,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl StreamID, {stream_error, Reason, Human}), EvHandlerState}; {error, Error={connection_error, _, _}, HTTP2Machine} -> - {terminate(State#http2_state{http2_machine=HTTP2Machine}, Error), + {connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error), EvHandlerState} end. @@ -318,7 +334,8 @@ rst_stream_frame(State=#http2_state{streams=Streams0}, end. %% Pushed streams receive the same initial flow value as the parent stream. -push_promise_frame(State=#http2_state{streams=Streams}, +push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, + status=Status, http2_machine=HTTP2Machine0, streams=Streams}, StreamID, PromisedStreamID, Headers, #{ method := Method, scheme := Scheme, authority := Authority, path := Path}, @@ -326,25 +343,39 @@ push_promise_frame(State=#http2_state{streams=Streams}, #stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID), PromisedStreamRef = make_ref(), URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]), - ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, - EvHandlerState = EvHandler:push_promise_end(#{ + PushPromiseEvent0 = #{ stream_ref => StreamRef, reply_to => ReplyTo, - promised_stream_ref => PromisedStreamRef, method => Method, uri => URI, headers => Headers - }, EvHandlerState0), - NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, - reply_to=ReplyTo, flow=InitialFlow}, - {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}. + }, + PushPromiseEvent = case Status of + connected -> + ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, + PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef}; + _ -> + PushPromiseEvent0 + end, + EvHandlerState = EvHandler:push_promise_end(PushPromiseEvent, EvHandlerState0), + case Status of + connected -> + NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, + reply_to=ReplyTo, flow=InitialFlow}, + {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}; + %% We cancel the push_promise immediately when we are shutting down. + _ -> + {ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0), + Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)), + {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState} + end. ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> case cow_http2_machine:ignored_frame(HTTP2Machine0) of {ok, HTTP2Machine} -> State#http2_state{http2_machine=HTTP2Machine}; {error, Error={connection_error, _, _}, HTTP2Machine} -> - terminate(State#http2_state{http2_machine=HTTP2Machine}, Error) + connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error) end. update_flow(State=#http2_state{socket=Socket, transport=Transport, @@ -369,16 +400,72 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport, [] end. -%% @todo Use Reason. -close(_, #http2_state{streams=Streams}, _, EvHandlerState) -> - {close_streams(Streams), EvHandlerState}. +%% We may have to cancel streams even if we receive multiple +%% GOAWAY frames as the LastStreamID value may be lower than +%% the one previously received. +goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, + status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) -> + Streams = goaway_streams(Streams0, LastStreamID, {goaway, Reason, 'The connection is going away.'}, []), + State = State0#http2_state{streams=Streams}, + case Status of + connected -> + Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + no_error, <<>>)), + State#http2_state{status=goaway}; + _ -> + State + end. + +%% Cancel server-initiated streams that are above LastStreamID. +goaway_streams([], _, _, Acc) -> + Acc; +goaway_streams([Stream=#stream{id=StreamID}|Tail], LastStreamID, Reason, Acc) + when StreamID > LastStreamID, (StreamID rem 2) =:= 1 -> + close_stream(Stream, Reason), + goaway_streams(Tail, LastStreamID, Reason, Acc); +goaway_streams([Stream|Tail], LastStreamID, Reason, Acc) -> + goaway_streams(Tail, LastStreamID, Reason, [Stream|Acc]). + +%% We are already closing, do nothing. +closing(_, #http2_state{status=closing}, _, EvHandlerState) -> + {[], EvHandlerState}; +closing(Reason0, State=#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine}, _, EvHandlerState) -> + Reason = case Reason0 of + normal -> no_error; + owner_down -> no_error; + _ -> internal_error + end, + Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + Reason, <<>>)), + {[ + {state, State#http2_state{status=closing}}, + closing(State) + ], EvHandlerState}. -close_streams([]) -> +closing(#http2_state{opts=Opts}) -> + Timeout = maps:get(closing_timeout, Opts, 15000), + {closing, Timeout}. + +close(Reason, #http2_state{streams=Streams}, _, EvHandlerState) -> + close_streams(Streams, close_reason(Reason)), + EvHandlerState. + +close_reason(closed) -> closed; +close_reason(Reason) -> {closed, Reason}. + +close_streams([], _) -> ok; -close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> - ReplyTo ! {gun_error, self(), StreamRef, {closed, - "The connection was lost."}}, - close_streams(Tail). +close_streams([Stream|Tail], Reason) -> + close_stream(Stream, Reason), + close_streams(Tail, Reason). + +%% @todo Do we want an event for this? +close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> + ReplyTo ! {gun_error, self(), StreamRef, Reason}, + ok. keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> Transport:send(Socket, cow_http2:ping(0)), @@ -429,6 +516,7 @@ request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, headers => Headers }, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), + %% @todo We should not send an empty DATA frame on empty bodies. {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), @@ -586,8 +674,9 @@ down(#http2_state{streams=Streams}) -> KilledStreams = [Ref || #stream{ref=Ref} <- Streams], {KilledStreams, []}. -terminate(#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine, streams=Streams}, Reason) -> +connection_error(#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine, streams=Streams}, + {connection_error, Reason, _}) -> %% The connection is going away either at the request of the server, %% or because an error occurred in the protocol. Inform the streams. %% @todo We should not send duplicate messages to processes. @@ -597,12 +686,9 @@ terminate(#http2_state{socket=Socket, transport=Transport, _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams], Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), - terminate_reason(Reason), <<>>)), + Reason, <<>>)), close. -terminate_reason({connection_error, Reason, _}) -> Reason; -terminate_reason({stop, _, _}) -> no_error. - %% Stream functions. error_stream_closed(State, StreamRef, ReplyTo) -> diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 42cf049..49911dc 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -19,6 +19,7 @@ -export([init/9]). -export([handle/4]). -export([update_flow/4]). +-export([closing/4]). -export([close/4]). -export([send/4]). -export([down/1]). @@ -38,8 +39,10 @@ stream_ref :: reference(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), + opts = #{} :: map(), %% @todo buffer = <<>> :: binary(), in = head :: head | #payload{} | close, + out = head :: head | close, frag_state = undefined :: cow_ws:frag_state(), utf8_state = 0 :: cow_ws:utf8_state(), extensions = #{} :: cow_ws:extensions(), @@ -53,6 +56,10 @@ check_options(Opts) -> do_check_options([]) -> ok; +do_check_options([{closing_timeout, infinity}|Opts]) -> + do_check_options(Opts); +do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 -> + do_check_options(Opts); do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false -> do_check_options(Opts); do_check_options([{default_protocol, M}|Opts]) when is_atom(M) -> @@ -75,12 +82,15 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Hand Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, {ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts), {switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef, - socket=Socket, transport=Transport, extensions=Extensions, + socket=Socket, transport=Transport, opts=Opts, extensions=Extensions, flow=InitialFlow, handler=Handler, handler_state=HandlerState}}. %% Do not handle anything if we received a close frame. -handle(_, State=#ws_state{in=close}, _, EvHandlerState) -> - {{state, State}, EvHandlerState}; +%% Initiate or terminate the closing state depending on whether we sent a close yet. +handle(_, State=#ws_state{in=close, out=close}, _, EvHandlerState) -> + {[{state, State}, close], EvHandlerState}; +handle(_, State=#ws_state{in=close}, EvHandler, EvHandlerState) -> + closing(normal, State, EvHandler, EvHandlerState); %% Shortcut for common case when Data is empty after processing a frame. handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) -> maybe_active(State, EvHandlerState); @@ -119,7 +129,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, more -> maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1); error -> - close({error, badframe}, State, EvHandler, EvHandlerState1) + closing({error, badframe}, State, EvHandler, EvHandlerState1) end; handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey, close_code=CloseCode, unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState, @@ -143,7 +153,7 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}, EvHandlerState); Error = {error, _Reason} -> - close(Error, State, EvHandler, EvHandlerState) + closing(Error, State, EvHandler, EvHandlerState) end. maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) -> @@ -165,11 +175,11 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, }, EvHandlerState0), case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of ping -> - {{state, State}, EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), - handle(Rest, State, EvHandler, EvHandlerState); + {[], EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), + handle(Rest, State0, EvHandler, EvHandlerState); {ping, Payload} -> - {{state, State}, EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), - handle(Rest, State, EvHandler, EvHandlerState); + {[], EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), + handle(Rest, State0, EvHandler, EvHandlerState); pong -> handle(Rest, State0, EvHandler, EvHandlerState1); {pong, _} -> @@ -200,26 +210,30 @@ update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) -> {active, Flow > 0} ]. -close(Reason, State, EvHandler, EvHandlerState) -> - case Reason of - normal -> - send({close, 1000, <<>>}, State, EvHandler, EvHandlerState); - owner_down -> - send({close, 1001, <<>>}, State, EvHandler, EvHandlerState); - {error, badframe} -> - send({close, 1002, <<>>}, State, EvHandler, EvHandlerState); - {error, badencoding} -> - send({close, 1007, <<>>}, State, EvHandler, EvHandlerState); - %% Socket errors; do nothing. - closed -> - {ok, EvHandlerState}; - {error, _} -> - {ok, EvHandlerState} - end. +%% The user already sent the close frame; do nothing. +closing(_, State=#ws_state{out=close}, _, EvHandlerState) -> + {closing(State), EvHandlerState}; +closing(Reason, State, EvHandler, EvHandlerState) -> + Code = case Reason of + normal -> 1000; + owner_down -> 1001; + shutdown -> 1001; + {error, badframe} -> 1002; + {error, badencoding} -> 1007 + end, + send({close, Code, <<>>}, State, EvHandler, EvHandlerState). +closing(#ws_state{opts=Opts}) -> + Timeout = maps:get(closing_timeout, Opts, 15000), + {closing, Timeout}. + +close(_, _, _, EvHandlerState) -> + EvHandlerState. + +%% Send one frame. send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, - socket=Socket, transport=Transport, extensions=Extensions}, - EvHandler, EvHandlerState0) -> + socket=Socket, transport=Transport, in=In, extensions=Extensions}, + EvHandler, EvHandlerState0) when not is_list(Frame) -> WsSendFrameEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -229,10 +243,28 @@ send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0), Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)), EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1), - case Frame of - close -> {close, EvHandlerState}; - {close, _, _} -> {close, EvHandlerState}; - _ -> {{state, State}, EvHandlerState} + if + Frame =:= close; element(1, Frame) =:= close -> + {[ + {state, State#ws_state{out=close}}, + %% We can close immediately if we already received a close frame. + case In of + close -> close; + _ -> closing(State) + end + ], EvHandlerState}; + true -> + {[], EvHandlerState} + end; +%% Send many frames. +send([], _, _, EvHandlerState) -> + {[], EvHandlerState}; +send([Frame|Tail], State, EvHandler, EvHandlerState0) -> + case send(Frame, State, EvHandler, EvHandlerState0) of + {[], EvHandlerState} -> + send(Tail, State, EvHandler, EvHandlerState); + Other -> + Other end. %% Websocket has no concept of streams. |