aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_websocket.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_websocket.erl')
-rw-r--r--src/cowboy_websocket.erl62
1 files changed, 49 insertions, 13 deletions
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index cb30c3f..65289cd 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -68,6 +68,8 @@
-type opts() :: #{
active_n => pos_integer(),
compress => boolean(),
+ data_delivery => stream_handlers | relay,
+ data_delivery_flow => pos_integer(),
deflate_opts => cow_ws:deflate_opts(),
dynamic_buffer => false | {pos_integer(), pos_integer()},
dynamic_buffer_initial_average => non_neg_integer(),
@@ -91,7 +93,7 @@
parent :: undefined | pid(),
ref :: ranch:ref(),
socket = undefined :: inet:socket() | {pid(), cowboy_stream:streamid()} | undefined,
- transport = undefined :: module() | undefined,
+ transport :: module() | {data_delivery, stream_handlers | relay},
opts = #{} :: opts(),
active = true :: boolean(),
handler :: module(),
@@ -149,7 +151,7 @@ upgrade(Req, Env, Handler, HandlerState) ->
%% @todo Immediately crash if a response has already been sent.
upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) ->
FilteredReq = case maps:get(req_filter, Opts, undefined) of
- undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req0);
+ undefined -> maps:with([method, version, scheme, host, port, path, qs, peer, streamid], Req0);
FilterFun -> FilterFun(Req0)
end,
Utf8State = case maps:get(validate_utf8, Opts, true) of
@@ -273,12 +275,27 @@ websocket_handshake(State=#state{key=Key},
%% For HTTP/2 we do not let the process die, we instead keep it
%% for the Websocket stream. This is because in HTTP/2 we only
%% have a stream, it doesn't take over the whole connection.
-websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
+%%
+%% There are two methods of delivering data to the Websocket session:
+%% - 'stream_handlers' is the default and makes the data go
+%% through stream handlers just like when reading a request body;
+%% - 'relay' is a new method where data is sent as a message as
+%% soon as it is received from the socket in a DATA frame.
+websocket_handshake(State=#state{opts=Opts},
+ Req=#{ref := Ref, pid := Pid, streamid := StreamID},
HandlerState, _Env) ->
%% @todo We don't want date and server headers.
Headers = cowboy_req:response_headers(#{}, Req),
- Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}},
- takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>,
+ DataDelivery = maps:get(data_delivery, Opts, stream_handlers),
+ ModState = #{
+ data_delivery => DataDelivery,
+ %% For relay data_delivery. The flow is a hint and may
+ %% not be used by the underlying protocol.
+ data_delivery_pid => self(),
+ data_delivery_flow => maps:get(data_delivery_flow, Opts, 131072)
+ },
+ Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, ModState}},
+ takeover(Pid, Ref, {Pid, StreamID}, {data_delivery, DataDelivery}, #{}, <<>>,
{State, HandlerState}).
%% Connection process.
@@ -301,7 +318,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID},
-type parse_state() :: #ps_header{} | #ps_payload{}.
-spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()},
- module() | undefined, any(), binary(),
+ module() | {data_delivery, stream_handlers | relay}, any(), binary(),
{#state{}, any()}) -> no_return().
takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
{State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) ->
@@ -311,7 +328,7 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
_ -> ranch:remove_connection(Ref)
end,
Messages = case Transport of
- undefined -> undefined;
+ {data_delivery, _} -> undefined;
_ -> Transport:messages()
end,
State = set_idle_timeout(State0#state{parent=Parent,
@@ -355,13 +372,14 @@ after_init(State, HandlerState, ParseState) ->
%% immediately but there might still be data to be processed in
%% the message queue.
-setopts_active(#state{transport=undefined}) ->
+setopts_active(#state{transport={data_delivery, _}}) ->
ok;
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
-maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) ->
+maybe_read_body(#state{transport={data_delivery, stream_handlers},
+ socket=Stream={Pid, _}, active=true}) ->
%% @todo Keep Ref around.
ReadBodyRef = make_ref(),
Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}},
@@ -369,16 +387,25 @@ maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}
maybe_read_body(_) ->
ok.
-active(State) ->
+active(State=#state{transport={data_delivery, relay},
+ socket=Stream={Pid, _}}) ->
+ Pid ! {'$cowboy_relay_command', Stream, active},
+ State#state{active=true};
+active(State0) ->
+ State = State0#state{active=true},
setopts_active(State),
maybe_read_body(State),
- State#state{active=true}.
+ State.
-passive(State=#state{transport=undefined}) ->
+passive(State=#state{transport={data_delivery, stream_handlers}}) ->
%% Unfortunately we cannot currently cancel read_body.
%% But that's OK, we will just stop reading the body
%% after the next message.
State#state{active=false};
+passive(State=#state{transport={data_delivery, relay},
+ socket=Stream={Pid, _}}) ->
+ Pid ! {'$cowboy_relay_command', Stream, passive},
+ State#state{active=false};
passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) ->
Transport:setopts(Socket, [{active, false}]),
flush_passive(Socket, Messages),
@@ -454,6 +481,10 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages,
{request_body, _Ref, fin, _, Data} ->
maybe_read_body(State),
parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
+ %% @todo It would be better to check StreamID.
+ %% @todo We must ensure that IsFin=fin is handled like a socket close?
+ {'$cowboy_relay_data', {Pid, _StreamID}, _IsFin, Data} when Pid =:= Parent ->
+ parse(?reset_idle_timeout(State), HandlerState, ParseState, Data);
%% Timeouts.
{timeout, TRef, ?MODULE} ->
tick_idle_timeout(State, HandlerState, ParseState);
@@ -662,9 +693,14 @@ commands([Frame|Tail], State, Data0) ->
commands(Tail, State, Data)
end.
-transport_send(#state{socket=Stream={Pid, _}, transport=undefined}, IsFin, Data) ->
+transport_send(#state{transport={data_delivery, stream_handlers},
+ socket=Stream={Pid, _}}, IsFin, Data) ->
Pid ! {Stream, {data, IsFin, Data}},
ok;
+transport_send(#state{transport={data_delivery, relay},
+ socket=Stream={Pid, _}}, IsFin, Data) ->
+ Pid ! {'$cowboy_relay_command', Stream, {data, IsFin, Data}},
+ ok;
transport_send(#state{socket=Socket, transport=Transport}, _, Data) ->
Transport:send(Socket, Data).