aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_webtransport.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_webtransport.erl')
-rw-r--r--src/cowboy_webtransport.erl292
1 files changed, 292 insertions, 0 deletions
diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl
new file mode 100644
index 0000000..8c8ca39
--- /dev/null
+++ b/src/cowboy_webtransport.erl
@@ -0,0 +1,292 @@
+%% Copyright (c) Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+%% @todo To enable WebTransport the following options need to be set:
+%%
+%% QUIC:
+%% - max_datagram_frame_size > 0
+%%
+%% HTTP/3:
+%% - SETTINGS_H3_DATAGRAM = 1
+%% - SETTINGS_ENABLE_CONNECT_PROTOCOL = 1
+%% - SETTINGS_WT_MAX_SESSIONS >= 1
+
+%% Cowboy supports versions 07 through 13 of the WebTransport drafts.
+%% Cowboy also has some compatibility with version 02.
+%%
+%% WebTransport CONNECT requests go through cowboy_stream as normal
+%% and then an upgrade/switch_protocol is issued (just like Websocket).
+%% After that point none of the events go through cowboy_stream except
+%% the final terminate event. The request process becomes the process
+%% handling all events in the WebTransport session.
+%%
+%% WebTransport sessions can be ended via a command, via a crash or
+%% exit, via the closing of the connection (client or server inititated),
+%% via the client ending the session (mirroring the command) or via
+%% the client terminating the CONNECT stream.
+-module(cowboy_webtransport).
+
+-export([upgrade/4]).
+-export([upgrade/5]).
+
+%% cowboy_stream.
+-export([info/3]).
+-export([terminate/3]).
+
+-type stream_type() :: unidi | bidi.
+-type open_stream_ref() :: any().
+
+-type event() ::
+ {stream_open, cow_http3:stream_id(), stream_type()} |
+ {opened_stream_id, open_stream_ref(), cow_http3:stream_id()} |
+ {stream_data, cow_http3:stream_id(), cow_http:fin(), binary()} |
+ {datagram, binary()} |
+ close_initiated.
+
+-type commands() :: [
+ {open_stream, open_stream_ref(), stream_type(), iodata()} |
+ {close_stream, cow_http3:stream_id(), cow_http3:wt_app_error_code()} |
+ {send, cow_http3:stream_id() | datagram, iodata()} |
+ initiate_close |
+ close |
+ {close, cow_http3:wt_app_error_code()} |
+ {close, cow_http3:wt_app_error_code(), iodata()}
+].
+-export_type([commands/0]).
+
+-type call_result(State) :: {commands(), State} | {commands(), State, hibernate}.
+
+-callback init(Req, any())
+ -> {ok | module(), Req, any()}
+ | {module(), Req, any(), any()}
+ when Req::cowboy_req:req().
+
+-callback webtransport_init(State)
+ -> call_result(State) when State::any().
+-optional_callbacks([webtransport_init/1]).
+
+-callback webtransport_handle(event(), State)
+ -> call_result(State) when State::any().
+-optional_callbacks([webtransport_handle/2]).
+
+-callback webtransport_info(any(), State)
+ -> call_result(State) when State::any().
+-optional_callbacks([webtransport_info/2]).
+
+-callback terminate(any(), cowboy_req:req(), any()) -> ok.
+-optional_callbacks([terminate/3]).
+
+-type opts() :: #{
+ req_filter => fun((cowboy_req:req()) -> map())
+}.
+-export_type([opts/0]).
+
+-record(state, {
+ id :: cow_http3:stream_id(),
+ parent :: pid(),
+ opts = #{} :: opts(),
+ handler :: module(),
+ hibernate = false :: boolean(),
+ req = #{} :: map()
+}).
+
+%% This function mirrors a similar function for Websocket.
+
+-spec is_upgrade_request(cowboy_req:req()) -> boolean().
+
+is_upgrade_request(#{version := Version, method := <<"CONNECT">>, protocol := Protocol})
+ when Version =:= 'HTTP/3' ->
+ %% @todo scheme MUST BE "https"
+ <<"webtransport">> =:= cowboy_bstr:to_lower(Protocol);
+
+is_upgrade_request(_) ->
+ false.
+
+%% Stream process.
+
+-spec upgrade(Req, Env, module(), any())
+ -> {ok, Req, Env}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+
+upgrade(Req, Env, Handler, HandlerState) ->
+ upgrade(Req, Env, Handler, HandlerState, #{}).
+
+-spec upgrade(Req, Env, module(), any(), opts())
+ -> {ok, Req, Env}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+
+%% @todo Immediately crash if a response has already been sent.
+upgrade(Req=#{version := 'HTTP/3', pid := Pid, streamid := StreamID}, Env, Handler, HandlerState, Opts) ->
+ FilteredReq = case maps:get(req_filter, Opts, undefined) of
+ undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req);
+ FilterFun -> FilterFun(Req)
+ end,
+ State = #state{id=StreamID, parent=Pid, opts=Opts, handler=Handler, req=FilteredReq},
+ %% @todo Must ensure the relevant settings are enabled (QUIC and H3).
+ %% Either we check them BEFORE, or we check them when the handler
+ %% is OK to initiate a webtransport session. Probably need to
+ %% check them BEFORE as we need to become (takeover) the webtransport process
+ %% after we are done with the upgrade. Maybe in cow_http3_machine but
+ %% it doesn't have QUIC settings currently (max_datagram_size).
+ case is_upgrade_request(Req) of
+ true ->
+ Headers = cowboy_req:response_headers(#{}, Req),
+ Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE,
+ #{session_pid => self()}}},
+ webtransport_init(State, HandlerState);
+ %% Use 501 Not Implemented to mirror the recommendation in
+ %% by RFC9220 3 (WebSockets Upgrade over HTTP/3).
+ false ->
+ %% @todo I don't think terminate will be called.
+ {ok, cowboy_req:reply(501, Req), Env}
+ end.
+
+webtransport_init(State=#state{handler=Handler}, HandlerState) ->
+ case erlang:function_exported(Handler, webtransport_init, 1) of
+ true -> handler_call(State, HandlerState, webtransport_init, undefined);
+ false -> before_loop(State, HandlerState)
+ end.
+
+before_loop(State=#state{hibernate=true}, HandlerState) ->
+ proc_lib:hibernate(?MODULE, loop, [State#state{hibernate=false}, HandlerState]);
+before_loop(State, HandlerState) ->
+ loop(State, HandlerState).
+
+-spec loop(#state{}, any()) -> no_return().
+
+loop(State=#state{id=SessionID, parent=Parent}, HandlerState) ->
+ receive
+ {'$webtransport_event', SessionID, Event={closed, _, _}} ->
+ terminate_proc(State, HandlerState, Event);
+ {'$webtransport_event', SessionID, Event=closed_abruptly} ->
+ terminate_proc(State, HandlerState, Event);
+ {'$webtransport_event', SessionID, Event} ->
+ handler_call(State, HandlerState, webtransport_handle, Event);
+ %% Timeouts.
+%% @todo idle_timeout
+% {timeout, TRef, ?MODULE} ->
+% tick_idle_timeout(State, HandlerState, ParseState);
+% {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
+% before_loop(State, HandlerState, ParseState);
+ %% System messages.
+ {'EXIT', Parent, Reason} ->
+ %% @todo We should exit gracefully.
+ exit(Reason);
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
+ {State, HandlerState});
+ %% Calls from supervisor module.
+ {'$gen_call', From, Call} ->
+ cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE),
+ before_loop(State, HandlerState);
+ Message ->
+ handler_call(State, HandlerState, webtransport_info, Message)
+ end.
+
+handler_call(State=#state{handler=Handler}, HandlerState, Callback, Message) ->
+ try case Callback of
+ webtransport_init -> Handler:webtransport_init(HandlerState);
+ _ -> Handler:Callback(Message, HandlerState)
+ end of
+ {Commands, HandlerState2} when is_list(Commands) ->
+ handler_call_result(State, HandlerState2, Commands);
+ {Commands, HandlerState2, hibernate} when is_list(Commands) ->
+ handler_call_result(State#state{hibernate=true}, HandlerState2, Commands)
+ catch Class:Reason:Stacktrace ->
+ %% @todo Do we need to send a close? Let cowboy_http3 detect and handle it?
+ handler_terminate(State, HandlerState, {crash, Class, Reason}),
+ erlang:raise(Class, Reason, Stacktrace)
+ end.
+
+handler_call_result(State0, HandlerState, Commands) ->
+ case commands(Commands, State0, ok, []) of
+ {ok, State} ->
+ before_loop(State, HandlerState);
+ {stop, State} ->
+ terminate_proc(State, HandlerState, stop)
+ end.
+
+%% We accumulate the commands that must be sent to the connection process
+%% because we want to send everything into one message. Other commands are
+%% processed immediately.
+
+commands([], State, Res, []) ->
+ {Res, State};
+commands([], State=#state{id=SessionID, parent=Pid}, Res, Commands) ->
+ Pid ! {'$webtransport_commands', SessionID, lists:reverse(Commands)},
+ {Res, State};
+%% {open_stream, OpenStreamRef, StreamType, InitialData}.
+commands([Command={open_stream, _, _, _}|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% {close_stream, StreamID, Code}.
+commands([Command={close_stream, _, _}|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% @todo We must reject send to a remote unidi stream.
+%% {send, StreamID | datagram, Data}.
+commands([Command={send, _, _}|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% {send, StreamID, IsFin, Data}.
+commands([Command={send, _, _, _}|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% initiate_close - DRAIN_WT_SESSION
+commands([Command=initiate_close|Tail], State, Res, Acc) ->
+ commands(Tail, State, Res, [Command|Acc]);
+%% close | {close, Code} | {close, Code, Msg} - CLOSE_WT_SESSION
+%% @todo At this point the handler must not issue stream or send commands.
+commands([Command=close|Tail], State, _, Acc) ->
+ commands(Tail, State, stop, [Command|Acc]);
+commands([Command={close, _}|Tail], State, _, Acc) ->
+ commands(Tail, State, stop, [Command|Acc]);
+commands([Command={close, _, _}|Tail], State, _, Acc) ->
+ commands(Tail, State, stop, [Command|Acc]).
+%% @todo A set_options command could be useful to increase the number of allowed streams
+%% or other forms of flow control. Alternatively a flow command. Or both.
+%% @todo A shutdown_reason command could be useful for the same reasons as Websocekt.
+
+-spec terminate_proc(_, _, _) -> no_return().
+
+terminate_proc(State, HandlerState, Reason) ->
+ handler_terminate(State, HandlerState, Reason),
+ %% @todo This is what should be done if shutdown_reason gets implemented.
+% case Shutdown of
+% normal -> exit(normal);
+% _ -> exit({shutdown, Shutdown})
+% end.
+ exit(normal).
+
+handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) ->
+ cowboy_handler:terminate(Reason, Req, HandlerState, Handler).
+
+%% cowboy_stream callbacks.
+%%
+%% We shortcut stream handlers but still need to process some events
+%% such as process exiting or termination. We implement the relevant
+%% callbacks here. Note that as far as WebTransport is concerned,
+%% receiving stream data here would be an error therefore the data
+%% callback is not implemented.
+%%
+%% @todo Better type than map() for the cowboy_stream state.
+
+-spec info(cowboy_stream:streamid(), any(), State)
+ -> {cowboy_stream:commands(), State} when State::map().
+
+info(StreamID, Msg, WTState=#{stream_state := StreamState0}) ->
+ {Commands, StreamState} = cowboy_stream:info(StreamID, Msg, StreamState0),
+ {Commands, WTState#{stream_state => StreamState}}.
+
+-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), map())
+ -> any().
+
+terminate(StreamID, Reason, #{stream_state := StreamState}) ->
+ cowboy_stream:terminate(StreamID, Reason, StreamState).