@@ -36,11 +36,20 @@
-record(stream, {
id = undefined :: cowboy_stream:streamid(),
%% Stream handlers and their state.
- state = undefined :: {module(), any()},
+ state = undefined :: {module(), any()} | flush,
%% Whether we finished sending data.
- local = idle :: idle | upgrade | cowboy_stream:fin(),
+ local = idle :: idle | upgrade | cowboy_stream:fin() | flush,
+ %% Local flow control window (how much we can send).
+ local_window :: integer(),
+ %% Buffered data waiting for the flow control window to increase.
+ local_buffer = queue:new() :: queue:queue(
+ {cowboy_stream:fin(), non_neg_integer(), iolist()
+ | {sendfile, non_neg_integer(), pos_integer(), file:name_all()}}),
+ local_buffer_size = 0 :: non_neg_integer(),
%% Whether we finished receiving data.
remote = nofin :: cowboy_stream:fin(),
+ %% Remote flow control window (how much we accept to receive).
+ remote_window :: integer(),
%% Request body length.
body_length = 0 :: non_neg_integer()
@@ -67,7 +76,7 @@
% header_table_size => 4096,
% enable_push => false, %% We are the server. Push is never enabled.
% max_concurrent_streams => infinity,
-% initial_window_size => 65535,
+ initial_window_size => 65535,
max_frame_size => 16384
% max_header_list_size => infinity
} :: map(),
@@ -75,7 +84,13 @@
%% We need to be careful there. It's well possible that we send
%% two SETTINGS frames before we receive a SETTINGS ack.
next_settings = #{} :: undefined | map(), %% @todo perhaps set to undefined by default
- remote_settings = #{} :: map(),
+ remote_settings = #{
+ initial_window_size => 65535
+ } :: map(),
+ %% Connection-wide flow control window.
+ local_window = 65535 :: integer(), %% How much we can send.
+ remote_window = 65535 :: integer(), %% How much we accept to receive.
%% Stream identifiers.
client_streamid = 0 :: non_neg_integer(),
@@ -269,17 +284,22 @@ parse_settings_preface(State, _, _, _) ->
%% and terminate the stream if this is the end of it.
%% DATA frame.
-frame(State=#state{streams=Streams}, {data, StreamID, IsFin0, Data}) ->
+frame(State=#state{remote_window=ConnWindow, streams=Streams},
+ {data, StreamID, IsFin0, Data}) ->
case lists:keyfind(StreamID, #stream.id, Streams) of
- Stream = #stream{state=StreamState0, remote=nofin, body_length=Len0} ->
- Len = Len0 + byte_size(Data),
+ Stream = #stream{state=StreamState0, remote=nofin,
+ remote_window=StreamWindow, body_length=Len0} ->
+ DataLen = byte_size(Data),
+ Len = Len0 + DataLen,
IsFin = case IsFin0 of
fin -> {fin, Len};
nofin -> nofin
try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} ->
- commands(State, Stream#stream{state=StreamState, body_length=Len}, Commands)
+ commands(State#state{remote_window=ConnWindow - DataLen},
+ Stream#stream{state=StreamState, remote_window=StreamWindow - DataLen,
+ body_length=Len}, Commands)
catch Class:Reason ->
error_logger:error_msg("Exception occurred in "
"cowboy_stream:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
@@ -342,13 +362,22 @@ frame(State, {ping_ack, _Opaque}) ->
frame(State, Frame={goaway, _, _, _}) ->
terminate(State, {stop, Frame, 'Client is going away.'});
%% Connection-wide WINDOW_UPDATE frame.
-frame(State, {window_update, _Increment}) ->
- %% @todo control flow
- State;
+frame(State=#state{local_window=ConnWindow}, {window_update, Increment}) ->
+ send_data(State#state{local_window=ConnWindow + Increment});
%% Stream-specific WINDOW_UPDATE frame.
-frame(State, {window_update, _StreamID, _Increment}) ->
- %% @todo stream-specific control flow
- State;
+frame(State0=#state{streams=Streams0}, {window_update, StreamID, Increment}) ->
+ case lists:keyfind(StreamID, #stream.id, Streams0) of
+ Stream0 = #stream{local_window=StreamWindow} ->
+ {State, Stream} = send_data(State0,
+ Stream0#stream{local_window=StreamWindow + Increment}),
+ Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream),
+ State#state{streams=Streams};
+ false ->
+ %% @todo Receiving this frame on a stream in the idle state is an error.
+ %% WINDOW_UPDATE frames may be received for a short period of time
+ %% after a stream is closed. They must be ignored.
+ State0
+ end;
%% Unexpected CONTINUATION frame.
frame(State, {continuation, _, _, _}) ->
terminate(State, {connection_error, protocol_error,
@@ -419,11 +448,8 @@ commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeSta
[{sendfile, fin, O, B, P}|Tail]);
_ ->
Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
- %% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE.
- %% Use the length set by the server instead, if any.
- %% @todo Would be better if we didn't have to convert to binary.
- send_data(Socket, Transport, StreamID, fin, iolist_to_binary(Body), 16384),
- commands(State#state{encode_state=EncodeState}, Stream#stream{local=fin}, Tail)
+ {State1, Stream1} = send_data(State, Stream#stream{local=nofin}, fin, Body),
+ commands(State1#state{encode_state=EncodeState}, Stream1, Tail)
%% @todo response when local!=idle
%% Send response headers and initiate chunked encoding.
@@ -445,10 +471,9 @@ commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeSta
%% split into multiple calls and flow control should be used to make
%% sure we only send as fast as the client can receive and don't block
%% anything.
-commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=StreamID, local=nofin},
- [{data, IsFin, Data}|Tail]) ->
- Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)),
- commands(State, Stream#stream{local=IsFin}, Tail);
+commands(State0, Stream0=#stream{local=nofin}, [{data, IsFin, Data}|Tail]) ->
+ {State, Stream} = send_data(State0, Stream0, IsFin, Data),
+ commands(State, Stream, Tail);
%% @todo data when local!=nofin
@@ -463,16 +488,10 @@ commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=Str
%% to ensure the file is sent in chunks (which would require a better
%% flow control at the stream handler level). One thing for sure, the
%% implementation necessarily varies between HTTP/1.1 and HTTP/2.
-commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=StreamID, local=nofin},
+commands(State0, Stream0=#stream{local=nofin},
[{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
- %% @todo We currently have a naive implementation without a
- %% scheduler to prioritize frames that need to be sent.
- %% A future update will need to queue such data frames
- %% and only send them when there is nothing currently
- %% being sent. We would probably also benefit from doing
- %% asynchronous sends.
- sendfile(Socket, Transport, StreamID, IsFin, Offset, Bytes, Path, 16384),
- commands(State, Stream#stream{local=IsFin}, Tail);
+ {State, Stream} = send_data(State0, Stream0, IsFin, {sendfile, Offset, Bytes, Path}),
+ commands(State, Stream, Tail);
%% @todo sendfile when local!=nofin
%% Send a push promise.
@@ -500,9 +519,15 @@ commands(State0=#state{socket=Socket, transport=Transport, server_streamid=Promi
State = stream_init(State0#state{server_streamid=PromisedStreamID + 2, encode_state=EncodeState},
PromisedStreamID, fin, iolist_to_binary(HeaderBlock)),
commands(State, Stream, Tail);
-%% @todo Update the flow control state.
-commands(State, Stream, [{flow, _Size}|Tail]) ->
- commands(State, Stream, Tail);
+commands(State=#state{socket=Socket, transport=Transport, remote_window=ConnWindow},
+ Stream=#stream{id=StreamID, remote_window=StreamWindow},
+ [{flow, Size}|Tail]) ->
+ Transport:send(Socket, [
+ cow_http2:window_update(Size),
+ cow_http2:window_update(StreamID, Size)
+ ]),
+ commands(State#state{remote_window=ConnWindow + Size},
+ Stream#stream{remote_window=StreamWindow + Size}, Tail);
%% Supervise a child process.
commands(State=#state{children=Children}, Stream=#stream{id=StreamID},
[{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown
@@ -536,30 +561,89 @@ status(Status) when is_integer(Status) ->
status(<< H, T, U, _/bits >>) when H >= $1, H =< $9, T >= $0, T =< $9, U >= $0, U =< $9 ->
<< H, T, U >>.
-%% This same function is found in gun_http2.
-send_data(Socket, Transport, StreamID, IsFin, Data, Length) ->
- if
- Length < byte_size(Data) ->
- << Payload:Length/binary, Rest/bits >> = Data,
- Transport:send(Socket, cow_http2:data(StreamID, nofin, Payload)),
- send_data(Socket, Transport, StreamID, IsFin, Rest, Length);
- true ->
- Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data))
- end.
-%% @todo This is currently awfully slow. But at least it's correct.
-sendfile(Socket, Transport, StreamID, IsFin, Offset, Bytes, Path, Length) ->
- if
- Length < Bytes ->
- Transport:send(Socket, cow_http2:data_header(StreamID, nofin, Length)),
- Transport:sendfile(Socket, Path, Offset, Length),
- sendfile(Socket, Transport, StreamID, IsFin,
- Offset + Length, Bytes - Length, Path, Length);
- true ->
+%% @todo Should we ever want to implement the PRIORITY mechanism,
+%% this would be the place to do it. Right now, we just go over
+%% all streams and send what we can until either everything is
+%% sent or we run out of space in the window.
+send_data(State=#state{streams=Streams}) ->
+ resume_streams(State, Streams, []).
+%% @todo When streams terminate we need to remove the stream.
+resume_streams(State, [], Acc) ->
+ State#state{streams=lists:reverse(Acc)};
+%% While technically we should never get < 0 here, let's be on the safe side.
+resume_streams(State=#state{local_window=ConnWindow}, Streams, Acc)
+ when ConnWindow =< 0 ->
+ State#state{streams=lists:reverse(Acc, Streams)};
+%% We rely on send_data/2 to do all the necessary checks about the stream.
+resume_streams(State0, [Stream0|Tail], Acc) ->
+ {State, Stream} = send_data(State0, Stream0),
+ resume_streams(State, Tail, [Stream|Acc]).
+%% @todo We might want to print an error if local=fin.
+%% @todo It's possible that the stream terminates. We must remove it.
+ Stream=#stream{local=IsFin, local_window=StreamWindow, local_buffer_size=BufferSize})
+ when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 ->
+ {State, Stream};
+send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}) ->
+ %% We know there is an item in the queue.
+ {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0),
+ {State, Stream} = send_data(State0,
+ Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize},
+ IsFin, Data),
+ send_data(State, Stream).
+%% Send data immediately if we can, buffer otherwise.
+%% @todo We might want to print an error if local=fin.
+ Stream=#stream{local_window=StreamWindow}, IsFin, Data)
+ when ConnWindow =< 0; StreamWindow =< 0 ->
+ {State, queue_data(Stream, IsFin, Data)};
+send_data(State=#state{socket=Socket, transport=Transport, local_window=ConnWindow},
+ Stream=#stream{id=StreamID, local_window=StreamWindow}, IsFin, Data) ->
+ MaxFrameSize = 16384, %% @todo Use the real SETTINGS_MAX_FRAME_SIZE set by the client.
+ SendSize = min(min(ConnWindow, StreamWindow), MaxFrameSize),
+ case Data of
+ {sendfile, Offset, Bytes, Path} when Bytes =< SendSize ->
Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)),
- Transport:sendfile(Socket, Path, Offset, Bytes)
+ Transport:sendfile(Socket, Path, Offset, Bytes),
+ {State#state{local_window=ConnWindow - SendSize},
+ Stream#stream{local=IsFin, local_window=StreamWindow - SendSize}};
+ {sendfile, Offset, Bytes, Path} ->
+ Transport:send(Socket, cow_http2:data_header(StreamID, nofin, SendSize)),
+ Transport:sendfile(Socket, Path, Offset, SendSize),
+ send_data(State#state{local_window=ConnWindow - SendSize},
+ Stream#stream{local_window=StreamWindow - SendSize},
+ IsFin, {sendfile, Offset + SendSize, Bytes - SendSize, Path});
+ Iolist0 ->
+ IolistSize = iolist_size(Iolist0),
+ if
+ IolistSize =< SendSize ->
+ Transport:send(Socket, cow_http2:data(StreamID, IsFin, Iolist0)),
+ {State#state{local_window=ConnWindow - SendSize},
+ Stream#stream{local=IsFin, local_window=StreamWindow - SendSize}};
+ true ->
+ {Iolist, More} = cowboy_iolists:split(SendSize, Iolist0),
+ Transport:send(Socket, cow_http2:data(StreamID, nofin, Iolist)),
+ send_data(State#state{local_window=ConnWindow - SendSize},
+ Stream#stream{local_window=StreamWindow - SendSize},
+ IsFin, More)
+ end
+queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data) ->
+ DataSize = case Data of
+ {sendfile, _, Bytes, _} -> Bytes;
+ Iolist -> iolist_size(Iolist)
+ end,
+ Q = queue:in({IsFin, DataSize, Data}, Q0),
+ Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}.
-spec terminate(#state{}, _) -> no_return().
terminate(undefined, Reason) ->
exit({shutdown, Reason});
@@ -641,12 +725,17 @@ stream_init(State0=#state{ref=Ref, socket=Socket, transport=Transport, peer=Peer
'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
-stream_handler_init(State=#state{opts=Opts}, StreamID, RemoteIsFin, LocalIsFin, Req) ->
+ local_settings=#{initial_window_size := RemoteWindow},
+ remote_settings=#{initial_window_size := LocalWindow}},
+ StreamID, RemoteIsFin, LocalIsFin, Req) ->
try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
#stream{id=StreamID, state=StreamState,
- remote=RemoteIsFin, local=LocalIsFin}, Commands)
+ remote=RemoteIsFin, local=LocalIsFin,
+ local_window=LocalWindow, remote_window=RemoteWindow},
+ Commands)
catch Class:Reason ->
error_logger:error_msg("Exception occurred in "
"cowboy_stream:init(~p, ~p, ~p) with reason ~p:~p.",
@@ -676,12 +765,20 @@ stream_terminate(State=#state{socket=Socket, transport=Transport,
Children = stream_terminate_children(Children0, StreamID, []),
State1#state{streams=Streams, children=Children};
%% When a response was sent but not terminated, we need to close the stream.
- {value, #stream{state=StreamState, local=nofin}, Streams} when Reason =:= normal ->
+ {value, #stream{state=StreamState, local=nofin, local_buffer_size=0}, Streams}
+ when Reason =:= normal ->
Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)),
stream_call_terminate(StreamID, Reason, StreamState),
Children = stream_terminate_children(Children0, StreamID, []),
State#state{streams=Streams, children=Children};
- %% Otherwise we sent an RST_STREAM and the stream is already closed.
+ %% Unless there is still data in the buffer. We can however reset
+ %% a few fields and set a special local state to avoid confusion.
+ {value, Stream=#stream{state=StreamState, local=nofin}, Streams} ->
+ stream_call_terminate(StreamID, Reason, StreamState),
+ Children = stream_terminate_children(Children0, StreamID, []),
+ State#state{streams=[Stream#stream{state=flush, local=flush}|Streams],
+ children=Children};
+ %% Otherwise we sent an RST_STREAM and/or the stream is already closed.
{value, #stream{state=StreamState}, Streams} ->
stream_call_terminate(StreamID, Reason, StreamState),
Children = stream_terminate_children(Children0, StreamID, []),