aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gun.erl300
-rw-r--r--src/gun_event.erl5
-rw-r--r--src/gun_http.erl56
-rw-r--r--src/gun_http2.erl144
-rw-r--r--src/gun_ws.erl94
5 files changed, 400 insertions, 199 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 7b06aaf..bb659e7 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -98,6 +98,7 @@
-export([connecting/3]).
-export([tls_handshake/3]).
-export([connected/3]).
+-export([closing/3]).
-export([terminate/3]).
-type req_headers() :: [{binary() | string() | atom(), iodata()}]
@@ -165,19 +166,24 @@
-export_type([req_opts/0]).
-type http_opts() :: #{
- keepalive => timeout(),
+ closing_timeout => timeout(),
+ flow => pos_integer(),
+ keepalive => timeout(),
transform_header_name => fun((binary()) -> binary()),
- version => 'HTTP/1.1' | 'HTTP/1.0'
+ version => 'HTTP/1.1' | 'HTTP/1.0'
}.
-export_type([http_opts/0]).
-type http2_opts() :: #{
+ closing_timeout => timeout(),
+ flow => pos_integer(),
keepalive => timeout()
}.
-export_type([http2_opts/0]).
%% @todo keepalive
-type ws_opts() :: #{
+ closing_timeout => timeout(),
compress => boolean(),
flow => pos_integer(),
protocols => [{binary(), module()}]
@@ -186,7 +192,7 @@
-record(state, {
owner :: pid(),
- owner_ref :: reference(),
+ status :: {up, reference()} | {down, any()} | shutdown,
host :: inet:hostname() | inet:ip_address(),
port :: inet:port_number(),
origin_scheme :: binary(),
@@ -781,7 +787,7 @@ init({Owner, Host, Port, Opts}) ->
origin_port => Port,
opts => Opts
}, EvHandlerState0),
- State = #state{owner=Owner, owner_ref=OwnerRef,
+ State = #state{owner=Owner, status={up, OwnerRef},
host=Host, port=Port, origin_scheme=OriginScheme,
origin_host=Host, origin_port=Port, opts=Opts,
transport=Transport, messages=Transport:messages(),
@@ -875,11 +881,7 @@ connecting(_, {retries, Retries, LookupInfo}, State=#state{opts=Opts,
}, EvHandlerState1),
{next_state, not_connected, State#state{event_handler_state=EvHandlerState},
{next_event, internal, {retries, Retries, Reason}}}
- end;
-connecting({call, From}, {stream_info, _}, _) ->
- {keep_state_and_data, {reply, From, {error, not_connected}}};
-connecting(Type, Event, State) ->
- handle_common(Type, Event, ?FUNCTION_NAME, State).
+ end.
tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
@@ -910,11 +912,7 @@ tls_handshake(_, {retries, Retries, Socket0}, State=#state{opts=Opts,
}, EvHandlerState1),
{next_state, not_connected, State#state{event_handler_state=EvHandlerState},
{next_event, internal, {retries, Retries, Reason}}}
- end;
-tls_handshake({call, From}, {stream_info, _}, _) ->
- {keep_state_and_data, {reply, From, {error, not_connected}}};
-tls_handshake(Type, Event, State) ->
- handle_common(Type, Event, ?FUNCTION_NAME, State).
+ end.
ensure_alpn(Protocols0, TransOpts) ->
Protocols = [case P of
@@ -937,28 +935,6 @@ connected(internal, {connected, Socket, Protocol},
Owner ! {gun_up, self(), Protocol:name()},
{keep_state, keepalive_timeout(active(State#state{socket=Socket,
protocol=Protocol, protocol_state=ProtoState}))};
-%% Socket events.
-connected(info, {OK, Socket, Data}, State0=#state{socket=Socket, messages={OK, _, _},
- protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0),
- case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of
- {keep_state, State} ->
- {keep_state, active(State)};
- Res ->
- Res
- end;
-connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) ->
- disconnect(State, closed);
-connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) ->
- disconnect(State, {error, Reason});
-%% Timeouts.
-%% @todo HTTP/2 requires more timeouts than just the keepalive timeout.
-%% We should have a timeout function in protocols that deal with
-%% received timeouts. Currently the timeout messages are ignored.
-connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
- ProtoState2 = Protocol:keepalive(ProtoState),
- {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})};
%% Public HTTP interface.
connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
@@ -976,14 +952,6 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body, Initi
StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
InitialFlow, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
-%% @todo Do we want to reject ReplyTo if it's not the process
-%% who initiated the connection? For both data and cancel.
-connected(cast, {data, ReplyTo, StreamRef, IsFin, Data},
- State=#state{protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {ProtoState2, EvHandlerState} = Protocol:data(ProtoState,
- StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
- {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow},
State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
%% The protocol option has been deprecated in favor of the protocols option.
@@ -1006,41 +974,6 @@ connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers, InitialFlow
end,
ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers, InitialFlow),
{keep_state, State#state{protocol_state=ProtoState2}};
-%% When using gun_tls_proxy we need a separate message to know whether
-%% the handshake succeeded and whether we need to switch to a different protocol.
-connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent},
- State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
- socket => Socket,
- protocol => NewProtocol:name()
- }, EvHandlerState0),
- State = State0#state{event_handler_state=EvHandlerState},
- case NewProtocol of
- CurrentProtocol -> {keep_state, State};
- _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State)
- end;
-connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent},
- State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
- error => Reason
- }, EvHandlerState0),
- commands([Error], State#state{event_handler_state=EvHandlerState});
-connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, State0=#state{
- protocol=Protocol, protocol_state=ProtoState}) ->
- Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow),
- case commands(Commands, State0) of
- {keep_state, State} ->
- {keep_state, active(State)};
- Res ->
- Res
- end;
-connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{
- protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
- StreamRef, ReplyTo, EvHandler, EvHandlerState0),
- {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
%% Public Websocket interface.
%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
%% An interface would also make sure that HTTP/1.0 can't upgrade.
@@ -1067,40 +1000,148 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) ->
ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"Websocket is only supported over HTTP/1.1."}},
keep_state_and_data;
-connected(cast, {ws_send, Owner, Frame}, State=#state{
+connected(cast, {ws_send, Owner, Frames}, State=#state{
owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {Commands, EvHandlerState} = Protocol:send(Frame, ProtoState, EvHandler, EvHandlerState0),
+ {Commands, EvHandlerState} = Protocol:send(Frames, ProtoState, EvHandler, EvHandlerState0),
commands(Commands, State#state{event_handler_state=EvHandlerState});
connected(cast, {ws_send, ReplyTo, _}, _) ->
ReplyTo ! {gun_error, self(), {badstate,
"Connection needs to be upgraded to Websocket "
"before the gun:ws_send/1 function can be used."}},
keep_state_and_data;
-connected({call, From}, {stream_info, StreamRef},
+connected(Type, Event, State) ->
+ handle_common_connected(Type, Event, ?FUNCTION_NAME, State).
+
+%% Switch to the graceful connection close state.
+closing(State=#state{protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) ->
+ {Commands, EvHandlerState} = Protocol:closing(Reason, ProtoState, EvHandler, EvHandlerState0),
+ commands(Commands, State#state{event_handler_state=EvHandlerState}).
+
+%% @todo Should explicitly reject ws_send in this state?
+closing(state_timeout, closing_timeout, State=#state{status=Status}) ->
+ Reason = case Status of
+ shutdown -> shutdown;
+ {down, _} -> owner_down;
+ _ -> normal
+ end,
+ disconnect(State, Reason);
+closing(Type, Event, State) ->
+ handle_common_connected(Type, Event, ?FUNCTION_NAME, State).
+
+%% Common events when we have a connection.
+%%
+%% Socket events.
+handle_common_connected(info, {OK, Socket, Data}, _, State0=#state{socket=Socket, messages={OK, _, _},
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0),
+ case commands(Commands, State0#state{event_handler_state=EvHandlerState}) of
+ {keep_state, State} ->
+ {keep_state, active(State)};
+ {next_state, closing, State, Actions} ->
+ {next_state, closing, active(State), Actions};
+ Res ->
+ Res
+ end;
+handle_common_connected(info, {Closed, Socket}, _, State=#state{socket=Socket, messages={_, Closed, _}}) ->
+ disconnect(State, closed);
+handle_common_connected(info, {Error, Socket, Reason}, _, State=#state{socket=Socket, messages={_, _, Error}}) ->
+ disconnect(State, {error, Reason});
+%% Timeouts.
+%% @todo HTTP/2 requires more timeouts than just the keepalive timeout.
+%% We should have a timeout function in protocols that deal with
+%% received timeouts. Currently the timeout messages are ignored.
+handle_common_connected(info, keepalive, _, State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
+ ProtoState2 = Protocol:keepalive(ProtoState),
+ {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})};
+%% When using gun_tls_proxy we need a separate message to know whether
+%% the handshake succeeded and whether we need to switch to a different protocol.
+handle_common_connected(info, {gun_tls_proxy, Socket, {ok, NewProtocol}, HandshakeEvent}, _,
+ State0=#state{socket=Socket, protocol=CurrentProtocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
+ socket => Socket,
+ protocol => NewProtocol:name()
+ }, EvHandlerState0),
+ State = State0#state{event_handler_state=EvHandlerState},
+ case NewProtocol of
+ CurrentProtocol -> {keep_state, State};
+ _ -> commands([{switch_protocol, NewProtocol, ProtoState}], State)
+ end;
+handle_common_connected(info, {gun_tls_proxy, Socket, Error = {error, Reason}, HandshakeEvent}, _,
+ State=#state{socket=Socket, event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ EvHandlerState = EvHandler:tls_handshake_end(HandshakeEvent#{
+ error => Reason
+ }, EvHandlerState0),
+ commands([Error], State#state{event_handler_state=EvHandlerState});
+%% @todo Do we want to reject ReplyTo if it's not the process
+%% who initiated the connection? For both data and cancel.
+handle_common_connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, _,
+ State=#state{protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:data(ProtoState,
+ StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
+handle_common_connected(cast, {update_flow, ReplyTo, StreamRef, Flow}, _, State0=#state{
+ protocol=Protocol, protocol_state=ProtoState}) ->
+ Commands = Protocol:update_flow(ProtoState, ReplyTo, StreamRef, Flow),
+ case commands(Commands, State0) of
+ {keep_state, State} ->
+ {keep_state, active(State)};
+ Res ->
+ Res
+ end;
+handle_common_connected(cast, {cancel, ReplyTo, StreamRef}, _, State=#state{
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
+ StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
+handle_common_connected({call, From}, {stream_info, StreamRef}, _,
#state{protocol=Protocol, protocol_state=ProtoState}) ->
{keep_state_and_data, {reply, From, Protocol:stream_info(ProtoState, StreamRef)}};
-connected(Type, Event, State) ->
- handle_common(Type, Event, ?FUNCTION_NAME, State).
+handle_common_connected(Type, Event, StateName, State) ->
+ handle_common(Type, Event, StateName, State).
%% Common events.
-handle_common(cast, {shutdown, Owner}, _, #state{owner=Owner}) ->
- %% @todo Graceful shutdown.
- stop;
+handle_common(cast, {shutdown, Owner}, StateName, State=#state{
+ owner=Owner, status=Status, socket=Socket, transport=Transport, protocol=Protocol}) ->
+ case {Socket, Protocol} of
+ {undefined, _} ->
+ {stop, shutdown};
+ {_, undefined} ->
+ %% @todo This is missing the disconnect event.
+ Transport:close(Socket),
+ {stop, shutdown};
+ _ when StateName =:= closing, element(1, Status) =:= up ->
+ {keep_state, status(State, shutdown)};
+ _ when StateName =:= closing ->
+ keep_state_and_data;
+ _ ->
+ closing(status(State, shutdown), shutdown)
+ end;
%% We stop when the owner is down.
-handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, State=#state{
- owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport,
- protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
- {_, EvHandlerState} = case Protocol of
- undefined -> {ok, EvHandlerState0};
- _ -> Protocol:close(owner_down, ProtoState, EvHandler, EvHandlerState0)
- end,
- _ = case Socket of
- undefined -> ok;
- _ -> Transport:close(Socket)
- end,
- owner_down(Reason, State#state{event_handler_state=EvHandlerState});
+%% @todo We need to demonitor/flush when the status is no longer up.
+handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, StateName, State=#state{
+ owner=Owner, status={up, OwnerRef}, socket=Socket, transport=Transport, protocol=Protocol}) ->
+ case Socket of
+ undefined ->
+ owner_down(Reason, State);
+ _ ->
+ case Protocol of
+ undefined ->
+ %% @todo This is missing the disconnect event.
+ Transport:close(Socket),
+ owner_down(Reason, State);
+ %% We are already closing so no need to initiate closing again.
+ _ when StateName =:= closing ->
+ {keep_state, status(State, {down, Reason})};
+ _ ->
+ closing(status(State, {down, Reason}), owner_down)
+ end
+ end;
handle_common({call, From}, _, _, _) ->
{keep_state_and_data, {reply, From, {error, bad_call}}};
%% @todo The ReplyTo patch disabled the notowner behavior.
@@ -1123,6 +1164,9 @@ commands([], State) ->
{keep_state, State};
commands([close|_], State) ->
disconnect(State, normal);
+commands([{closing, Timeout}|_], State) ->
+ {next_state, closing, keepalive_cancel(State),
+ {state_timeout, Timeout, closing_timeout}};
commands([Error={error, _}|_], State) ->
disconnect(State, Error);
commands([{active, Active}|Tail], State) when is_boolean(Active) ->
@@ -1176,33 +1220,37 @@ commands([{switch_protocol, Protocol, _ProtoState0}|Tail], State=#state{
commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState,
event_handler_state=EvHandlerState})).
-disconnect(State=#state{owner=Owner, opts=Opts,
+disconnect(State0=#state{owner=Owner, status=Status, opts=Opts,
socket=Socket, transport=Transport,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) ->
- {_, EvHandlerState1} = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0),
- %% @todo Need a special state for orderly shutdown of a connection.
+ EvHandlerState1 = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0),
_ = Transport:close(Socket),
- %% We closed the socket, discard any remaining socket events.
- disconnect_flush(State),
- %% @todo Stop keepalive timeout, flush message.
- {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
- Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
- %% Trigger the disconnect event.
- DisconnectEvent = #{
- reason => Reason
- },
- EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState1),
- Retry = maps:get(retry, Opts, 5),
- case Retry of
- 0 ->
- {stop, {shutdown, Reason}, State#state{event_handler_state=EvHandlerState}};
- _ ->
- {next_state, not_connected,
- keepalive_cancel(State#state{socket=undefined,
- protocol=undefined, protocol_state=undefined,
- event_handler_state=EvHandlerState}),
- {next_event, internal, {retries, Retry - 1, Reason}}}
+ EvHandlerState = EvHandler:disconnect(#{reason => Reason}, EvHandlerState1),
+ State = State0#state{event_handler_state=EvHandlerState},
+ case Status of
+ {down, DownReason} ->
+ owner_down(DownReason, State);
+ shutdown ->
+ {stop, shutdown, State};
+ {up, _} ->
+ %% We closed the socket, discard any remaining socket events.
+ disconnect_flush(State),
+ %% @todo Stop keepalive timeout, flush message.
+ {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
+ Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
+ Retry = maps:get(retry, Opts, 5),
+ case Retry of
+ 0 when Reason =:= normal ->
+ {stop, normal, State};
+ 0 ->
+ {stop, {shutdown, Reason}, State};
+ _ ->
+ {next_state, not_connected,
+ keepalive_cancel(State#state{socket=undefined,
+ protocol=undefined, protocol_state=undefined}),
+ {next_event, internal, {retries, Retry - 1, Reason}}}
+ end
end.
disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) ->
@@ -1220,6 +1268,12 @@ active(State=#state{socket=Socket, transport=Transport}) ->
Transport:setopts(Socket, [{active, once}]),
State.
+status(State=#state{status={up, OwnerRef}}, NewStatus) ->
+ demonitor(OwnerRef, [flush]),
+ State#state{status=NewStatus};
+status(State, NewStatus) ->
+ State#state{status=NewStatus}.
+
keepalive_timeout(State=#state{opts=Opts, protocol=Protocol}) ->
{ProtoOptsKey, Default} = case Protocol of
gun_http -> {http_opts, infinity};
diff --git a/src/gun_event.erl b/src/gun_event.erl
index 1e158c4..a984796 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -113,7 +113,10 @@
-type push_promise_end_event() :: #{
stream_ref := reference(),
reply_to := pid(),
- promised_stream_ref := reference(),
+ %% No stream is created if we receive the push_promise while
+ %% in the process of gracefully shutting down the connection.
+ %% The promised stream is canceled immediately.
+ promised_stream_ref => reference(),
method := binary(),
uri := binary(),
headers := [{binary(), iodata()}]
diff --git a/src/gun_http.erl b/src/gun_http.erl
index ec268ad..309772e 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -19,6 +19,7 @@
-export([init/4]).
-export([handle/4]).
-export([update_flow/4]).
+-export([closing/4]).
-export([close/4]).
-export([keepalive/1]).
-export([headers/11]).
@@ -71,6 +72,10 @@ check_options(Opts) ->
do_check_options([]) ->
ok;
+do_check_options([{closing_timeout, infinity}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 ->
+ do_check_options(Opts);
do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
case gun_content_handler:check_option(Handlers) of
ok -> do_check_options(Opts);
@@ -460,26 +465,47 @@ update_flow(State=#http_state{streams=Streams0}, _ReplyTo, StreamRef, Inc) ->
end || Tuple = #stream{ref=Ref, flow=Flow} <- Streams0],
{state, State#http_state{streams=Streams}}.
-%% @todo Use Reason.
-close(_, State=#http_state{in=body_close, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]},
- EvHandler, EvHandlerState0) ->
+%% We can immediately close the connection when there's no streams.
+closing(_, #http_state{streams=[]}, _, EvHandlerState) ->
+ {close, EvHandlerState};
+%% Otherwise we set connection: close (even if the header was not sent)
+%% and close any pipelined streams, only keeping the active stream.
+closing(Reason, State=#http_state{streams=[LastStream|Tail]}, _, EvHandlerState) ->
+ close_streams(Tail, {closing, Reason}),
+ {[
+ {state, State#http_state{connection=close, streams=[LastStream]}},
+ closing(State)
+ ], EvHandlerState}.
+
+closing(#http_state{opts=Opts}) ->
+ Timeout = maps:get(closing_timeout, Opts, 15000),
+ {closing, Timeout}.
+
+close(Reason, State=#http_state{in=body_close,
+ streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]},
+ EvHandler, EvHandlerState) ->
+ %% We may have more than one stream in case we somehow close abruptly.
+ close_streams(Tail, close_reason(Reason)),
_ = send_data(<<>>, State, fin),
- EvHandlerState = EvHandler:response_end(#{
+ EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
- }, EvHandlerState0),
- {close_streams(Tail), EvHandlerState};
-close(_, #http_state{streams=Streams}, _, EvHandlerState) ->
- {close_streams(Streams), EvHandlerState}.
+ }, EvHandlerState);
+close(Reason, #http_state{streams=Streams}, _, EvHandlerState) ->
+ close_streams(Streams, close_reason(Reason)),
+ EvHandlerState.
+
+close_reason(closed) -> closed;
+close_reason(Reason) -> {closed, Reason}.
-close_streams([]) ->
+%% @todo Do we want an event for this?
+close_streams([], _) ->
ok;
-close_streams([#stream{is_alive=false}|Tail]) ->
- close_streams(Tail);
-close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
- ReplyTo ! {gun_error, self(), StreamRef, {closed,
- "The connection was lost."}},
- close_streams(Tail).
+close_streams([#stream{is_alive=false}|Tail], Reason) ->
+ close_streams(Tail, Reason);
+close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
+ ReplyTo ! {gun_error, self(), StreamRef, Reason},
+ close_streams(Tail, Reason).
%% We don't send a keep-alive when a CONNECT request was initiated.
keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}) ->
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 3b3b79b..5942037 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -19,6 +19,7 @@
-export([init/4]).
-export([handle/4]).
-export([update_flow/4]).
+-export([closing/4]).
-export([close/4]).
-export([keepalive/1]).
-export([headers/11]).
@@ -53,6 +54,10 @@
content_handlers :: gun_content_handler:opt(),
buffer = <<>> :: binary(),
+ %% Current status of the connection. We use this to ensure we are
+ %% not sending the GOAWAY frame more than once.
+ status = connected :: connected | goaway | closing,
+
%% HTTP/2 state machine.
http2_machine :: cow_http2_machine:http2_machine(),
@@ -66,6 +71,10 @@ check_options(Opts) ->
do_check_options([]) ->
ok;
+do_check_options([{closing_timeout, infinity}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 ->
+ do_check_options(Opts);
do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
case gun_content_handler:check_option(Handlers) of
ok -> do_check_options(Opts);
@@ -99,7 +108,8 @@ handle(Data, State=#http2_state{buffer=Buffer}, EvHandler, EvHandlerState) ->
parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>},
EvHandler, EvHandlerState).
-parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandlerState0) ->
+parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, streams=Streams},
+ EvHandler, EvHandlerState0) ->
MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine),
case cow_http2:parse(Data, MaxFrameSize) of
{ok, Frame, Rest} ->
@@ -116,7 +126,14 @@ parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandle
parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}),
EvHandler, EvHandlerState0);
Error = {connection_error, _, _} ->
- {terminate(State0, Error), EvHandlerState0};
+ {connection_error(State0, Error), EvHandlerState0};
+ %% If we both received and sent a GOAWAY frame and there are no streams
+ %% currently running, we can close the connection immediately.
+ more when Status =/= connected, Streams =:= [] ->
+ {[{state, State0#http2_state{buffer=Data, status=closing}}, close], EvHandlerState0};
+ %% Otherwise we enter the closing state.
+ more when Status =:= goaway ->
+ {[{state, State0#http2_state{buffer=Data, status=closing}}, closing(State0)], EvHandlerState0};
more ->
{{state, State0#http2_state{buffer=Data}}, EvHandlerState0}
end.
@@ -169,9 +186,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl
push_promise_frame(State#http2_state{http2_machine=HTTP2Machine},
StreamID, PromisedStreamID, Headers, PseudoHeaders,
EvHandler, EvHandlerState);
- {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} ->
- {terminate(State#http2_state{http2_machine=HTTP2Machine},
- {stop, Frame, 'Server is going away.'}),
+ {ok, GoAway={goaway, _, _, _}, HTTP2Machine} ->
+ {goaway(State#http2_state{http2_machine=HTTP2Machine}, GoAway),
EvHandlerState};
{send, SendData, HTTP2Machine} ->
send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData,
@@ -181,7 +197,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl
StreamID, {stream_error, Reason, Human}),
EvHandlerState};
{error, Error={connection_error, _, _}, HTTP2Machine} ->
- {terminate(State#http2_state{http2_machine=HTTP2Machine}, Error),
+ {connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error),
EvHandlerState}
end.
@@ -318,7 +334,8 @@ rst_stream_frame(State=#http2_state{streams=Streams0},
end.
%% Pushed streams receive the same initial flow value as the parent stream.
-push_promise_frame(State=#http2_state{streams=Streams},
+push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
+ status=Status, http2_machine=HTTP2Machine0, streams=Streams},
StreamID, PromisedStreamID, Headers, #{
method := Method, scheme := Scheme,
authority := Authority, path := Path},
@@ -326,25 +343,39 @@ push_promise_frame(State=#http2_state{streams=Streams},
#stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID),
PromisedStreamRef = make_ref(),
URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]),
- ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers},
- EvHandlerState = EvHandler:push_promise_end(#{
+ PushPromiseEvent0 = #{
stream_ref => StreamRef,
reply_to => ReplyTo,
- promised_stream_ref => PromisedStreamRef,
method => Method,
uri => URI,
headers => Headers
- }, EvHandlerState0),
- NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef,
- reply_to=ReplyTo, flow=InitialFlow},
- {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}.
+ },
+ PushPromiseEvent = case Status of
+ connected ->
+ ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers},
+ PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef};
+ _ ->
+ PushPromiseEvent0
+ end,
+ EvHandlerState = EvHandler:push_promise_end(PushPromiseEvent, EvHandlerState0),
+ case Status of
+ connected ->
+ NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef,
+ reply_to=ReplyTo, flow=InitialFlow},
+ {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState};
+ %% We cancel the push_promise immediately when we are shutting down.
+ _ ->
+ {ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0),
+ Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)),
+ {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState}
+ end.
ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
case cow_http2_machine:ignored_frame(HTTP2Machine0) of
{ok, HTTP2Machine} ->
State#http2_state{http2_machine=HTTP2Machine};
{error, Error={connection_error, _, _}, HTTP2Machine} ->
- terminate(State#http2_state{http2_machine=HTTP2Machine}, Error)
+ connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error)
end.
update_flow(State=#http2_state{socket=Socket, transport=Transport,
@@ -369,16 +400,72 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport,
[]
end.
-%% @todo Use Reason.
-close(_, #http2_state{streams=Streams}, _, EvHandlerState) ->
- {close_streams(Streams), EvHandlerState}.
+%% We may have to cancel streams even if we receive multiple
+%% GOAWAY frames as the LastStreamID value may be lower than
+%% the one previously received.
+goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine,
+ status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) ->
+ Streams = goaway_streams(Streams0, LastStreamID, {goaway, Reason, 'The connection is going away.'}, []),
+ State = State0#http2_state{streams=Streams},
+ case Status of
+ connected ->
+ Transport:send(Socket, cow_http2:goaway(
+ cow_http2_machine:get_last_streamid(HTTP2Machine),
+ no_error, <<>>)),
+ State#http2_state{status=goaway};
+ _ ->
+ State
+ end.
+
+%% Cancel server-initiated streams that are above LastStreamID.
+goaway_streams([], _, _, Acc) ->
+ Acc;
+goaway_streams([Stream=#stream{id=StreamID}|Tail], LastStreamID, Reason, Acc)
+ when StreamID > LastStreamID, (StreamID rem 2) =:= 1 ->
+ close_stream(Stream, Reason),
+ goaway_streams(Tail, LastStreamID, Reason, Acc);
+goaway_streams([Stream|Tail], LastStreamID, Reason, Acc) ->
+ goaway_streams(Tail, LastStreamID, Reason, [Stream|Acc]).
+
+%% We are already closing, do nothing.
+closing(_, #http2_state{status=closing}, _, EvHandlerState) ->
+ {[], EvHandlerState};
+closing(Reason0, State=#http2_state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine}, _, EvHandlerState) ->
+ Reason = case Reason0 of
+ normal -> no_error;
+ owner_down -> no_error;
+ _ -> internal_error
+ end,
+ Transport:send(Socket, cow_http2:goaway(
+ cow_http2_machine:get_last_streamid(HTTP2Machine),
+ Reason, <<>>)),
+ {[
+ {state, State#http2_state{status=closing}},
+ closing(State)
+ ], EvHandlerState}.
-close_streams([]) ->
+closing(#http2_state{opts=Opts}) ->
+ Timeout = maps:get(closing_timeout, Opts, 15000),
+ {closing, Timeout}.
+
+close(Reason, #http2_state{streams=Streams}, _, EvHandlerState) ->
+ close_streams(Streams, close_reason(Reason)),
+ EvHandlerState.
+
+close_reason(closed) -> closed;
+close_reason(Reason) -> {closed, Reason}.
+
+close_streams([], _) ->
ok;
-close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
- ReplyTo ! {gun_error, self(), StreamRef, {closed,
- "The connection was lost."}},
- close_streams(Tail).
+close_streams([Stream|Tail], Reason) ->
+ close_stream(Stream, Reason),
+ close_streams(Tail, Reason).
+
+%% @todo Do we want an event for this?
+close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
+ ReplyTo ! {gun_error, self(), StreamRef, Reason},
+ ok.
keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
Transport:send(Socket, cow_http2:ping(0)),
@@ -429,6 +516,7 @@ request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
headers => Headers
},
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
+ %% @todo We should not send an empty DATA frame on empty bodies.
{ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
@@ -586,8 +674,9 @@ down(#http2_state{streams=Streams}) ->
KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
-terminate(#http2_state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine, streams=Streams}, Reason) ->
+connection_error(#http2_state{socket=Socket, transport=Transport,
+ http2_machine=HTTP2Machine, streams=Streams},
+ {connection_error, Reason, _}) ->
%% The connection is going away either at the request of the server,
%% or because an error occurred in the protocol. Inform the streams.
%% @todo We should not send duplicate messages to processes.
@@ -597,12 +686,9 @@ terminate(#http2_state{socket=Socket, transport=Transport,
_ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams],
Transport:send(Socket, cow_http2:goaway(
cow_http2_machine:get_last_streamid(HTTP2Machine),
- terminate_reason(Reason), <<>>)),
+ Reason, <<>>)),
close.
-terminate_reason({connection_error, Reason, _}) -> Reason;
-terminate_reason({stop, _, _}) -> no_error.
-
%% Stream functions.
error_stream_closed(State, StreamRef, ReplyTo) ->
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 42cf049..49911dc 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -19,6 +19,7 @@
-export([init/9]).
-export([handle/4]).
-export([update_flow/4]).
+-export([closing/4]).
-export([close/4]).
-export([send/4]).
-export([down/1]).
@@ -38,8 +39,10 @@
stream_ref :: reference(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
+ opts = #{} :: map(), %% @todo
buffer = <<>> :: binary(),
in = head :: head | #payload{} | close,
+ out = head :: head | close,
frag_state = undefined :: cow_ws:frag_state(),
utf8_state = 0 :: cow_ws:utf8_state(),
extensions = #{} :: cow_ws:extensions(),
@@ -53,6 +56,10 @@ check_options(Opts) ->
do_check_options([]) ->
ok;
+do_check_options([{closing_timeout, infinity}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 ->
+ do_check_options(Opts);
do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false ->
do_check_options(Opts);
do_check_options([{default_protocol, M}|Opts]) when is_atom(M) ->
@@ -75,12 +82,15 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Hand
Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
{ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts),
{switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef,
- socket=Socket, transport=Transport, extensions=Extensions,
+ socket=Socket, transport=Transport, opts=Opts, extensions=Extensions,
flow=InitialFlow, handler=Handler, handler_state=HandlerState}}.
%% Do not handle anything if we received a close frame.
-handle(_, State=#ws_state{in=close}, _, EvHandlerState) ->
- {{state, State}, EvHandlerState};
+%% Initiate or terminate the closing state depending on whether we sent a close yet.
+handle(_, State=#ws_state{in=close, out=close}, _, EvHandlerState) ->
+ {[{state, State}, close], EvHandlerState};
+handle(_, State=#ws_state{in=close}, EvHandler, EvHandlerState) ->
+ closing(normal, State, EvHandler, EvHandlerState);
%% Shortcut for common case when Data is empty after processing a frame.
handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) ->
maybe_active(State, EvHandlerState);
@@ -119,7 +129,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer,
more ->
maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1);
error ->
- close({error, badframe}, State, EvHandler, EvHandlerState1)
+ closing({error, badframe}, State, EvHandler, EvHandlerState1)
end;
handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey,
close_code=CloseCode, unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState,
@@ -143,7 +153,7 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke
len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2},
EvHandlerState);
Error = {error, _Reason} ->
- close(Error, State, EvHandler, EvHandlerState)
+ closing(Error, State, EvHandler, EvHandlerState)
end.
maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) ->
@@ -165,11 +175,11 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
}, EvHandlerState0),
case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of
ping ->
- {{state, State}, EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1),
- handle(Rest, State, EvHandler, EvHandlerState);
+ {[], EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1),
+ handle(Rest, State0, EvHandler, EvHandlerState);
{ping, Payload} ->
- {{state, State}, EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1),
- handle(Rest, State, EvHandler, EvHandlerState);
+ {[], EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1),
+ handle(Rest, State0, EvHandler, EvHandlerState);
pong ->
handle(Rest, State0, EvHandler, EvHandlerState1);
{pong, _} ->
@@ -200,26 +210,30 @@ update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) ->
{active, Flow > 0}
].
-close(Reason, State, EvHandler, EvHandlerState) ->
- case Reason of
- normal ->
- send({close, 1000, <<>>}, State, EvHandler, EvHandlerState);
- owner_down ->
- send({close, 1001, <<>>}, State, EvHandler, EvHandlerState);
- {error, badframe} ->
- send({close, 1002, <<>>}, State, EvHandler, EvHandlerState);
- {error, badencoding} ->
- send({close, 1007, <<>>}, State, EvHandler, EvHandlerState);
- %% Socket errors; do nothing.
- closed ->
- {ok, EvHandlerState};
- {error, _} ->
- {ok, EvHandlerState}
- end.
+%% The user already sent the close frame; do nothing.
+closing(_, State=#ws_state{out=close}, _, EvHandlerState) ->
+ {closing(State), EvHandlerState};
+closing(Reason, State, EvHandler, EvHandlerState) ->
+ Code = case Reason of
+ normal -> 1000;
+ owner_down -> 1001;
+ shutdown -> 1001;
+ {error, badframe} -> 1002;
+ {error, badencoding} -> 1007
+ end,
+ send({close, Code, <<>>}, State, EvHandler, EvHandlerState).
+closing(#ws_state{opts=Opts}) ->
+ Timeout = maps:get(closing_timeout, Opts, 15000),
+ {closing, Timeout}.
+
+close(_, _, _, EvHandlerState) ->
+ EvHandlerState.
+
+%% Send one frame.
send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
- socket=Socket, transport=Transport, extensions=Extensions},
- EvHandler, EvHandlerState0) ->
+ socket=Socket, transport=Transport, in=In, extensions=Extensions},
+ EvHandler, EvHandlerState0) when not is_list(Frame) ->
WsSendFrameEvent = #{
stream_ref => StreamRef,
reply_to => ReplyTo,
@@ -229,10 +243,28 @@ send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0),
Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)),
EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1),
- case Frame of
- close -> {close, EvHandlerState};
- {close, _, _} -> {close, EvHandlerState};
- _ -> {{state, State}, EvHandlerState}
+ if
+ Frame =:= close; element(1, Frame) =:= close ->
+ {[
+ {state, State#ws_state{out=close}},
+ %% We can close immediately if we already received a close frame.
+ case In of
+ close -> close;
+ _ -> closing(State)
+ end
+ ], EvHandlerState};
+ true ->
+ {[], EvHandlerState}
+ end;
+%% Send many frames.
+send([], _, _, EvHandlerState) ->
+ {[], EvHandlerState};
+send([Frame|Tail], State, EvHandler, EvHandlerState0) ->
+ case send(Frame, State, EvHandler, EvHandlerState0) of
+ {[], EvHandlerState} ->
+ send(Tail, State, EvHandler, EvHandlerState);
+ Other ->
+ Other
end.
%% Websocket has no concept of streams.