aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2018-12-19 14:45:39 +0100
committerLoïc Hoguin <[email protected]>2018-12-19 14:45:39 +0100
commit9c579eb9b37836b6deacd3f0b81da8a0d1ee72a6 (patch)
tree15e7f54d0e5f938c3eb4d02e655cc99665b53555 /src
parentd1cda6d1f05b672bc29bea5e84de6b0bb6815863 (diff)
downloadgun-9c579eb9b37836b6deacd3f0b81da8a0d1ee72a6.tar.gz
gun-9c579eb9b37836b6deacd3f0b81da8a0d1ee72a6.tar.bz2
gun-9c579eb9b37836b6deacd3f0b81da8a0d1ee72a6.zip
Convert the gun process to gen_statem
Diffstat (limited to 'src')
-rw-r--r--src/gun.erl545
-rw-r--r--src/gun_http.erl7
-rw-r--r--src/gun_http2.erl5
-rw-r--r--src/gun_ws.erl25
4 files changed, 271 insertions, 311 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 16349e1..1e83979 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -13,6 +13,7 @@
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-module(gun).
+-behavior(gen_statem).
-ifdef(OTP_RELEASE).
-compile({nowarn_deprecated_function, [{erlang, get_stacktrace, 0}]}).
@@ -85,10 +86,10 @@
%% Internals.
-export([start_link/4]).
--export([proc_lib_hack/5]).
--export([system_continue/3]).
--export([system_terminate/4]).
--export([system_code_change/4]).
+-export([callback_mode/0]).
+-export([init/1]).
+-export([not_connected/3]).
+-export([connected/3]).
-type headers() :: [{binary(), iodata()}].
@@ -162,7 +163,6 @@
-export_type([ws_opts/0]).
-record(state, {
- parent :: pid(),
owner :: pid(),
owner_ref :: reference(),
host :: inet:hostname() | inet:ip_address(),
@@ -174,6 +174,7 @@
keepalive_ref :: undefined | reference(),
socket :: undefined | inet:socket() | ssl:sslsocket(),
transport :: module(),
+ messages :: {atom(), atom(), atom()},
protocol :: module(),
protocol_state :: any(),
last_error :: any()
@@ -309,8 +310,7 @@ close(ServerPid) ->
-spec shutdown(pid()) -> ok.
shutdown(ServerPid) ->
- _ = ServerPid ! {shutdown, self()},
- ok.
+ gen_statem:cast(ServerPid, {shutdown, self()}).
%% Requests.
@@ -411,15 +411,14 @@ request(ServerPid, Method, Path, Headers, Body) ->
request(ServerPid, Method, Path, Headers, Body, ReqOpts) ->
StreamRef = make_ref(),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
- _ = ServerPid ! {request, ReplyTo, StreamRef, Method, Path, Headers, Body},
+ gen_statem:cast(ServerPid, {request, ReplyTo, StreamRef, Method, Path, Headers, Body}),
StreamRef.
%% Streaming data.
-spec data(pid(), reference(), fin | nofin, iodata()) -> ok.
data(ServerPid, StreamRef, IsFin, Data) ->
- _ = ServerPid ! {data, self(), StreamRef, IsFin, Data},
- ok.
+ gen_statem:cast(ServerPid, {data, self(), StreamRef, IsFin, Data}).
%% Tunneling.
@@ -435,7 +434,7 @@ connect(ServerPid, Destination, Headers) ->
connect(ServerPid, Destination, Headers, ReqOpts) ->
StreamRef = make_ref(),
ReplyTo = maps:get(reply_to, ReqOpts, self()),
- _ = ServerPid ! {connect, ReplyTo, StreamRef, Destination, Headers},
+ gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef, Destination, Headers}),
StreamRef.
%% Awaiting gun messages.
@@ -606,8 +605,7 @@ flush_ref(StreamRef) ->
-spec cancel(pid(), reference()) -> ok.
cancel(ServerPid, StreamRef) ->
- _ = ServerPid ! {cancel, self(), StreamRef},
- ok.
+ gen_statem:cast(ServerPid, {cancel, self(), StreamRef}).
%% @todo Allow upgrading an HTTP/1.1 connection to HTTP/2.
%% http2_upgrade
@@ -621,83 +619,80 @@ ws_upgrade(ServerPid, Path) ->
-spec ws_upgrade(pid(), iodata(), headers()) -> reference().
ws_upgrade(ServerPid, Path, Headers) ->
StreamRef = make_ref(),
- _ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers},
+ gen_statem:cast(ServerPid, {ws_upgrade, self(), StreamRef, Path, Headers}),
StreamRef.
-spec ws_upgrade(pid(), iodata(), headers(), ws_opts()) -> reference().
ws_upgrade(ServerPid, Path, Headers, Opts) ->
ok = gun_ws:check_options(Opts),
StreamRef = make_ref(),
- _ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts},
+ gen_statem:cast(ServerPid, {ws_upgrade, self(), StreamRef, Path, Headers, Opts}),
StreamRef.
%% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef.
%% But it can be kept for the time being since it can still work for HTTP/1.1.
-spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok.
ws_send(ServerPid, Frames) ->
- _ = ServerPid ! {ws_send, self(), Frames},
- ok.
+ gen_statem:cast(ServerPid, {ws_send, self(), Frames}).
%% Internals.
+callback_mode() -> state_functions.
+
start_link(Owner, Host, Port, Opts) ->
- proc_lib:start_link(?MODULE, proc_lib_hack,
- [self(), Owner, Host, Port, Opts]).
-
-proc_lib_hack(Parent, Owner, Host, Port, Opts) ->
- try
- init(Parent, Owner, Host, Port, Opts)
- catch
- _:normal -> exit(normal);
- _:shutdown -> exit(shutdown);
- _:Reason = {shutdown, _} -> exit(Reason);
- _:Reason -> exit({Reason, erlang:get_stacktrace()})
- end.
+ gen_statem:start_link(?MODULE, {Owner, Host, Port, Opts}, []).
-init(Parent, Owner, Host, Port, Opts) ->
- ok = proc_lib:init_ack(Parent, {ok, self()}),
+init({Owner, Host, Port, Opts}) ->
Retry = maps:get(retry, Opts, 5),
Transport = case maps:get(transport, Opts, default_transport(Port)) of
tcp -> gun_tcp;
tls -> gun_tls
end,
OwnerRef = monitor(process, Owner),
- transport_connect(#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
+ State = #state{owner=Owner, owner_ref=OwnerRef,
host=Host, port=Port, origin_host=Host, origin_port=Port,
- opts=Opts, transport=Transport}, Retry).
+ opts=Opts, transport=Transport, messages=Transport:messages()},
+ {ok, not_connected, State,
+ {next_event, internal, {retries, Retry}}}.
default_transport(443) -> tls;
default_transport(_) -> tcp.
-transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tls}, Retries) ->
- TransportOpts = [binary, {active, false}|ensure_alpn(
- maps:get(protocols, Opts, [http2, http]),
- maps:get(transport_opts, Opts, []))],
- case Transport:connect(Host, Port, TransportOpts, maps:get(connect_timeout, Opts, infinity)) of
- {ok, Socket} ->
- {Protocol, ProtoOptsKey} = case ssl:negotiated_protocol(Socket) of
- {ok, <<"h2">>} -> {gun_http2, http2_opts};
- _ -> {gun_http, http_opts}
+not_connected(_, {retries, Retries},
+ State=#state{host=Host, port=Port, opts=Opts, transport=Transport}) ->
+ TransOpts0 = maps:get(transport_opts, Opts, []),
+ TransOpts1 = case Transport of
+ gun_tcp -> TransOpts0;
+ gun_tls -> ensure_alpn(maps:get(protocols, Opts, [http2, http]), TransOpts0)
+ end,
+ TransOpts = [binary, {active, false}|TransOpts1],
+ ConnectTimeout = maps:get(connect_timeout, Opts, infinity),
+ case Transport:connect(Host, Port, TransOpts, ConnectTimeout) of
+ {ok, Socket} when Transport =:= gun_tcp ->
+ Protocol = case maps:get(protocols, Opts, [http]) of
+ [http] -> gun_http;
+ [http2] -> gun_http2
end,
- up(State, Socket, Protocol, ProtoOptsKey);
- {error, Reason} ->
- retry(State#state{last_error=Reason}, Retries)
- end;
-transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) ->
- TransportOpts = [binary, {active, false}
- |maps:get(transport_opts, Opts, [])],
- case Transport:connect(Host, Port, TransportOpts, maps:get(connect_timeout, Opts, infinity)) of
- {ok, Socket} ->
- {Protocol, ProtoOptsKey} = case maps:get(protocols, Opts, [http]) of
- [http] -> {gun_http, http_opts};
- [http2] -> {gun_http2, http2_opts}
+ {next_state, connected, State,
+ {next_event, internal, {connected, Socket, Protocol}}};
+ {ok, Socket} when Transport =:= gun_tls ->
+ Protocol = case ssl:negotiated_protocol(Socket) of
+ {ok, <<"h2">>} -> gun_http2;
+ _ -> gun_http
end,
- up(State, Socket, Protocol, ProtoOptsKey);
+ {next_state, connected, State,
+ {next_event, internal, {connected, Socket, Protocol}}};
+ {error, Reason} when Retries =:= 0 ->
+ {stop, {shutdown, Reason}};
{error, Reason} ->
- retry(State#state{last_error=Reason}, Retries)
- end.
+ Timeout = maps:get(retry_timeout, Opts, 5000),
+ {keep_state, State#state{last_error=Reason},
+ {state_timeout, Timeout, {retries, Retries - 1}}}
+ end;
+not_connected(Type, Event, State) ->
+ handle_common(Type, Event, ?FUNCTION_NAME, State).
-ensure_alpn(Protocols0, TransportOpts) ->
+ensure_alpn(Protocols0, TransOpts) ->
Protocols = [case P of
http -> <<"http/1.1">>;
http2 -> <<"h2">>
@@ -705,181 +700,152 @@ ensure_alpn(Protocols0, TransportOpts) ->
[
{alpn_advertised_protocols, Protocols},
{client_preferred_next_protocols, {client, Protocols, <<"http/1.1">>}}
- |TransportOpts].
+ |TransOpts].
-up(State=#state{owner=Owner, opts=Opts, transport=Transport}, Socket, Protocol, ProtoOptsKey) ->
- ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
- ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
- Owner ! {gun_up, self(), Protocol:name()},
- before_loop(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}).
-
-down(State=#state{owner=Owner, opts=Opts, protocol=Protocol, protocol_state=ProtoState}, Reason) ->
- {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
- Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
- retry(State#state{socket=undefined, protocol=undefined, protocol_state=undefined,
- last_error=Reason}, maps:get(retry, Opts, 5)).
-
-retry(#state{last_error=Reason}, 0) ->
- exit({shutdown, Reason});
-retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when is_reference(KeepaliveRef) ->
- _ = erlang:cancel_timer(KeepaliveRef),
- %% Flush if we have a keepalive message
- receive
- keepalive -> ok
- after 0 ->
- ok
- end,
- retry_loop(State#state{keepalive_ref=undefined}, Retries - 1);
-retry(State, Retries) ->
- retry_loop(State, Retries - 1).
-
-retry_loop(State=#state{parent=Parent, opts=Opts}, Retries) ->
- _ = erlang:send_after(maps:get(retry_timeout, Opts, 5000), self(), retry),
- receive
- retry ->
- transport_connect(State, Retries);
- {system, From, Request} ->
- sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
- {retry_loop, State, Retries})
- end.
-
-before_loop(State=#state{opts=Opts, protocol=Protocol}) ->
- %% @todo Might not be worth checking every time?
+connected(internal, {connected, Socket, Protocol},
+ State=#state{owner=Owner, opts=Opts, transport=Transport}) ->
ProtoOptsKey = case Protocol of
gun_http -> http_opts;
gun_http2 -> http2_opts
end,
ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
- Keepalive = maps:get(keepalive, ProtoOpts, 5000),
- KeepaliveRef = case Keepalive of
- infinity -> undefined;
- _ -> erlang:send_after(Keepalive, self(), keepalive)
+ ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
+ Owner ! {gun_up, self(), Protocol:name()},
+ {keep_state, keepalive_timeout(active(State#state{socket=Socket,
+ protocol=Protocol, protocol_state=ProtoState}))};
+%% Socket events.
+connected(info, {OK, Socket, Data}, State=#state{socket=Socket, messages={OK, _, _},
+ protocol=Protocol, protocol_state=ProtoState}) ->
+ commands(Protocol:handle(Data, ProtoState), active(State));
+connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) ->
+ disconnect(State, closed);
+connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) ->
+ disconnect(State, {error, Reason});
+%% Timeouts.
+%% @todo HTTP/2 requires more timeouts than just the keepalive timeout.
+%% We should have a timeout function in protocols that deal with
+%% received timeouts. Currently the timeout messages are ignored.
+connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
+ ProtoState2 = Protocol:keepalive(ProtoState),
+ {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})};
+%% Public HTTP interface.
+connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body},
+ State=#state{origin_host=Host, origin_port=Port,
+ protocol=Protocol, protocol_state=ProtoState}) ->
+ ProtoState2 = case Body of
+ <<>> -> Protocol:request(ProtoState,
+ StreamRef, ReplyTo, Method, Host, Port, Path, Headers);
+ _ -> Protocol:request(ProtoState,
+ StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body)
end,
- loop(State#state{keepalive_ref=KeepaliveRef}).
-
-loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
- origin_host=Host, origin_port=Port, opts=Opts, socket=Socket,
- transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
- {OK, Closed, Error} = Transport:messages(),
- Transport:setopts(Socket, [{active, once}]),
- receive
- {OK, Socket, Data} ->
- case Protocol:handle(Data, ProtoState) of
- Commands when is_list(Commands) ->
- commands(Commands, State);
- Command ->
- commands([Command], State)
- end;
- {Closed, Socket} ->
- Protocol:close(ProtoState),
- Transport:close(Socket),
- down(State, closed);
- {Error, Socket, Reason} ->
- Protocol:close(ProtoState),
- Transport:close(Socket),
- down(State, {error, Reason});
- {OK, _PreviousSocket, _Data} ->
- loop(State);
- {Closed, _PreviousSocket} ->
- loop(State);
- {Error, _PreviousSocket, _} ->
- loop(State);
- keepalive ->
- ProtoState2 = Protocol:keepalive(ProtoState),
- before_loop(State#state{protocol_state=ProtoState2});
- {request, ReplyTo, StreamRef, Method, Path, Headers, <<>>} ->
- ProtoState2 = Protocol:request(ProtoState,
- StreamRef, ReplyTo, Method, Host, Port, Path, Headers),
- loop(State#state{protocol_state=ProtoState2});
- {request, ReplyTo, StreamRef, Method, Path, Headers, Body} ->
- ProtoState2 = Protocol:request(ProtoState,
- StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body),
- loop(State#state{protocol_state=ProtoState2});
- %% @todo Do we want to reject ReplyTo if it's not the process
- %% who initiated the connection? For both data and cancel.
- {data, ReplyTo, StreamRef, IsFin, Data} ->
- ProtoState2 = Protocol:data(ProtoState,
- StreamRef, ReplyTo, IsFin, Data),
- loop(State#state{protocol_state=ProtoState2});
- {connect, ReplyTo, StreamRef, Destination0, Headers} ->
- %% The protocol option has been deprecated in favor of the protocols option.
- %% Nobody probably ended up using it, but let's not break the interface.
- Destination1 = case Destination0 of
- #{protocols := _} ->
- Destination0;
- #{protocol := DestProto} ->
- Destination0#{protocols => [DestProto]};
- _ ->
- Destination0
- end,
- Destination = case Destination1 of
- #{transport := tls} ->
- Destination1#{tls_opts => ensure_alpn(
- maps:get(protocols, Destination1, [http]),
- maps:get(tls_opts, Destination1, []))};
- _ ->
- Destination1
- end,
- ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers),
- loop(State#state{protocol_state=ProtoState2});
- {cancel, ReplyTo, StreamRef} ->
- ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo),
- loop(State#state{protocol_state=ProtoState2});
- %% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
- %% An interface would also make sure that HTTP/1.0 can't upgrade.
- {ws_upgrade, Owner, StreamRef, Path, Headers} when Protocol =:= gun_http ->
- WsOpts = maps:get(ws_opts, Opts, #{}),
- ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts),
- loop(State#state{protocol_state=ProtoState2});
- {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts} when Protocol =:= gun_http ->
- ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts),
- loop(State#state{protocol_state=ProtoState2});
- %% @todo can fail if http/1.0
- {shutdown, Owner} ->
- %% @todo Protocol:shutdown?
- ok;
- {'DOWN', OwnerRef, process, Owner, Reason} ->
- Protocol:close(ProtoState),
- Transport:close(Socket),
- owner_gone(Reason);
- {system, From, Request} ->
- sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
- {loop, State});
- %% @todo HTTP/2 requires more timeouts than just the keepalive timeout.
- %% We should have a timeout function in protocols that deal with
- %% received timeouts. Currently the timeout messages are ignored.
- {ws_upgrade, _, StreamRef, _, _} ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
- "Websocket is only supported over HTTP/1.1."}},
- loop(State);
- {ws_upgrade, _, StreamRef, _, _, _} ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
- "Websocket is only supported over HTTP/1.1."}},
- loop(State);
- {ws_send, _, _} ->
- Owner ! {gun_error, self(), {badstate,
- "Connection needs to be upgraded to Websocket "
- "before the gun:ws_send/1 function can be used."}},
- loop(State);
- %% @todo The ReplyTo patch disabled the notowner behavior.
- %% We need to add an option to enforce this behavior if needed.
- Any when is_tuple(Any), is_pid(element(2, Any)) ->
- element(2, Any) ! {gun_error, self(), {notowner,
- "Operations are restricted to the owner of the connection."}},
- loop(State);
- Any ->
- error_logger:error_msg("Unexpected message: ~w~n", [Any]),
- loop(State)
- end.
-
+ {keep_state, State#state{protocol_state=ProtoState2}};
+%% @todo Do we want to reject ReplyTo if it's not the process
+%% who initiated the connection? For both data and cancel.
+connected(cast, {data, ReplyTo, StreamRef, IsFin, Data},
+ State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
+ ProtoState2 = Protocol:data(ProtoState, StreamRef, ReplyTo, IsFin, Data),
+ {keep_state, State#state{protocol_state=ProtoState2}};
+connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers},
+ State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
+ %% The protocol option has been deprecated in favor of the protocols option.
+ %% Nobody probably ended up using it, but let's not break the interface.
+ Destination1 = case Destination0 of
+ #{protocols := _} ->
+ Destination0;
+ #{protocol := DestProto} ->
+ Destination0#{protocols => [DestProto]};
+ _ ->
+ Destination0
+ end,
+ Destination = case Destination1 of
+ #{transport := tls} ->
+ Destination1#{tls_opts => ensure_alpn(
+ maps:get(protocols, Destination1, [http]),
+ maps:get(tls_opts, Destination1, []))};
+ _ ->
+ Destination1
+ end,
+ ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers),
+ {keep_state, State#state{protocol_state=ProtoState2}};
+connected(cast, {cancel, ReplyTo, StreamRef},
+ State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
+ ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo),
+ {keep_state, State#state{protocol_state=ProtoState2}};
+%% Public Websocket interface.
+%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
+%% An interface would also make sure that HTTP/1.0 can't upgrade.
+connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers},
+ State=#state{owner=Owner, origin_host=Host, origin_port=Port, opts=Opts,
+ protocol=Protocol, protocol_state=ProtoState})
+ when Protocol =:= gun_http ->
+ WsOpts = maps:get(ws_opts, Opts, #{}),
+ ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts),
+ {keep_state, State#state{protocol_state=ProtoState2}};
+connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts},
+ State=#state{owner=Owner, origin_host=Host, origin_port=Port,
+ protocol=Protocol, protocol_state=ProtoState})
+ when Protocol =:= gun_http ->
+ ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts),
+ {keep_state, State#state{protocol_state=ProtoState2}};
+ %% @todo can fail if http/1.0
+%% @todo Probably don't error out here, have a protocol function/command.
+connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _}, _) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ "Websocket is only supported over HTTP/1.1."}},
+ keep_state_and_data;
+connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ "Websocket is only supported over HTTP/1.1."}},
+ keep_state_and_data;
+connected(cast, {ws_send, Owner, Frame},
+ State=#state{owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState}) ->
+ commands(Protocol:send(Frame, ProtoState), State);
+connected(cast, {ws_send, ReplyTo, _}, _) ->
+ ReplyTo ! {gun_error, self(), {badstate,
+ "Connection needs to be upgraded to Websocket "
+ "before the gun:ws_send/1 function can be used."}},
+ keep_state_and_data;
+connected(Type, Event, State) ->
+ handle_common(Type, Event, ?FUNCTION_NAME, State).
+
+%% Common events.
+handle_common(cast, {shutdown, Owner}, _, #state{owner=Owner}) ->
+ %% @todo Graceful shutdown.
+ stop;
+%% We stop when the owner is gone.
+handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, #state{
+ owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport,
+ protocol=Protocol, protocol_state=ProtoState}) ->
+ _ = case Protocol of
+ undefined -> ok;
+ _ -> Protocol:close(owner_gone, ProtoState)
+ end,
+ _ = case Socket of
+ undefined -> ok;
+ _ -> Transport:close(Socket)
+ end,
+ owner_gone(Reason);
+handle_common({call, From}, _, _, _) ->
+ {keep_state_and_data, {reply, From, {error, bad_call}}};
+%% @todo The ReplyTo patch disabled the notowner behavior.
+%% We need to add an option to enforce this behavior if needed.
+handle_common(cast, Any, _, #state{owner=Owner}) when element(2, Any) =/= Owner ->
+ element(2, Any) ! {gun_error, self(), {notowner,
+ "Operations are restricted to the owner of the connection."}},
+ keep_state_and_data;
+handle_common(Type, Event, StateName, StateData) ->
+ error_logger:error_msg("Unexpected event in state ~p of type ~p:~n~w~n~p~n",
+ [StateName, Type, Event, StateData]),
+ keep_state_and_data.
+
+commands(Command, State) when not is_list(Command) ->
+ commands([Command], State);
commands([], State) ->
- loop(State);
-commands([close|_], State=#state{socket=Socket, transport=Transport}) ->
- Transport:close(Socket),
- down(State, normal);
-commands([Error={error, _}|_], State=#state{socket=Socket, transport=Transport}) ->
- Transport:close(Socket),
- down(State, Error);
+ {keep_state, State};
+commands([close|_], State) ->
+ disconnect(State, normal);
+commands([Error={error, _}|_], State) ->
+ disconnect(State, Error);
commands([{state, ProtoState}|Tail], State) ->
commands(Tail, State#state{protocol_state=ProtoState});
%% @todo The scheme should probably not be ignored.
@@ -904,80 +870,73 @@ commands([{switch_transport, Transport, Socket}|Tail], State) ->
commands(Tail, State#state{socket=Socket, transport=Transport});
%% @todo The two loops should be reunified and this clause generalized.
commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State) ->
- ws_loop(State#state{protocol=Protocol, protocol_state=ProtoState});
+ {keep_state, keepalive_cancel(State#state{protocol=Protocol, protocol_state=ProtoState})};
%% @todo And this state should probably not be ignored.
commands([{switch_protocol, Protocol, _ProtoState0}|Tail],
State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport}) ->
ProtoOpts = maps:get(http2_opts, Opts, #{}),
ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
- commands(Tail, State#state{protocol=Protocol, protocol_state=ProtoState}).
+ commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState})).
-ws_loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, socket=Socket,
- transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
- {OK, Closed, Error} = Transport:messages(),
- Transport:setopts(Socket, [{active, once}]),
+disconnect(State=#state{owner=Owner, opts=Opts,
+ socket=Socket, transport=Transport,
+ protocol=Protocol, protocol_state=ProtoState}, Reason) ->
+ Protocol:close(Reason, ProtoState),
+ %% @todo Need a special state for orderly shutdown of a connection.
+ Transport:close(Socket),
+ %% We closed the socket, discard any remaining socket events.
+ disconnect_flush(State),
+ %% @todo Stop keepalive timeout, flush message.
+ {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
+ Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
+ Retry = maps:get(retry, Opts, 5),
+ {next_state, not_connected,
+ keepalive_cancel(State#state{socket=undefined,
+ protocol=undefined, protocol_state=undefined, last_error=Reason}),
+ {next_event, internal, {retries, Retry}}}.
+
+disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) ->
receive
- {OK, Socket, Data} ->
- case Protocol:handle(Data, ProtoState) of
- close ->
- Transport:close(Socket),
- down(State, normal);
- ProtoState2 ->
- ws_loop(State#state{protocol_state=ProtoState2})
- end;
- {Closed, Socket} ->
- Transport:close(Socket),
- down(State, closed);
- {Error, Socket, Reason} ->
- Transport:close(Socket),
- down(State, {error, Reason});
- %% Ignore any previous HTTP keep-alive.
- keepalive ->
- ws_loop(State);
-% {ws_send, Owner, Frames} when is_list(Frames) ->
-% todo; %% @todo
- {ws_send, Owner, Frame} ->
- case Protocol:send(Frame, ProtoState) of
- close ->
- Transport:close(Socket),
- down(State, normal);
- ProtoState2 ->
- ws_loop(State#state{protocol_state=ProtoState2})
- end;
- {shutdown, Owner} ->
- %% @todo Protocol:shutdown? %% @todo close frame
- ok;
- {'DOWN', OwnerRef, process, Owner, Reason} ->
- Protocol:close(owner_gone, ProtoState),
- Transport:close(Socket),
- owner_gone(Reason);
- {system, From, Request} ->
- sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
- {ws_loop, State});
- Any when is_tuple(Any), is_pid(element(2, Any)) ->
- element(2, Any) ! {gun_error, self(), {notowner,
- "Operations are restricted to the owner of the connection."}},
- ws_loop(State);
- Any ->
- error_logger:error_msg("Unexpected message: ~w~n", [Any])
+ {OK, Socket, _} -> disconnect_flush(State);
+ {Closed, Socket} -> disconnect_flush(State);
+ {Error, Socket, _} -> disconnect_flush(State)
+ after 0 ->
+ ok
end.
--spec owner_gone(_) -> no_return().
-owner_gone(normal) -> exit(normal);
-owner_gone(shutdown) -> exit(shutdown);
-owner_gone(Shutdown = {shutdown, _}) -> exit(Shutdown);
-owner_gone(Reason) -> error({owner_gone, Reason}).
-
-system_continue(_, _, {retry_loop, State, Retry}) ->
- retry_loop(State, Retry);
-system_continue(_, _, {loop, State}) ->
- loop(State);
-system_continue(_, _, {ws_loop, State}) ->
- ws_loop(State).
-
--spec system_terminate(any(), _, _, _) -> no_return().
-system_terminate(Reason, _, _, _) ->
- exit(Reason).
-
-system_code_change(Misc, _, _, _) ->
- {ok, Misc}.
+active(State=#state{socket=Socket, transport=Transport}) ->
+ Transport:setopts(Socket, [{active, once}]),
+ State.
+
+keepalive_timeout(State=#state{opts=Opts, protocol=Protocol}) ->
+ %% @todo Might not be worth checking every time?
+ ProtoOptsKey = case Protocol of
+ gun_http -> http_opts;
+ gun_http2 -> http2_opts
+ end,
+ ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
+ Keepalive = maps:get(keepalive, ProtoOpts, 5000),
+ KeepaliveRef = case Keepalive of
+ infinity -> undefined;
+ %% @todo Maybe change that to a start_timer.
+ _ -> erlang:send_after(Keepalive, self(), keepalive)
+ end,
+ State#state{keepalive_ref=KeepaliveRef}.
+
+keepalive_cancel(State=#state{keepalive_ref=undefined}) ->
+ State;
+keepalive_cancel(State=#state{keepalive_ref=KeepaliveRef}) ->
+ _ = erlang:cancel_timer(KeepaliveRef),
+ %% Flush if we have a keepalive message
+ receive
+ keepalive -> ok
+ after 0 ->
+ ok
+ end,
+ State#state{keepalive_ref=undefined}.
+
+-spec owner_gone(_) -> stop | {stop, _}.
+owner_gone(normal) -> stop;
+owner_gone(shutdown) -> {stop, shutdown};
+owner_gone(Shutdown = {shutdown, _}) -> {stop, Shutdown};
+owner_gone(Reason) -> {stop, {owner_gone, Reason}}.
diff --git a/src/gun_http.erl b/src/gun_http.erl
index e2b37d1..81310bf 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -18,7 +18,7 @@
-export([name/0]).
-export([init/4]).
-export([handle/2]).
--export([close/1]).
+-export([close/2]).
-export([keepalive/1]).
-export([request/8]).
-export([request/9]).
@@ -304,10 +304,11 @@ send_data_if_alive(Data, State=#http_state{streams=[Stream=#stream{
send_data_if_alive(_, State, _) ->
State.
-close(State=#http_state{in=body_close, streams=[_|Tail]}) ->
+%% @todo Use Reason.
+close(_, State=#http_state{in=body_close, streams=[_|Tail]}) ->
_ = send_data_if_alive(<<>>, State, fin),
close_streams(Tail);
-close(#http_state{streams=Streams}) ->
+close(_, #http_state{streams=Streams}) ->
close_streams(Streams).
close_streams([]) ->
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 3468448..6538083 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -18,7 +18,7 @@
-export([name/0]).
-export([init/4]).
-export([handle/2]).
--export([close/1]).
+-export([close/2]).
-export([keepalive/1]).
-export([request/8]).
-export([request/9]).
@@ -225,7 +225,8 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
terminate(State#http2_state{http2_machine=HTTP2Machine}, Error)
end.
-close(#http2_state{streams=Streams}) ->
+%% @todo Use Reason.
+close(_, #http2_state{streams=Streams}) ->
close_streams(Streams).
close_streams([]) ->
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index b89840e..cccb4e4 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -74,10 +74,10 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) ->
%% Do not handle anything if we received a close frame.
handle(_, State=#ws_state{in=close}) ->
- State;
+ {state, State};
%% Shortcut for common case when Data is empty after processing a frame.
handle(<<>>, State=#ws_state{in=head}) ->
- State;
+ {state, State};
handle(Data, State=#ws_state{buffer=Buffer, in=head, frag_state=FragState, extensions=Extensions}) ->
Data2 = << Buffer/binary, Data/binary >>,
case cow_ws:parse_header(Data2, Extensions, FragState) of
@@ -85,7 +85,7 @@ handle(Data, State=#ws_state{buffer=Buffer, in=head, frag_state=FragState, exten
handle(Rest, State#ws_state{buffer= <<>>,
in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey}, frag_state=FragState2});
more ->
- State#ws_state{buffer=Data2};
+ {state, State#ws_state{buffer=Data2}};
error ->
close({error, badframe}, State)
end;
@@ -97,11 +97,11 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke
{ok, Payload, Utf8State2, Rest} ->
dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, << Unmasked/binary, Payload/binary >>, CloseCode);
{more, CloseCode2, Payload, Utf8State2} ->
- State#ws_state{in=In#payload{close_code=CloseCode2, unmasked= << Unmasked/binary, Payload/binary >>,
- len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2};
+ {state, State#ws_state{in=In#payload{close_code=CloseCode2, unmasked= << Unmasked/binary, Payload/binary >>,
+ len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}};
{more, Payload, Utf8State2} ->
- State#ws_state{in=In#payload{unmasked= << Unmasked/binary, Payload/binary >>,
- len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2};
+ {state, State#ws_state{in=In#payload{unmasked= << Unmasked/binary, Payload/binary >>,
+ len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}};
Error = {error, _Reason} ->
close(Error, State)
end.
@@ -111,10 +111,10 @@ dispatch(Rest, State0=#ws_state{frag_state=FragState,
Type0, Payload0, CloseCode0) ->
case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of
ping ->
- State = send(pong, State0),
+ {state, State} = send(pong, State0),
handle(Rest, State);
{ping, Payload} ->
- State = send({pong, Payload}, State0),
+ {state, State} = send({pong, Payload}, State0),
handle(Rest, State);
pong ->
handle(Rest, State0);
@@ -133,9 +133,8 @@ dispatch(Rest, State0=#ws_state{frag_state=FragState,
close(Reason, State) ->
case Reason of
-%% @todo We need to send a close frame from gun:ws_loop on close.
-% Normal when Normal =:= stop; Normal =:= timeout ->
-% send({close, 1000, <<>>}, State);
+ normal ->
+ send({close, 1000, <<>>}, State);
owner_gone ->
send({close, 1001, <<>>}, State);
{error, badframe} ->
@@ -149,7 +148,7 @@ send(Frame, State=#ws_state{socket=Socket, transport=Transport, extensions=Exten
case Frame of
close -> close;
{close, _, _} -> close;
- _ -> State
+ _ -> {state, State}
end.
%% Websocket has no concept of streams.