aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/gun.erl')
-rw-r--r--src/gun.erl64
1 files changed, 45 insertions, 19 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 217133f..62abd6f 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -93,6 +93,7 @@
-export([ws_upgrade/3]).
-export([ws_upgrade/4]).
-export([ws_send/2]).
+-export([ws_send/3]).
%% Internals.
-export([start_link/4]).
@@ -274,7 +275,8 @@
keepalive => timeout(),
protocols => [{binary(), module()}],
reply_to => pid(),
- silence_pings => boolean()
+ silence_pings => boolean(),
+ tunnel => stream_ref()
}.
-export_type([ws_opts/0]).
@@ -926,20 +928,26 @@ ws_upgrade(ServerPid, Path, Headers) ->
StreamRef.
-spec ws_upgrade(pid(), iodata(), req_headers(), ws_opts()) -> stream_ref().
-ws_upgrade(ServerPid, Path, Headers, Opts) ->
+ws_upgrade(ServerPid, Path, Headers, Opts0) ->
+ Tunnel = get_tunnel(Opts0),
+ Opts = maps:without([tunnel], Opts0),
ok = gun_ws:check_options(Opts),
- StreamRef = make_ref(),
+ StreamRef = make_stream_ref(Tunnel),
ReplyTo = maps:get(reply_to, Opts, self()),
%% @todo Also accept tunnel option.
gen_statem:cast(ServerPid, {ws_upgrade, ReplyTo, StreamRef, Path, normalize_headers(Headers), Opts}),
StreamRef.
%% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef.
-%% But it can be kept for the time being since it can still work for HTTP/1.1.
+%% But it can be kept for the time being since it can still work for HTTP/1.1 (connected_ws_only).
-spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok.
ws_send(ServerPid, Frames) ->
gen_statem:cast(ServerPid, {ws_send, self(), Frames}).
+-spec ws_send(pid(), stream_ref(), ws_frame() | [ws_frame()]) -> ok.
+ws_send(ServerPid, StreamRef, Frames) ->
+ gen_statem:cast(ServerPid, {ws_send, self(), StreamRef, Frames}).
+
%% Internals.
callback_mode() -> state_functions.
@@ -1208,6 +1216,12 @@ connected_data_only(cast, Msg, _)
connected_data_only(Type, Event, State) ->
handle_common_connected(Type, Event, ?FUNCTION_NAME, State).
+connected_ws_only(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{
+ protocol=Protocol=gun_ws, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {Commands, EvHandlerState} = Protocol:ws_send(Frames,
+ ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ commands(Commands, State#state{event_handler_state=EvHandlerState});
connected_ws_only(cast, {ws_send, ReplyTo, Frames}, State=#state{
protocol=Protocol=gun_ws, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
@@ -1259,13 +1273,13 @@ connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers0, Body, Init
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
connected(cast, {connect, ReplyTo, StreamRef, Destination, Headers, InitialFlow},
State=#state{origin_host=Host, origin_port=Port,
- protocol=Protocol, protocol_state=ProtoState}) ->
- %% @todo No events are currently handled for the CONNECT request?
- ProtoState2 = Protocol:connect(ProtoState,
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:connect(ProtoState,
dereference_stream_ref(StreamRef, State), ReplyTo,
Destination, #{host => Host, port => Port},
- Headers, InitialFlow),
- {keep_state, State#state{protocol_state=ProtoState2}};
+ Headers, InitialFlow, EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
%% Public Websocket interface.
%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
%% An interface would also make sure that HTTP/1.0 can't upgrade.
@@ -1275,8 +1289,7 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers}, State=#state{op
connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers0, WsOpts},
State0=#state{origin_host=Host, origin_port=Port,
protocol=Protocol, protocol_state=ProtoState,
- event_handler=EvHandler, event_handler_state=EvHandlerState0})
- when Protocol =:= gun_http ->
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
EvHandlerState1 = EvHandler:ws_upgrade(#{
stream_ref => StreamRef,
reply_to => ReplyTo,
@@ -1285,14 +1298,19 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, Path, Headers0, WsOpts},
%% @todo Can fail if HTTP/1.0.
{Headers, State} = add_cookie_header(Path, Headers0, State0),
{ProtoState2, EvHandlerState} = Protocol:ws_upgrade(ProtoState,
- StreamRef, ReplyTo, Host, Port, Path, Headers, WsOpts,
- EvHandler, EvHandlerState1),
+ dereference_stream_ref(StreamRef, State), ReplyTo,
+ Host, Port, Path, Headers, WsOpts, EvHandler, EvHandlerState1),
{keep_state, State#state{protocol_state=ProtoState2,
event_handler_state=EvHandlerState}};
-connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
- "Websocket is only supported over HTTP/1.1."}},
- keep_state_and_data;
+%% @todo Maybe better standardize the protocol callbacks argument orders.
+connected(cast, {ws_send, ReplyTo, StreamRef, Frames}, State=#state{
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0})
+ when is_list(StreamRef) ->
+ {Commands, EvHandlerState} = Protocol:ws_send(Frames,
+ ProtoState, StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ commands(Commands, State#state{event_handler_state=EvHandlerState});
+%% Catch-all for the StreamRef-free variant.
connected(cast, {ws_send, ReplyTo, _}, _) ->
ReplyTo ! {gun_error, self(), {badstate,
"Connection needs to be upgraded to Websocket "
@@ -1472,7 +1490,7 @@ handle_common_connected_no_input(cast, {cancel, ReplyTo, StreamRef}, _,
State=#state{protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
{ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
- StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ dereference_stream_ref(StreamRef, State), ReplyTo, EvHandler, EvHandlerState0),
{keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
handle_common_connected_no_input({call, From}, {stream_info, StreamRef}, _,
State=#state{intermediaries=Intermediaries0, protocol=Protocol, protocol_state=ProtoState}) ->
@@ -1721,7 +1739,13 @@ commands([{switch_protocol, NewProtocol, ReplyTo}], State0=#state{
_ -> ProtoOpts0#{tunnel_transport => tcp}
end,
{StateName, ProtoState} = Protocol:init(ReplyTo, Socket, Transport, ProtoOpts),
- EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
+ ProtocolChangedEvent = case ProtoOpts of
+ #{stream_ref := StreamRef} ->
+ #{stream_ref => StreamRef, protocol => Protocol:name()};
+ _ ->
+ #{protocol => Protocol:name()}
+ end,
+ EvHandlerState = EvHandler:protocol_changed(ProtocolChangedEvent, EvHandlerState0),
%% We cancel the existing keepalive and, depending on the protocol,
%% we enable keepalive again, effectively resetting the timer.
State = keepalive_cancel(active(State0#state{protocol=Protocol, protocol_state=ProtoState,
@@ -1755,6 +1779,8 @@ disconnect(State0=#state{owner=Owner, status=Status, opts=Opts,
KilledStreams = Protocol:down(ProtoState),
Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams},
Retry = maps:get(retry, Opts, 5),
+ %% @todo We need to reset the origin_scheme/host/port and the transport
+ %% as well as remove the intermediaries.
{next_state, not_connected,
keepalive_cancel(State#state{socket=undefined,
protocol=undefined, protocol_state=undefined}),