diff options
Diffstat (limited to 'src/gun.erl')
-rw-r--r-- | src/gun.erl | 300 |
1 files changed, 177 insertions, 123 deletions
diff --git a/src/gun.erl b/src/gun.erl index 7b06aaf..bb659e7 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -98,6 +98,7 @@ -export([connecting/3]). -export([tls_handshake/3]). -export([connected/3]). +-export([closing/3]). -export([terminate/3]). -type req_headers() :: [{binary() | string() | atom(), iodata()}] @@ -165,19 +166,24 @@ -export_type([req_opts/0]). -type http_opts() :: #{ - keepalive => timeout(), + closing_timeout => timeout(), + flow => pos_integer(), + keepalive => timeout(), transform_header_name => fun((binary()) -> binary()), - version => 'HTTP/1.1' | 'HTTP/1.0' + version => 'HTTP/1.1' | 'HTTP/1.0' }. -export_type([http_opts/0]). -type http2_opts() :: #{ + closing_timeout => timeout(), + flow => pos_integer(), keepalive => timeout() }. -export_type([http2_opts/0]). %% @todo keepalive -type ws_opts() :: #{ + closing_timeout => timeout(), compress => boolean(), flow => pos_integer(), protocols => [{binary(), module()}] @@ -186,7 +192,7 @@ -record(state, { owner :: pid(), - owner_ref :: reference(), + status :: {up, reference()} | {down, any()} | shutdown, host :: inet:hostname() | inet:ip_address(), port :: inet:port_number(), origin_scheme :: binary(), @@ -781,7 +787,7 @@ init({Owner, Host, Port, Opts}) -> origin_port => Port, opts => Opts }, EvHandlerState0), - State = #state{owner=Owner, owner_ref=OwnerRef, + State = #state{owner=Owner, status={up, OwnerRef}, host=Host, port=Port, origin_scheme=OriginScheme, origin_host=Host, origin_port=Port, opts=Opts, transport=Transport, messages=Transport:messages(), @@ -875,11 +881,7 @@ connecting(_, {retries, Retries, LookupInfo}, State=#state{opts=Opts, }, EvHandlerState1), {next_state, not_connected, State#state{event_handler_state=EvHandlerState}, {next_event, internal, {retries, Retries, Reason}}} - end; -connecting({call, From}, {stream_info, _}, _) -> - {keep_state_and_data, {reply, From, {error, not_connected}}}; -connecting(Type, Event, State) -> - handle_common(Type, Event, ?FUNCTION_NAME, State). + end. tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> @@ -910,11 +912,7 @@ tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts, }, EvHandlerState1), {next_state, not_connected, State#state{event_handler_state=EvHandlerState}, {next_event, internal, {retries, Retries, Reason}}} - end; -tls_handshake({call, From}, {stream_info, _}, _) -> - {keep_state_and_data, {reply, From, {error, not_connected}}}; -tls_handshake(Type, Event, State) -> - handle_common(Type, Event, ?FUNCTION_NAME, State). + end. ensure_alpn(Protocols0, TransOpts) -> Protocols = [case P of @@ -937,28 +935,6 @@ connected(internal, {connected, Socket, Protocol}, Owner ! {gun_up, self(), Protocol:name()}, {keep_state, keepalive_timeout(active(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}))}; -%% Socket events. -connected(info, {OK, Socket, Data}, State0=#state{socket=Socket, messages={OK, _, _}, - protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0), - case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of - {keep_state, State} -> - {keep_state, active(State)}; - Res -> - Res - end; -connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) -> - disconnect(State, closed); -connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) -> - disconnect(State, {error, Reason}); -%% Timeouts. -%% @todo HTTP/2 requires more timeouts than just the keepalive timeout. -%% We should have a timeout function in protocols that deal with -%% received timeouts. Currently the timeout messages are ignored. -connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> - ProtoState2 = Protocol:keepalive(ProtoState), - {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})}; %% Public HTTP interface. connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, @@ -976,14 +952,6 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, Initi StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, InitialFlow, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; -%% @todo Do we want to reject ReplyTo if it's not the process -%% who initiated the connection? For both data and cancel. -connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, - State=#state{protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, EvHandlerState} = Protocol:data(ProtoState, - StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow}, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> %% The protocol option has been deprecated in favor of the protocols option. @@ -1006,41 +974,6 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow end, ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow), {keep_state, State#state{protocol_state=ProtoState2}}; -%% When using gun_tls_proxy we need a separate message to know whether -%% the handshake succeeded and whether we need to switch to a different protocol. -connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent}, - State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ - socket => Socket, - protocol => NewProtocol:name() - }, EvHandlerState0), - State = State0#state{event_handler_state=EvHandlerState}, - case NewProtocol of - CurrentProtocol -> {keep_state, State}; - _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State) - end; -connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}, - State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ - error => Reason - }, EvHandlerState0), - commands([Error], State#state{event_handler_state=EvHandlerState}); -connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, State0=#state{ - protocol=Protocol, protocol_state=ProtoState}) -> - Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow), - case commands(Commands, State0) of - {keep_state, State} -> - {keep_state, active(State)}; - Res -> - Res - end; -connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{ - protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, - StreamRef, ReplyTo, EvHandler, EvHandlerState0), - {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; %% Public Websocket interface. %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. %% An interface would also make sure that HTTP/1.0 can't upgrade. @@ -1067,40 +1000,148 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "Websocket is only supported over HTTP/1.1."}}, keep_state_and_data; -connected(cast, {ws_send, Owner, Frame}, State=#state{ +connected(cast, {ws_send, Owner, Frames}, State=#state{ owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {Commands, EvHandlerState} = Protocol:send(Frame, ProtoState, EvHandler, EvHandlerState0), + {Commands, EvHandlerState} = Protocol:send(Frames, ProtoState, EvHandler, EvHandlerState0), commands(Commands, State#state{event_handler_state=EvHandlerState}); connected(cast, {ws_send, ReplyTo, _}, _) -> ReplyTo ! {gun_error, self(), {badstate, "Connection needs to be upgraded to Websocket " "before the gun:ws_send/1 function can be used."}}, keep_state_and_data; -connected({call, From}, {stream_info, StreamRef}, +connected(Type, Event, State) -> + handle_common_connected(Type, Event, ?FUNCTION_NAME, State). + +%% Switch to the graceful connection close state. +closing(State=#state{protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) -> + {Commands, EvHandlerState} = Protocol:closing(Reason, ProtoState, EvHandler, EvHandlerState0), + commands(Commands, State#state{event_handler_state=EvHandlerState}). + +%% @todo Should explicitly reject ws_send in this state? +closing(state_timeout, closing_timeout, State=#state{status=Status}) -> + Reason = case Status of + shutdown -> shutdown; + {down, _} -> owner_down; + _ -> normal + end, + disconnect(State, Reason); +closing(Type, Event, State) -> + handle_common_connected(Type, Event, ?FUNCTION_NAME, State). + +%% Common events when we have a connection. +%% +%% Socket events. +handle_common_connected(info, {OK, Socket, Data}, _, State0=#state{socket=Socket, messages={OK, _, _}, + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0), + case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of + {keep_state, State} -> + {keep_state, active(State)}; + {next_state, closing, State, Actions} -> + {next_state, closing, active(State), Actions}; + Res -> + Res + end; +handle_common_connected(info, {Closed, Socket}, _, State=#state{socket=Socket, messages={_, Closed, _}}) -> + disconnect(State, closed); +handle_common_connected(info, {Error, Socket, Reason}, _, State=#state{socket=Socket, messages={_, _, Error}}) -> + disconnect(State, {error, Reason}); +%% Timeouts. +%% @todo HTTP/2 requires more timeouts than just the keepalive timeout. +%% We should have a timeout function in protocols that deal with +%% received timeouts. Currently the timeout messages are ignored. +handle_common_connected(info, keepalive, _, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> + ProtoState2 = Protocol:keepalive(ProtoState), + {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})}; +%% When using gun_tls_proxy we need a separate message to know whether +%% the handshake succeeded and whether we need to switch to a different protocol. +handle_common_connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent}, _, + State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ + socket => Socket, + protocol => NewProtocol:name() + }, EvHandlerState0), + State = State0#state{event_handler_state=EvHandlerState}, + case NewProtocol of + CurrentProtocol -> {keep_state, State}; + _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State) + end; +handle_common_connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}, _, + State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{ + error => Reason + }, EvHandlerState0), + commands([Error], State#state{event_handler_state=EvHandlerState}); +%% @todo Do we want to reject ReplyTo if it's not the process +%% who initiated the connection? For both data and cancel. +handle_common_connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, _, + State=#state{protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:data(ProtoState, + StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; +handle_common_connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, _, State0=#state{ + protocol=Protocol, protocol_state=ProtoState}) -> + Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow), + case commands(Commands, State0) of + {keep_state, State} -> + {keep_state, active(State)}; + Res -> + Res + end; +handle_common_connected(cast, {cancel, ReplyTo, StreamRef}, _, State=#state{ + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, + StreamRef, ReplyTo, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; +handle_common_connected({call, From}, {stream_info, StreamRef}, _, #state{protocol=Protocol, protocol_state=ProtoState}) -> {keep_state_and_data, {reply, From, Protocol:stream_info(ProtoState, StreamRef)}}; -connected(Type, Event, State) -> - handle_common(Type, Event, ?FUNCTION_NAME, State). +handle_common_connected(Type, Event, StateName, State) -> + handle_common(Type, Event, StateName, State). %% Common events. -handle_common(cast, {shutdown, Owner}, _, #state{owner=Owner}) -> - %% @todo Graceful shutdown. - stop; +handle_common(cast, {shutdown, Owner}, StateName, State=#state{ + owner=Owner, status=Status, socket=Socket, transport=Transport, protocol=Protocol}) -> + case {Socket, Protocol} of + {undefined, _} -> + {stop, shutdown}; + {_, undefined} -> + %% @todo This is missing the disconnect event. + Transport:close(Socket), + {stop, shutdown}; + _ when StateName =:= closing, element(1, Status) =:= up -> + {keep_state, status(State, shutdown)}; + _ when StateName =:= closing -> + keep_state_and_data; + _ -> + closing(status(State, shutdown), shutdown) + end; %% We stop when the owner is down. -handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, State=#state{ - owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport, - protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> - {_, EvHandlerState} = case Protocol of - undefined -> {ok, EvHandlerState0}; - _ -> Protocol:close(owner_down, ProtoState, EvHandler, EvHandlerState0) - end, - _ = case Socket of - undefined -> ok; - _ -> Transport:close(Socket) - end, - owner_down(Reason, State#state{event_handler_state=EvHandlerState}); +%% @todo We need to demonitor/flush when the status is no longer up. +handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, StateName, State=#state{ + owner=Owner, status={up, OwnerRef}, socket=Socket, transport=Transport, protocol=Protocol}) -> + case Socket of + undefined -> + owner_down(Reason, State); + _ -> + case Protocol of + undefined -> + %% @todo This is missing the disconnect event. + Transport:close(Socket), + owner_down(Reason, State); + %% We are already closing so no need to initiate closing again. + _ when StateName =:= closing -> + {keep_state, status(State, {down, Reason})}; + _ -> + closing(status(State, {down, Reason}), owner_down) + end + end; handle_common({call, From}, _, _, _) -> {keep_state_and_data, {reply, From, {error, bad_call}}}; %% @todo The ReplyTo patch disabled the notowner behavior. @@ -1123,6 +1164,9 @@ commands([], State) -> {keep_state, State}; commands([close|_], State) -> disconnect(State, normal); +commands([{closing, Timeout}|_], State) -> + {next_state, closing, keepalive_cancel(State), + {state_timeout, Timeout, closing_timeout}}; commands([Error={error, _}|_], State) -> disconnect(State, Error); commands([{active, Active}|Tail], State) when is_boolean(Active) -> @@ -1176,33 +1220,37 @@ commands([{switch_protocol, Protocol, _ProtoState0}|Tail], State=#state{ commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState, event_handler_state=EvHandlerState})). -disconnect(State=#state{owner=Owner, opts=Opts, +disconnect(State0=#state{owner=Owner, status=Status, opts=Opts, socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) -> - {_, EvHandlerState1} = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0), - %% @todo Need a special state for orderly shutdown of a connection. + EvHandlerState1 = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0), _ = Transport:close(Socket), - %% We closed the socket, discard any remaining socket events. - disconnect_flush(State), - %% @todo Stop keepalive timeout, flush message. - {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState), - Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams}, - %% Trigger the disconnect event. - DisconnectEvent = #{ - reason => Reason - }, - EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState1), - Retry = maps:get(retry, Opts, 5), - case Retry of - 0 -> - {stop, {shutdown, Reason}, State#state{event_handler_state=EvHandlerState}}; - _ -> - {next_state, not_connected, - keepalive_cancel(State#state{socket=undefined, - protocol=undefined, protocol_state=undefined, - event_handler_state=EvHandlerState}), - {next_event, internal, {retries, Retry - 1, Reason}}} + EvHandlerState = EvHandler:disconnect(#{reason => Reason}, EvHandlerState1), + State = State0#state{event_handler_state=EvHandlerState}, + case Status of + {down, DownReason} -> + owner_down(DownReason, State); + shutdown -> + {stop, shutdown, State}; + {up, _} -> + %% We closed the socket, discard any remaining socket events. + disconnect_flush(State), + %% @todo Stop keepalive timeout, flush message. + {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState), + Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams}, + Retry = maps:get(retry, Opts, 5), + case Retry of + 0 when Reason =:= normal -> + {stop, normal, State}; + 0 -> + {stop, {shutdown, Reason}, State}; + _ -> + {next_state, not_connected, + keepalive_cancel(State#state{socket=undefined, + protocol=undefined, protocol_state=undefined}), + {next_event, internal, {retries, Retry - 1, Reason}}} + end end. disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) -> @@ -1220,6 +1268,12 @@ active(State=#state{socket=Socket, transport=Transport}) -> Transport:setopts(Socket, [{active, once}]), State. +status(State=#state{status={up, OwnerRef}}, NewStatus) -> + demonitor(OwnerRef, [flush]), + State#state{status=NewStatus}; +status(State, NewStatus) -> + State#state{status=NewStatus}. + keepalive_timeout(State=#state{opts=Opts, protocol=Protocol}) -> {ProtoOptsKey, Default} = case Protocol of gun_http -> {http_opts, infinity}; |