diff options
author | Loïc Hoguin <[email protected]> | 2018-12-19 14:45:39 +0100 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2018-12-19 14:45:39 +0100 |
commit | 9c579eb9b37836b6deacd3f0b81da8a0d1ee72a6 (patch) | |
tree | 15e7f54d0e5f938c3eb4d02e655cc99665b53555 /src | |
parent | d1cda6d1f05b672bc29bea5e84de6b0bb6815863 (diff) | |
download | gun-9c579eb9b37836b6deacd3f0b81da8a0d1ee72a6.tar.gz gun-9c579eb9b37836b6deacd3f0b81da8a0d1ee72a6.tar.bz2 gun-9c579eb9b37836b6deacd3f0b81da8a0d1ee72a6.zip |
Convert the gun process to gen_statem
Diffstat (limited to 'src')
-rw-r--r-- | src/gun.erl | 545 | ||||
-rw-r--r-- | src/gun_http.erl | 7 | ||||
-rw-r--r-- | src/gun_http2.erl | 5 | ||||
-rw-r--r-- | src/gun_ws.erl | 25 |
4 files changed, 271 insertions, 311 deletions
diff --git a/src/gun.erl b/src/gun.erl index 16349e1..1e83979 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -13,6 +13,7 @@ %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -module(gun). +-behavior(gen_statem). -ifdef(OTP_RELEASE). -compile({nowarn_deprecated_function, [{erlang, get_stacktrace, 0}]}). @@ -85,10 +86,10 @@ %% Internals. -export([start_link/4]). --export([proc_lib_hack/5]). --export([system_continue/3]). --export([system_terminate/4]). --export([system_code_change/4]). +-export([callback_mode/0]). +-export([init/1]). +-export([not_connected/3]). +-export([connected/3]). -type headers() :: [{binary(), iodata()}]. @@ -162,7 +163,6 @@ -export_type([ws_opts/0]). -record(state, { - parent :: pid(), owner :: pid(), owner_ref :: reference(), host :: inet:hostname() | inet:ip_address(), @@ -174,6 +174,7 @@ keepalive_ref :: undefined | reference(), socket :: undefined | inet:socket() | ssl:sslsocket(), transport :: module(), + messages :: {atom(), atom(), atom()}, protocol :: module(), protocol_state :: any(), last_error :: any() @@ -309,8 +310,7 @@ close(ServerPid) -> -spec shutdown(pid()) -> ok. shutdown(ServerPid) -> - _ = ServerPid ! {shutdown, self()}, - ok. + gen_statem:cast(ServerPid, {shutdown, self()}). %% Requests. @@ -411,15 +411,14 @@ request(ServerPid, Method, Path, Headers, Body) -> request(ServerPid, Method, Path, Headers, Body, ReqOpts) -> StreamRef = make_ref(), ReplyTo = maps:get(reply_to, ReqOpts, self()), - _ = ServerPid ! {request, ReplyTo, StreamRef, Method, Path, Headers, Body}, + gen_statem:cast(ServerPid, {request, ReplyTo, StreamRef, Method, Path, Headers, Body}), StreamRef. %% Streaming data. -spec data(pid(), reference(), fin | nofin, iodata()) -> ok. data(ServerPid, StreamRef, IsFin, Data) -> - _ = ServerPid ! {data, self(), StreamRef, IsFin, Data}, - ok. + gen_statem:cast(ServerPid, {data, self(), StreamRef, IsFin, Data}). %% Tunneling. @@ -435,7 +434,7 @@ connect(ServerPid, Destination, Headers) -> connect(ServerPid, Destination, Headers, ReqOpts) -> StreamRef = make_ref(), ReplyTo = maps:get(reply_to, ReqOpts, self()), - _ = ServerPid ! {connect, ReplyTo, StreamRef, Destination, Headers}, + gen_statem:cast(ServerPid, {connect, ReplyTo, StreamRef, Destination, Headers}), StreamRef. %% Awaiting gun messages. @@ -606,8 +605,7 @@ flush_ref(StreamRef) -> -spec cancel(pid(), reference()) -> ok. cancel(ServerPid, StreamRef) -> - _ = ServerPid ! {cancel, self(), StreamRef}, - ok. + gen_statem:cast(ServerPid, {cancel, self(), StreamRef}). %% @todo Allow upgrading an HTTP/1.1 connection to HTTP/2. %% http2_upgrade @@ -621,83 +619,80 @@ ws_upgrade(ServerPid, Path) -> -spec ws_upgrade(pid(), iodata(), headers()) -> reference(). ws_upgrade(ServerPid, Path, Headers) -> StreamRef = make_ref(), - _ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers}, + gen_statem:cast(ServerPid, {ws_upgrade, self(), StreamRef, Path, Headers}), StreamRef. -spec ws_upgrade(pid(), iodata(), headers(), ws_opts()) -> reference(). ws_upgrade(ServerPid, Path, Headers, Opts) -> ok = gun_ws:check_options(Opts), StreamRef = make_ref(), - _ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts}, + gen_statem:cast(ServerPid, {ws_upgrade, self(), StreamRef, Path, Headers, Opts}), StreamRef. %% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef. %% But it can be kept for the time being since it can still work for HTTP/1.1. -spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok. ws_send(ServerPid, Frames) -> - _ = ServerPid ! {ws_send, self(), Frames}, - ok. + gen_statem:cast(ServerPid, {ws_send, self(), Frames}). %% Internals. +callback_mode() -> state_functions. + start_link(Owner, Host, Port, Opts) -> - proc_lib:start_link(?MODULE, proc_lib_hack, - [self(), Owner, Host, Port, Opts]). - -proc_lib_hack(Parent, Owner, Host, Port, Opts) -> - try - init(Parent, Owner, Host, Port, Opts) - catch - _:normal -> exit(normal); - _:shutdown -> exit(shutdown); - _:Reason = {shutdown, _} -> exit(Reason); - _:Reason -> exit({Reason, erlang:get_stacktrace()}) - end. + gen_statem:start_link(?MODULE, {Owner, Host, Port, Opts}, []). -init(Parent, Owner, Host, Port, Opts) -> - ok = proc_lib:init_ack(Parent, {ok, self()}), +init({Owner, Host, Port, Opts}) -> Retry = maps:get(retry, Opts, 5), Transport = case maps:get(transport, Opts, default_transport(Port)) of tcp -> gun_tcp; tls -> gun_tls end, OwnerRef = monitor(process, Owner), - transport_connect(#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, + State = #state{owner=Owner, owner_ref=OwnerRef, host=Host, port=Port, origin_host=Host, origin_port=Port, - opts=Opts, transport=Transport}, Retry). + opts=Opts, transport=Transport, messages=Transport:messages()}, + {ok, not_connected, State, + {next_event, internal, {retries, Retry}}}. default_transport(443) -> tls; default_transport(_) -> tcp. -transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tls}, Retries) -> - TransportOpts = [binary, {active, false}|ensure_alpn( - maps:get(protocols, Opts, [http2, http]), - maps:get(transport_opts, Opts, []))], - case Transport:connect(Host, Port, TransportOpts, maps:get(connect_timeout, Opts, infinity)) of - {ok, Socket} -> - {Protocol, ProtoOptsKey} = case ssl:negotiated_protocol(Socket) of - {ok, <<"h2">>} -> {gun_http2, http2_opts}; - _ -> {gun_http, http_opts} +not_connected(_, {retries, Retries}, + State=#state{host=Host, port=Port, opts=Opts, transport=Transport}) -> + TransOpts0 = maps:get(transport_opts, Opts, []), + TransOpts1 = case Transport of + gun_tcp -> TransOpts0; + gun_tls -> ensure_alpn(maps:get(protocols, Opts, [http2, http]), TransOpts0) + end, + TransOpts = [binary, {active, false}|TransOpts1], + ConnectTimeout = maps:get(connect_timeout, Opts, infinity), + case Transport:connect(Host, Port, TransOpts, ConnectTimeout) of + {ok, Socket} when Transport =:= gun_tcp -> + Protocol = case maps:get(protocols, Opts, [http]) of + [http] -> gun_http; + [http2] -> gun_http2 end, - up(State, Socket, Protocol, ProtoOptsKey); - {error, Reason} -> - retry(State#state{last_error=Reason}, Retries) - end; -transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) -> - TransportOpts = [binary, {active, false} - |maps:get(transport_opts, Opts, [])], - case Transport:connect(Host, Port, TransportOpts, maps:get(connect_timeout, Opts, infinity)) of - {ok, Socket} -> - {Protocol, ProtoOptsKey} = case maps:get(protocols, Opts, [http]) of - [http] -> {gun_http, http_opts}; - [http2] -> {gun_http2, http2_opts} + {next_state, connected, State, + {next_event, internal, {connected, Socket, Protocol}}}; + {ok, Socket} when Transport =:= gun_tls -> + Protocol = case ssl:negotiated_protocol(Socket) of + {ok, <<"h2">>} -> gun_http2; + _ -> gun_http end, - up(State, Socket, Protocol, ProtoOptsKey); + {next_state, connected, State, + {next_event, internal, {connected, Socket, Protocol}}}; + {error, Reason} when Retries =:= 0 -> + {stop, {shutdown, Reason}}; {error, Reason} -> - retry(State#state{last_error=Reason}, Retries) - end. + Timeout = maps:get(retry_timeout, Opts, 5000), + {keep_state, State#state{last_error=Reason}, + {state_timeout, Timeout, {retries, Retries - 1}}} + end; +not_connected(Type, Event, State) -> + handle_common(Type, Event, ?FUNCTION_NAME, State). -ensure_alpn(Protocols0, TransportOpts) -> +ensure_alpn(Protocols0, TransOpts) -> Protocols = [case P of http -> <<"http/1.1">>; http2 -> <<"h2">> @@ -705,181 +700,152 @@ ensure_alpn(Protocols0, TransportOpts) -> [ {alpn_advertised_protocols, Protocols}, {client_preferred_next_protocols, {client, Protocols, <<"http/1.1">>}} - |TransportOpts]. + |TransOpts]. -up(State=#state{owner=Owner, opts=Opts, transport=Transport}, Socket, Protocol, ProtoOptsKey) -> - ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}), - ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts), - Owner ! {gun_up, self(), Protocol:name()}, - before_loop(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}). - -down(State=#state{owner=Owner, opts=Opts, protocol=Protocol, protocol_state=ProtoState}, Reason) -> - {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState), - Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams}, - retry(State#state{socket=undefined, protocol=undefined, protocol_state=undefined, - last_error=Reason}, maps:get(retry, Opts, 5)). - -retry(#state{last_error=Reason}, 0) -> - exit({shutdown, Reason}); -retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when is_reference(KeepaliveRef) -> - _ = erlang:cancel_timer(KeepaliveRef), - %% Flush if we have a keepalive message - receive - keepalive -> ok - after 0 -> - ok - end, - retry_loop(State#state{keepalive_ref=undefined}, Retries - 1); -retry(State, Retries) -> - retry_loop(State, Retries - 1). - -retry_loop(State=#state{parent=Parent, opts=Opts}, Retries) -> - _ = erlang:send_after(maps:get(retry_timeout, Opts, 5000), self(), retry), - receive - retry -> - transport_connect(State, Retries); - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], - {retry_loop, State, Retries}) - end. - -before_loop(State=#state{opts=Opts, protocol=Protocol}) -> - %% @todo Might not be worth checking every time? +connected(internal, {connected, Socket, Protocol}, + State=#state{owner=Owner, opts=Opts, transport=Transport}) -> ProtoOptsKey = case Protocol of gun_http -> http_opts; gun_http2 -> http2_opts end, ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}), - Keepalive = maps:get(keepalive, ProtoOpts, 5000), - KeepaliveRef = case Keepalive of - infinity -> undefined; - _ -> erlang:send_after(Keepalive, self(), keepalive) + ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts), + Owner ! {gun_up, self(), Protocol:name()}, + {keep_state, keepalive_timeout(active(State#state{socket=Socket, + protocol=Protocol, protocol_state=ProtoState}))}; +%% Socket events. +connected(info, {OK, Socket, Data}, State=#state{socket=Socket, messages={OK, _, _}, + protocol=Protocol, protocol_state=ProtoState}) -> + commands(Protocol:handle(Data, ProtoState), active(State)); +connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) -> + disconnect(State, closed); +connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) -> + disconnect(State, {error, Reason}); +%% Timeouts. +%% @todo HTTP/2 requires more timeouts than just the keepalive timeout. +%% We should have a timeout function in protocols that deal with +%% received timeouts. Currently the timeout messages are ignored. +connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> + ProtoState2 = Protocol:keepalive(ProtoState), + {keep_state, keepalive_timeout(State#state{protocol_state=ProtoState2})}; +%% Public HTTP interface. +connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body}, + State=#state{origin_host=Host, origin_port=Port, + protocol=Protocol, protocol_state=ProtoState}) -> + ProtoState2 = case Body of + <<>> -> Protocol:request(ProtoState, + StreamRef, ReplyTo, Method, Host, Port, Path, Headers); + _ -> Protocol:request(ProtoState, + StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) end, - loop(State#state{keepalive_ref=KeepaliveRef}). - -loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, - origin_host=Host, origin_port=Port, opts=Opts, socket=Socket, - transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> - {OK, Closed, Error} = Transport:messages(), - Transport:setopts(Socket, [{active, once}]), - receive - {OK, Socket, Data} -> - case Protocol:handle(Data, ProtoState) of - Commands when is_list(Commands) -> - commands(Commands, State); - Command -> - commands([Command], State) - end; - {Closed, Socket} -> - Protocol:close(ProtoState), - Transport:close(Socket), - down(State, closed); - {Error, Socket, Reason} -> - Protocol:close(ProtoState), - Transport:close(Socket), - down(State, {error, Reason}); - {OK, _PreviousSocket, _Data} -> - loop(State); - {Closed, _PreviousSocket} -> - loop(State); - {Error, _PreviousSocket, _} -> - loop(State); - keepalive -> - ProtoState2 = Protocol:keepalive(ProtoState), - before_loop(State#state{protocol_state=ProtoState2}); - {request, ReplyTo, StreamRef, Method, Path, Headers, <<>>} -> - ProtoState2 = Protocol:request(ProtoState, - StreamRef, ReplyTo, Method, Host, Port, Path, Headers), - loop(State#state{protocol_state=ProtoState2}); - {request, ReplyTo, StreamRef, Method, Path, Headers, Body} -> - ProtoState2 = Protocol:request(ProtoState, - StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body), - loop(State#state{protocol_state=ProtoState2}); - %% @todo Do we want to reject ReplyTo if it's not the process - %% who initiated the connection? For both data and cancel. - {data, ReplyTo, StreamRef, IsFin, Data} -> - ProtoState2 = Protocol:data(ProtoState, - StreamRef, ReplyTo, IsFin, Data), - loop(State#state{protocol_state=ProtoState2}); - {connect, ReplyTo, StreamRef, Destination0, Headers} -> - %% The protocol option has been deprecated in favor of the protocols option. - %% Nobody probably ended up using it, but let's not break the interface. - Destination1 = case Destination0 of - #{protocols := _} -> - Destination0; - #{protocol := DestProto} -> - Destination0#{protocols => [DestProto]}; - _ -> - Destination0 - end, - Destination = case Destination1 of - #{transport := tls} -> - Destination1#{tls_opts => ensure_alpn( - maps:get(protocols, Destination1, [http]), - maps:get(tls_opts, Destination1, []))}; - _ -> - Destination1 - end, - ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers), - loop(State#state{protocol_state=ProtoState2}); - {cancel, ReplyTo, StreamRef} -> - ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo), - loop(State#state{protocol_state=ProtoState2}); - %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. - %% An interface would also make sure that HTTP/1.0 can't upgrade. - {ws_upgrade, Owner, StreamRef, Path, Headers} when Protocol =:= gun_http -> - WsOpts = maps:get(ws_opts, Opts, #{}), - ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts), - loop(State#state{protocol_state=ProtoState2}); - {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts} when Protocol =:= gun_http -> - ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts), - loop(State#state{protocol_state=ProtoState2}); - %% @todo can fail if http/1.0 - {shutdown, Owner} -> - %% @todo Protocol:shutdown? - ok; - {'DOWN', OwnerRef, process, Owner, Reason} -> - Protocol:close(ProtoState), - Transport:close(Socket), - owner_gone(Reason); - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], - {loop, State}); - %% @todo HTTP/2 requires more timeouts than just the keepalive timeout. - %% We should have a timeout function in protocols that deal with - %% received timeouts. Currently the timeout messages are ignored. - {ws_upgrade, _, StreamRef, _, _} -> - Owner ! {gun_error, self(), StreamRef, {badstate, - "Websocket is only supported over HTTP/1.1."}}, - loop(State); - {ws_upgrade, _, StreamRef, _, _, _} -> - Owner ! {gun_error, self(), StreamRef, {badstate, - "Websocket is only supported over HTTP/1.1."}}, - loop(State); - {ws_send, _, _} -> - Owner ! {gun_error, self(), {badstate, - "Connection needs to be upgraded to Websocket " - "before the gun:ws_send/1 function can be used."}}, - loop(State); - %% @todo The ReplyTo patch disabled the notowner behavior. - %% We need to add an option to enforce this behavior if needed. - Any when is_tuple(Any), is_pid(element(2, Any)) -> - element(2, Any) ! {gun_error, self(), {notowner, - "Operations are restricted to the owner of the connection."}}, - loop(State); - Any -> - error_logger:error_msg("Unexpected message: ~w~n", [Any]), - loop(State) - end. - + {keep_state, State#state{protocol_state=ProtoState2}}; +%% @todo Do we want to reject ReplyTo if it's not the process +%% who initiated the connection? For both data and cancel. +connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, + State=#state{protocol=Protocol, protocol_state=ProtoState}) -> + ProtoState2 = Protocol:data(ProtoState, StreamRef, ReplyTo, IsFin, Data), + {keep_state, State#state{protocol_state=ProtoState2}}; +connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers}, + State=#state{protocol=Protocol, protocol_state=ProtoState}) -> + %% The protocol option has been deprecated in favor of the protocols option. + %% Nobody probably ended up using it, but let's not break the interface. + Destination1 = case Destination0 of + #{protocols := _} -> + Destination0; + #{protocol := DestProto} -> + Destination0#{protocols => [DestProto]}; + _ -> + Destination0 + end, + Destination = case Destination1 of + #{transport := tls} -> + Destination1#{tls_opts => ensure_alpn( + maps:get(protocols, Destination1, [http]), + maps:get(tls_opts, Destination1, []))}; + _ -> + Destination1 + end, + ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers), + {keep_state, State#state{protocol_state=ProtoState2}}; +connected(cast, {cancel, ReplyTo, StreamRef}, + State=#state{protocol=Protocol, protocol_state=ProtoState}) -> + ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo), + {keep_state, State#state{protocol_state=ProtoState2}}; +%% Public Websocket interface. +%% @todo Maybe make an interface in the protocol module instead of checking on protocol name. +%% An interface would also make sure that HTTP/1.0 can't upgrade. +connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers}, + State=#state{owner=Owner, origin_host=Host, origin_port=Port, opts=Opts, + protocol=Protocol, protocol_state=ProtoState}) + when Protocol =:= gun_http -> + WsOpts = maps:get(ws_opts, Opts, #{}), + ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts), + {keep_state, State#state{protocol_state=ProtoState2}}; +connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts}, + State=#state{owner=Owner, origin_host=Host, origin_port=Port, + protocol=Protocol, protocol_state=ProtoState}) + when Protocol =:= gun_http -> + ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts), + {keep_state, State#state{protocol_state=ProtoState2}}; + %% @todo can fail if http/1.0 +%% @todo Probably don't error out here, have a protocol function/command. +connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _}, _) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, + "Websocket is only supported over HTTP/1.1."}}, + keep_state_and_data; +connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, + "Websocket is only supported over HTTP/1.1."}}, + keep_state_and_data; +connected(cast, {ws_send, Owner, Frame}, + State=#state{owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState}) -> + commands(Protocol:send(Frame, ProtoState), State); +connected(cast, {ws_send, ReplyTo, _}, _) -> + ReplyTo ! {gun_error, self(), {badstate, + "Connection needs to be upgraded to Websocket " + "before the gun:ws_send/1 function can be used."}}, + keep_state_and_data; +connected(Type, Event, State) -> + handle_common(Type, Event, ?FUNCTION_NAME, State). + +%% Common events. +handle_common(cast, {shutdown, Owner}, _, #state{owner=Owner}) -> + %% @todo Graceful shutdown. + stop; +%% We stop when the owner is gone. +handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, #state{ + owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport, + protocol=Protocol, protocol_state=ProtoState}) -> + _ = case Protocol of + undefined -> ok; + _ -> Protocol:close(owner_gone, ProtoState) + end, + _ = case Socket of + undefined -> ok; + _ -> Transport:close(Socket) + end, + owner_gone(Reason); +handle_common({call, From}, _, _, _) -> + {keep_state_and_data, {reply, From, {error, bad_call}}}; +%% @todo The ReplyTo patch disabled the notowner behavior. +%% We need to add an option to enforce this behavior if needed. +handle_common(cast, Any, _, #state{owner=Owner}) when element(2, Any) =/= Owner -> + element(2, Any) ! {gun_error, self(), {notowner, + "Operations are restricted to the owner of the connection."}}, + keep_state_and_data; +handle_common(Type, Event, StateName, StateData) -> + error_logger:error_msg("Unexpected event in state ~p of type ~p:~n~w~n~p~n", + [StateName, Type, Event, StateData]), + keep_state_and_data. + +commands(Command, State) when not is_list(Command) -> + commands([Command], State); commands([], State) -> - loop(State); -commands([close|_], State=#state{socket=Socket, transport=Transport}) -> - Transport:close(Socket), - down(State, normal); -commands([Error={error, _}|_], State=#state{socket=Socket, transport=Transport}) -> - Transport:close(Socket), - down(State, Error); + {keep_state, State}; +commands([close|_], State) -> + disconnect(State, normal); +commands([Error={error, _}|_], State) -> + disconnect(State, Error); commands([{state, ProtoState}|Tail], State) -> commands(Tail, State#state{protocol_state=ProtoState}); %% @todo The scheme should probably not be ignored. @@ -904,80 +870,73 @@ commands([{switch_transport, Transport, Socket}|Tail], State) -> commands(Tail, State#state{socket=Socket, transport=Transport}); %% @todo The two loops should be reunified and this clause generalized. commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State) -> - ws_loop(State#state{protocol=Protocol, protocol_state=ProtoState}); + {keep_state, keepalive_cancel(State#state{protocol=Protocol, protocol_state=ProtoState})}; %% @todo And this state should probably not be ignored. commands([{switch_protocol, Protocol, _ProtoState0}|Tail], State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport}) -> ProtoOpts = maps:get(http2_opts, Opts, #{}), ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts), - commands(Tail, State#state{protocol=Protocol, protocol_state=ProtoState}). + commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState})). -ws_loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, socket=Socket, - transport=Transport, protocol=Protocol, protocol_state=ProtoState}) -> - {OK, Closed, Error} = Transport:messages(), - Transport:setopts(Socket, [{active, once}]), +disconnect(State=#state{owner=Owner, opts=Opts, + socket=Socket, transport=Transport, + protocol=Protocol, protocol_state=ProtoState}, Reason) -> + Protocol:close(Reason, ProtoState), + %% @todo Need a special state for orderly shutdown of a connection. + Transport:close(Socket), + %% We closed the socket, discard any remaining socket events. + disconnect_flush(State), + %% @todo Stop keepalive timeout, flush message. + {KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState), + Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams}, + Retry = maps:get(retry, Opts, 5), + {next_state, not_connected, + keepalive_cancel(State#state{socket=undefined, + protocol=undefined, protocol_state=undefined, last_error=Reason}), + {next_event, internal, {retries, Retry}}}. + +disconnect_flush(State=#state{socket=Socket, messages={OK, Closed, Error}}) -> receive - {OK, Socket, Data} -> - case Protocol:handle(Data, ProtoState) of - close -> - Transport:close(Socket), - down(State, normal); - ProtoState2 -> - ws_loop(State#state{protocol_state=ProtoState2}) - end; - {Closed, Socket} -> - Transport:close(Socket), - down(State, closed); - {Error, Socket, Reason} -> - Transport:close(Socket), - down(State, {error, Reason}); - %% Ignore any previous HTTP keep-alive. - keepalive -> - ws_loop(State); -% {ws_send, Owner, Frames} when is_list(Frames) -> -% todo; %% @todo - {ws_send, Owner, Frame} -> - case Protocol:send(Frame, ProtoState) of - close -> - Transport:close(Socket), - down(State, normal); - ProtoState2 -> - ws_loop(State#state{protocol_state=ProtoState2}) - end; - {shutdown, Owner} -> - %% @todo Protocol:shutdown? %% @todo close frame - ok; - {'DOWN', OwnerRef, process, Owner, Reason} -> - Protocol:close(owner_gone, ProtoState), - Transport:close(Socket), - owner_gone(Reason); - {system, From, Request} -> - sys:handle_system_msg(Request, From, Parent, ?MODULE, [], - {ws_loop, State}); - Any when is_tuple(Any), is_pid(element(2, Any)) -> - element(2, Any) ! {gun_error, self(), {notowner, - "Operations are restricted to the owner of the connection."}}, - ws_loop(State); - Any -> - error_logger:error_msg("Unexpected message: ~w~n", [Any]) + {OK, Socket, _} -> disconnect_flush(State); + {Closed, Socket} -> disconnect_flush(State); + {Error, Socket, _} -> disconnect_flush(State) + after 0 -> + ok end. --spec owner_gone(_) -> no_return(). -owner_gone(normal) -> exit(normal); -owner_gone(shutdown) -> exit(shutdown); -owner_gone(Shutdown = {shutdown, _}) -> exit(Shutdown); -owner_gone(Reason) -> error({owner_gone, Reason}). - -system_continue(_, _, {retry_loop, State, Retry}) -> - retry_loop(State, Retry); -system_continue(_, _, {loop, State}) -> - loop(State); -system_continue(_, _, {ws_loop, State}) -> - ws_loop(State). - --spec system_terminate(any(), _, _, _) -> no_return(). -system_terminate(Reason, _, _, _) -> - exit(Reason). - -system_code_change(Misc, _, _, _) -> - {ok, Misc}. +active(State=#state{socket=Socket, transport=Transport}) -> + Transport:setopts(Socket, [{active, once}]), + State. + +keepalive_timeout(State=#state{opts=Opts, protocol=Protocol}) -> + %% @todo Might not be worth checking every time? + ProtoOptsKey = case Protocol of + gun_http -> http_opts; + gun_http2 -> http2_opts + end, + ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}), + Keepalive = maps:get(keepalive, ProtoOpts, 5000), + KeepaliveRef = case Keepalive of + infinity -> undefined; + %% @todo Maybe change that to a start_timer. + _ -> erlang:send_after(Keepalive, self(), keepalive) + end, + State#state{keepalive_ref=KeepaliveRef}. + +keepalive_cancel(State=#state{keepalive_ref=undefined}) -> + State; +keepalive_cancel(State=#state{keepalive_ref=KeepaliveRef}) -> + _ = erlang:cancel_timer(KeepaliveRef), + %% Flush if we have a keepalive message + receive + keepalive -> ok + after 0 -> + ok + end, + State#state{keepalive_ref=undefined}. + +-spec owner_gone(_) -> stop | {stop, _}. +owner_gone(normal) -> stop; +owner_gone(shutdown) -> {stop, shutdown}; +owner_gone(Shutdown = {shutdown, _}) -> {stop, Shutdown}; +owner_gone(Reason) -> {stop, {owner_gone, Reason}}. diff --git a/src/gun_http.erl b/src/gun_http.erl index e2b37d1..81310bf 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -18,7 +18,7 @@ -export([name/0]). -export([init/4]). -export([handle/2]). --export([close/1]). +-export([close/2]). -export([keepalive/1]). -export([request/8]). -export([request/9]). @@ -304,10 +304,11 @@ send_data_if_alive(Data, State=#http_state{streams=[Stream=#stream{ send_data_if_alive(_, State, _) -> State. -close(State=#http_state{in=body_close, streams=[_|Tail]}) -> +%% @todo Use Reason. +close(_, State=#http_state{in=body_close, streams=[_|Tail]}) -> _ = send_data_if_alive(<<>>, State, fin), close_streams(Tail); -close(#http_state{streams=Streams}) -> +close(_, #http_state{streams=Streams}) -> close_streams(Streams). close_streams([]) -> diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 3468448..6538083 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -18,7 +18,7 @@ -export([name/0]). -export([init/4]). -export([handle/2]). --export([close/1]). +-export([close/2]). -export([keepalive/1]). -export([request/8]). -export([request/9]). @@ -225,7 +225,8 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> terminate(State#http2_state{http2_machine=HTTP2Machine}, Error) end. -close(#http2_state{streams=Streams}) -> +%% @todo Use Reason. +close(_, #http2_state{streams=Streams}) -> close_streams(Streams). close_streams([]) -> diff --git a/src/gun_ws.erl b/src/gun_ws.erl index b89840e..cccb4e4 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -74,10 +74,10 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) -> %% Do not handle anything if we received a close frame. handle(_, State=#ws_state{in=close}) -> - State; + {state, State}; %% Shortcut for common case when Data is empty after processing a frame. handle(<<>>, State=#ws_state{in=head}) -> - State; + {state, State}; handle(Data, State=#ws_state{buffer=Buffer, in=head, frag_state=FragState, extensions=Extensions}) -> Data2 = << Buffer/binary, Data/binary >>, case cow_ws:parse_header(Data2, Extensions, FragState) of @@ -85,7 +85,7 @@ handle(Data, State=#ws_state{buffer=Buffer, in=head, frag_state=FragState, exten handle(Rest, State#ws_state{buffer= <<>>, in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey}, frag_state=FragState2}); more -> - State#ws_state{buffer=Data2}; + {state, State#ws_state{buffer=Data2}}; error -> close({error, badframe}, State) end; @@ -97,11 +97,11 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke {ok, Payload, Utf8State2, Rest} -> dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, << Unmasked/binary, Payload/binary >>, CloseCode); {more, CloseCode2, Payload, Utf8State2} -> - State#ws_state{in=In#payload{close_code=CloseCode2, unmasked= << Unmasked/binary, Payload/binary >>, - len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}; + {state, State#ws_state{in=In#payload{close_code=CloseCode2, unmasked= << Unmasked/binary, Payload/binary >>, + len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}}; {more, Payload, Utf8State2} -> - State#ws_state{in=In#payload{unmasked= << Unmasked/binary, Payload/binary >>, - len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}; + {state, State#ws_state{in=In#payload{unmasked= << Unmasked/binary, Payload/binary >>, + len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}}; Error = {error, _Reason} -> close(Error, State) end. @@ -111,10 +111,10 @@ dispatch(Rest, State0=#ws_state{frag_state=FragState, Type0, Payload0, CloseCode0) -> case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of ping -> - State = send(pong, State0), + {state, State} = send(pong, State0), handle(Rest, State); {ping, Payload} -> - State = send({pong, Payload}, State0), + {state, State} = send({pong, Payload}, State0), handle(Rest, State); pong -> handle(Rest, State0); @@ -133,9 +133,8 @@ dispatch(Rest, State0=#ws_state{frag_state=FragState, close(Reason, State) -> case Reason of -%% @todo We need to send a close frame from gun:ws_loop on close. -% Normal when Normal =:= stop; Normal =:= timeout -> -% send({close, 1000, <<>>}, State); + normal -> + send({close, 1000, <<>>}, State); owner_gone -> send({close, 1001, <<>>}, State); {error, badframe} -> @@ -149,7 +148,7 @@ send(Frame, State=#ws_state{socket=Socket, transport=Transport, extensions=Exten case Frame of close -> close; {close, _, _} -> close; - _ -> State + _ -> {state, State} end. %% Websocket has no concept of streams. |