diff options
Diffstat (limited to 'src/cowboy_http2.erl')
-rw-r--r-- | src/cowboy_http2.erl | 59 |
1 files changed, 57 insertions, 2 deletions
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 0d22fa1..5cb0585 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -82,8 +82,13 @@ -export_type([opts/0]). -record(stream, { - %% Whether the stream is currently stopping. - status = running :: running | stopping, + %% Whether the stream is currently in a special state. + %% + %% - The running state is the normal state of a stream. + %% - The relaying state is used by extended CONNECT protocols to + %% use a 'relay' data_delivery method. + %% - The stopping state indicates the stream used the 'stop' command. + status = running :: running | {relaying, non_neg_integer(), pid()} | stopping, %% Flow requested for this stream. flow = 0 :: non_neg_integer(), @@ -327,6 +332,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> before_loop(info(State, StreamID, Msg), Buffer); + {'$cowboy_relay_command', {Pid, StreamID}, RelayCommand} when Pid =:= self() -> + before_loop(relay_command(State, StreamID, RelayCommand), Buffer); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> before_loop(down(State, Pid, Msg), Buffer); @@ -520,6 +527,14 @@ data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFi reset_stream(State0, StreamID, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:data/4.'}) end; + %% Stream handlers are not used for the data when relaying. + #{StreamID := #stream{status={relaying, _, RelayPid}}} -> + RelayPid ! {'$cowboy_relay_data', {self(), StreamID}, IsFin, Data}, + %% We keep a steady flow using the configured flow value. + %% Because we do not change the 'flow' value the update_window/2 + %% function will always maintain this value (of course with + %% thresholds applying). + update_window(State0, StreamID); %% We ignore DATA frames for streams that are stopping. #{} -> State0 @@ -866,6 +881,26 @@ commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade}, commands(State, StreamID, Tail); %% Use a different protocol within the stream (CONNECT :protocol). %% @todo Make sure we error out when the feature is disabled. +%% There are two data_delivery: stream_handlers and relay. +%% The former just has the data go through stream handlers +%% like normal requests. The latter relays data directly. +%% +%% @todo When relaying there might be some data that is +%% in stream handlers and that need to be received, +%% depending on whether the protocol sends data +%% before processing the response. +commands(State0=#state{flow=Flow, streams=Streams}, StreamID, + [{switch_protocol, Headers, _Mod, ModState=#{data_delivery := relay}}|Tail]) -> + State1 = info(State0, StreamID, {headers, 200, Headers}), + #{StreamID := Stream} = Streams, + #{data_delivery_pid := RelayPid} = ModState, + %% WINDOW_UPDATE frames updating the window will be sent after + %% the first DATA frame has been received. + RelayFlow = maps:get(data_delivery_flow, ModState, 131072), + State = State1#state{flow=Flow + RelayFlow, streams=Streams#{StreamID => Stream#stream{ + status={relaying, RelayFlow, RelayPid}, + flow=RelayFlow}}}, + commands(State, StreamID, Tail); commands(State0, StreamID, [{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> State = info(State0, StreamID, {headers, 200, Headers}), commands(State, StreamID, Tail); @@ -881,6 +916,26 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) -> cowboy:log(Log, Opts), commands(State, StreamID, Tail). +%% Relay data delivery commands. + +relay_command(State, StreamID, DataCmd = {data, _, _}) -> + commands(State, StreamID, [DataCmd]); +%% When going active mode again we set the RelayFlow again +%% and update the window if necessary. +relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, active) -> + #{StreamID := Stream} = Streams, + #stream{status={relaying, RelayFlow, _}} = Stream, + update_window(State#state{flow=Flow + RelayFlow, + streams=Streams#{StreamID => Stream#stream{flow=RelayFlow}}}, + StreamID); +%% When going passive mode we don't update the window +%% since we have not incremented it. +relay_command(State=#state{flow=Flow, streams=Streams}, StreamID, passive) -> + #{StreamID := Stream} = Streams, + #stream{flow=StreamFlow} = Stream, + State#state{flow=Flow - StreamFlow, + streams=Streams#{StreamID => Stream#stream{flow=0}}}. + %% Tentatively update the window after the flow was updated. update_window(State0=#state{socket=Socket, transport=Transport, |