aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_http2.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_http2.erl')
-rw-r--r--src/cowboy_http2.erl59
1 files changed, 57 insertions, 2 deletions
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 0d22fa1..5cb0585 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -82,8 +82,13 @@
-export_type([opts/0]).
-record(stream, {
- %% Whether the stream is currently stopping.
- status = running :: running | stopping,
+ %% Whether the stream is currently in a special state.
+ %%
+ %% - The running state is the normal state of a stream.
+ %% - The relaying state is used by extended CONNECT protocols to
+ %% use a 'relay' data_delivery method.
+ %% - The stopping state indicates the stream used the 'stop' command.
+ status = running :: running | {relaying, non_neg_integer(), pid()} | stopping,
%% Flow requested for this stream.
flow = 0 :: non_neg_integer(),
@@ -327,6 +332,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
before_loop(info(State, StreamID, Msg), Buffer);
+ {'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() ->
+ before_loop(relay_command(State, StreamID, RelayCommand), Buffer);
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
before_loop(down(State, Pid, Msg), Buffer);
@@ -520,6 +527,14 @@ data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFi
reset_stream(State0, StreamID, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:data/4.'})
end;
+ %% Stream handlers are not used for the data when relaying.
+ #{StreamID := #stream{status={relaying, _, RelayPid}}} ->
+ RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data},
+ %% We keep a steady flow using the configured flow value.
+ %% Because we do not change the 'flow' value the update_window/2
+ %% function will always maintain this value (of course with
+ %% thresholds applying).
+ update_window(State0, StreamID);
%% We ignore DATA frames for streams that are stopping.
#{} ->
State0
@@ -866,6 +881,26 @@ commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade},
commands(State, StreamID, Tail);
%% Use a different protocol within the stream (CONNECT :protocol).
%% @todo Make sure we error out when the feature is disabled.
+%% There are two data_delivery: stream_handlers and relay.
+%% The former just has the data go through stream handlers
+%% like normal requests. The latter relays data directly.
+%%
+%% @todo When relaying there might be some data that is
+%% in stream handlers and that need to be received,
+%% depending on whether the protocol sends data
+%% before processing the response.
+commands(State0=#state{flow=Flow, streams=Streams}, StreamID,
+ [{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) ->
+ State1 = info(State0, StreamID, {headers, 200, Headers}),
+ #{StreamID := Stream} = Streams,
+ #{data_delivery_pid := RelayPid} = ModState,
+ %% WINDOW_UPDATE frames updating the window will be sent after
+ %% the first DATA frame has been received.
+ RelayFlow = maps:get(data_delivery_flow, ModState, 131072),
+ State = State1#state{flow=Flow + RelayFlow, streams=Streams#{StreamID => Stream#stream{
+ status={relaying, RelayFlow, RelayPid},
+ flow=RelayFlow}}},
+ commands(State, StreamID, Tail);
commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) ->
State = info(State0, StreamID, {headers, 200, Headers}),
commands(State, StreamID, Tail);
@@ -881,6 +916,26 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) ->
cowboy:log(Log, Opts),
commands(State, StreamID, Tail).
+%% Relay data delivery commands.
+
+relay_command(State, StreamID, DataCmd = {data, _, _}) ->
+ commands(State, StreamID, [DataCmd]);
+%% When going active mode again we set the RelayFlow again
+%% and update the window if necessary.
+relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, active) ->
+ #{StreamID := Stream} = Streams,
+ #stream{status={relaying, RelayFlow, _}} = Stream,
+ update_window(State#state{flow=Flow + RelayFlow,
+ streams=Streams#{StreamID => Stream#stream{flow=RelayFlow}}},
+ StreamID);
+%% When going passive mode we don't update the window
+%% since we have not incremented it.
+relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, passive) ->
+ #{StreamID := Stream} = Streams,
+ #stream{flow=StreamFlow} = Stream,
+ State#state{flow=Flow - StreamFlow,
+ streams=Streams#{StreamID => Stream#stream{flow=0}}}.
+
%% Tentatively update the window after the flow was updated.
update_window(State0=#state{socket=Socket, transport=Transport,