From 356bf47edeb5b78765200e78d9b7a48aa98b97f5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 16 Oct 2020 11:33:31 +0200 Subject: Add or fix events inside or related to CONNECT tunnels --- src/gun.erl | 64 +++++++++++++++++++++++++++++++++++++++++++------------------ 1 file changed, 45 insertions(+), 19 deletions(-) (limited to 'src/gun.erl') diff --git a/src/gun.erl b/src/gun.erl index 217133f..62abd6f 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -93,6 +93,7 @@ -export([ws_upgrade/3]). -export([ws_upgrade/4]). -export([ws_send/2]). +-export([ws_send/3]). %% Internals. -export([start_link/4]). @@ -274,7 +275,8 @@ keepalive => timeout(), protocols => [{binary(), module()}], reply_to => pid(), - silence_pings => boolean() + silence_pings => boolean(), + tunnel => stream_ref() }. -export_type([ws_opts/0]). @@ -926,20 +928,26 @@ ws_upgrade(ServerPid, Path, Headers) -> StreamRef. -spec ws_upgrade(pid(), iodata(), req_headers(), ws_opts()) -> stream_ref(). -ws_upgrade(ServerPid, Path, Headers, Opts) -> +ws_upgrade(ServerPid, Path, Headers, Opts0) -> + Tunnel = get_tunnel(Opts0), + Opts = maps:without([tunnel], Opts0), ok = gun_ws:check_options(Opts), - StreamRef = make_ref(), + StreamRef = make_stream_ref(Tunnel), ReplyTo = maps:get(reply_to, Opts, self()), %% @todo Also accept tunnel option. gen_statem:cast(ServerPid, {ws_upgrade, ReplyTo, StreamRef, Path, normalize_headers(Headers), Opts}), StreamRef. %% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef. -%% But it can be kept for the time being since it can still work for HTTP/1.1. +%% But it can be kept for the time being since it can still work for HTTP/1.1 (connected_ws_only). -spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok. ws_send(ServerPid, Frames) -> gen_statem:cast(ServerPid, {ws_send, self(), Frames}). +-spec ws_send(pid(), stream_ref(), ws_frame() | [ws_frame()]) -> ok. +ws_send(ServerPid, StreamRef, Frames) -> + gen_statem:cast(ServerPid, {ws_send, self(), StreamRef, Frames}). + %% Internals. callback_mode() -> state_functions. @@ -1208,6 +1216,12 @@ connected_data_only(cast, Msg, _) connected_data_only(Type, Event, State) -> handle_common_connected(Type, Event, ?FUNCTION_NAME, State). +connected_ws_only(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{ + protocol=Protocol=gun_ws, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {Commands, EvHandlerState} = Protocol:ws_send(Frames, + ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + commands(Commands, State#state{event_handler_state=EvHandlerState}); connected_ws_only(cast, {ws_send, ReplyTo, Frames}, State=#state{ protocol=Protocol=gun_ws, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> @@ -1259,13 +1273,13 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers0, Body, Init {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow}, State=#state{origin_host=Host, origin_port=Port, - protocol=Protocol, protocol_state=ProtoState}) -> - %% @todo No events are currently handled for the CONNECT request? - ProtoState2 = Protocol:connect(ProtoState, + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:connect(ProtoState, dereference_stream_ref(StreamRef, State), ReplyTo, Destination, #{host => Host, port => Port}, - Headers, InitialFlow), - {keep_state, State#state{protocol_state=ProtoState2}}; + Headers, InitialFlow, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; %% Public Websocket interface. %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. %% An interface would also make sure that HTTP/1.0 can't upgrade. @@ -1275,8 +1289,7 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{op connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers0, WsOpts}, State0=#state{origin_host=Host, origin_port=Port, protocol=Protocol, protocol_state=ProtoState, - event_handler=EvHandler, event_handler_state=EvHandlerState0}) - when Protocol =:= gun_http -> + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> EvHandlerState1 = EvHandler:ws_upgrade(#{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -1285,14 +1298,19 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers0, WsOpts}, %% @todo Can fail if HTTP/1.0. {Headers, State} = add_cookie_header(Path, Headers0, State0), {ProtoState2, EvHandlerState} = Protocol:ws_upgrade(ProtoState, - StreamRef, ReplyTo, Host, Port, Path, Headers, WsOpts, - EvHandler, EvHandlerState1), + dereference_stream_ref(StreamRef, State), ReplyTo, + Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState1), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; -connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, - "Websocket is only supported over HTTP/1.1."}}, - keep_state_and_data; +%% @todo Maybe better standardize the protocol callbacks argument orders. +connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{ + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) + when is_list(StreamRef) -> + {Commands, EvHandlerState} = Protocol:ws_send(Frames, + ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0), + commands(Commands, State#state{event_handler_state=EvHandlerState}); +%% Catch-all for the StreamRef-free variant. connected(cast, {ws_send, ReplyTo, _}, _) -> ReplyTo ! {gun_error, self(), {badstate, "Connection needs to be upgraded to Websocket " @@ -1472,7 +1490,7 @@ handle_common_connected_no_input(cast, {cancel, ReplyTo, StreamRef}, _, State=#state{protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, - StreamRef, ReplyTo, EvHandler, EvHandlerState0), + dereference_stream_ref(StreamRef, State), ReplyTo, EvHandler, EvHandlerState0), {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; handle_common_connected_no_input({call, From}, {stream_info, StreamRef}, _, State=#state{intermediaries=Intermediaries0, protocol=Protocol, protocol_state=ProtoState}) -> @@ -1721,7 +1739,13 @@ commands([{switch_protocol, NewProtocol, ReplyTo}], State0=#state{ _ -> ProtoOpts0#{tunnel_transport => tcp} end, {StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts), - EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), + ProtocolChangedEvent = case ProtoOpts of + #{stream_ref := StreamRef} -> + #{stream_ref => StreamRef, protocol => Protocol:name()}; + _ -> + #{protocol => Protocol:name()} + end, + EvHandlerState = EvHandler:protocol_changed(ProtocolChangedEvent, EvHandlerState0), %% We cancel the existing keepalive and, depending on the protocol, %% we enable keepalive again, effectively resetting the timer. State = keepalive_cancel(active(State0#state{protocol=Protocol, protocol_state=ProtoState, @@ -1755,6 +1779,8 @@ disconnect(State0=#state{owner=Owner, status=Status, opts=Opts, KilledStreams = Protocol:down(ProtoState), Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams}, Retry = maps:get(retry, Opts, 5), + %% @todo We need to reset the origin_scheme/host/port and the transport + %% as well as remove the intermediaries. {next_state, not_connected, keepalive_cancel(State#state{socket=undefined, protocol=undefined, protocol_state=undefined}), -- cgit v1.2.3