aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2018-10-26 10:12:25 +0200
committerLoïc Hoguin <[email protected]>2018-10-26 10:12:25 +0200
commitb461b119e78e4e09bb28b186b09da7ed4a86a0dd (patch)
tree7e3aff85603ea11450b9df2ba57df61927889ef0
parent078f855672fe8ad65d2b25b0a4843c0f5637f32c (diff)
downloadcowlib-b461b119e78e4e09bb28b186b09da7ed4a86a0dd.tar.gz
cowlib-b461b119e78e4e09bb28b186b09da7ed4a86a0dd.tar.bz2
cowlib-b461b119e78e4e09bb28b186b09da7ed4a86a0dd.zip
Introduce cow_http2_machine, an HTTP/2 state machine
This is the result of a merge of the Cowboy and Gun HTTP/2 codes. It can probably do a little more but it's at a point where Cowboy works fine when using it so additional work will be done in other commits. The Gun code has not been switched to this module yet. I expect for example the PUSH_PROMISE code to fail at this point. This will be the next step.
-rw-r--r--ebin/cowlib.app2
-rw-r--r--src/cow_http2.erl6
-rw-r--r--src/cow_http2_machine.erl1295
3 files changed, 1302 insertions, 1 deletions
diff --git a/ebin/cowlib.app b/ebin/cowlib.app
index c41b719..3826365 100644
--- a/ebin/cowlib.app
+++ b/ebin/cowlib.app
@@ -1,7 +1,7 @@
{application, 'cowlib', [
{description, "Support library for manipulating Web protocols."},
{vsn, "2.6.0"},
- {modules, ['cow_base64url','cow_cookie','cow_date','cow_hpack','cow_http','cow_http2','cow_http_hd','cow_http_te','cow_iolists','cow_mimetypes','cow_multipart','cow_qs','cow_spdy','cow_sse','cow_uri','cow_ws']},
+ {modules, ['cow_base64url','cow_cookie','cow_date','cow_hpack','cow_http','cow_http2','cow_http2_machine','cow_http_hd','cow_http_te','cow_iolists','cow_mimetypes','cow_multipart','cow_qs','cow_spdy','cow_sse','cow_uri','cow_ws']},
{registered, []},
{applications, [kernel,stdlib,crypto]},
{env, []}
diff --git a/src/cow_http2.erl b/src/cow_http2.erl
index e6f7738..ec4aab9 100644
--- a/src/cow_http2.erl
+++ b/src/cow_http2.erl
@@ -36,8 +36,14 @@
-export([window_update/2]).
-type streamid() :: pos_integer().
+-export_type([streamid/0]).
+
-type fin() :: fin | nofin.
+-export_type([fin/0]).
+
-type head_fin() :: head_fin | head_nofin.
+-export_type([head_fin/0]).
+
-type exclusive() :: exclusive | shared.
-type weight() :: 1..256.
-type settings() :: map().
diff --git a/src/cow_http2_machine.erl b/src/cow_http2_machine.erl
new file mode 100644
index 0000000..b1cf1da
--- /dev/null
+++ b/src/cow_http2_machine.erl
@@ -0,0 +1,1295 @@
+%% Copyright (c) 2018, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(cow_http2_machine).
+
+-export([init/2]).
+-export([init_upgrade_stream/2]).
+-export([frame/2]).
+-export([ignored_frame/1]).
+-export([prepare_headers/5]).
+-export([prepare_push_promise/4]).
+-export([prepare_trailers/3]).
+-export([send_or_queue_data/4]).
+-export([update_window/2]).
+-export([update_window/3]).
+-export([reset_stream/2]).
+-export([get_local_setting/2]).
+-export([get_last_streamid/1]).
+-export([get_stream_local_state/2]).
+
+-type opts() :: #{
+ enable_connect_protocol => boolean(),
+ initial_connection_window_size => 65535..16#7fffffff,
+ initial_stream_window_size => 0..16#7fffffff,
+ max_concurrent_streams => non_neg_integer() | infinity,
+ max_decode_table_size => non_neg_integer(),
+ max_encode_table_size => non_neg_integer(),
+ max_frame_size_received => 16384..16777215,
+ max_frame_size_sent => 16384..16777215 | infinity
+}.
+-export_type([opts/0]).
+
+%% The order of the fields is significant.
+-record(sendfile, {
+ offset :: non_neg_integer(),
+ bytes :: pos_integer(),
+ path :: file:name_all()
+}).
+
+-record(stream, {
+ id = undefined :: cow_http2:streamid(),
+
+ %% Request method.
+ method = undefined :: binary(),
+
+ %% Whether we finished sending data.
+ local = idle :: idle | cow_http2:fin(),
+
+ %% Local flow control window (how much we can send).
+ local_window :: integer(),
+
+ %% Buffered data waiting for the flow control window to increase.
+ local_buffer = queue:new() ::
+ queue:queue({cow_http2:fin(), non_neg_integer(), {data, iodata()} | #sendfile{}}),
+ local_buffer_size = 0 :: non_neg_integer(),
+ local_trailers = undefined :: undefined | cow_http:headers(),
+
+ %% Whether we finished receiving data.
+ remote = idle :: idle | cow_http2:fin(),
+
+ %% Remote flow control window (how much we accept to receive).
+ remote_window :: integer(),
+
+ %% Size expected and read from the request body.
+ remote_expected_size = undefined :: undefined | non_neg_integer(),
+ remote_read_size = 0 :: non_neg_integer(),
+
+ %% Unparsed te header. Used to know if we can send trailers.
+ te :: undefined | binary()
+}).
+
+-type stream() :: #stream{}.
+
+-type continued_frame() ::
+ {headers, cow_http2:streamid(), cow_http2:fin(), cow_http2:head_fin(), binary()} |
+ {push_promise, cow_http2:streamid(), cow_http2:head_fin(), cow_http2:streamid(), binary()}.
+
+-record(http2_machine, {
+ %% Whether the HTTP/2 endpoint is a client or a server.
+ mode :: client | server,
+
+ %% HTTP/2 SETTINGS customization.
+ opts = #{} :: opts(),
+
+ %% Connection-wide frame processing state.
+ state = settings :: settings | normal
+ | {continuation, request | response | trailers | push_promise, continued_frame()},
+
+ %% Settings are separate for each endpoint. In addition, settings
+ %% must be acknowledged before they can be expected to be applied.
+ local_settings = #{
+% header_table_size => 4096,
+% enable_push => true,
+% max_concurrent_streams => infinity,
+ initial_window_size => 65535
+% max_frame_size => 16384
+% max_header_list_size => infinity
+ } :: map(),
+ next_settings = undefined :: undefined | map(),
+ remote_settings = #{
+ initial_window_size => 65535
+ } :: map(),
+
+ %% Connection-wide flow control window.
+ local_window = 65535 :: integer(), %% How much we can send.
+ remote_window = 65535 :: integer(), %% How much we accept to receive.
+
+ %% Stream identifiers.
+ local_streamid :: pos_integer(), %% The next streamid to be used.
+ remote_streamid = 0 :: non_neg_integer(), %% The last streamid received.
+
+ %% Currently active HTTP/2 streams. Streams may be initiated either
+ %% by the client or by the server through PUSH_PROMISE frames.
+ streams = [] :: [stream()],
+
+ %% HTTP/2 streams that have been reset recently by the server.
+ %% We are expected to keep receiving additional frames after
+ %% sending an RST_STREAM.
+ lingering_streams = [] :: [cow_http2:streamid()],
+
+ %% HTTP/2 streams that have been reset recently by the client.
+ %% We keep a few of these around in order to reject subsequent
+ %% frames on these streams.
+ rst_lingering_streams = [] :: [cow_http2:streamid()],
+
+ %% HPACK decoding and encoding state.
+ decode_state = cow_hpack:init() :: cow_hpack:state(),
+ encode_state = cow_hpack:init() :: cow_hpack:state()
+}).
+
+-opaque http2_machine() :: #http2_machine{}.
+-export_type([http2_machine/0]).
+
+-type pseudo_headers() :: #{} %% Trailers
+ | #{ %% Responses.
+ status := cow_http:status()
+ } | #{ %% Normal CONNECT requests.
+ method := binary(),
+ authority := binary()
+ } | #{ %% Other requests and extended CONNECT requests.
+ method := binary(),
+ scheme := binary(),
+ authority := binary(),
+ path := binary(),
+ protocol => binary()
+ }.
+
+%% Returns true when the given StreamID is for a local-initiated stream.
+-define(IS_SERVER_LOCAL(StreamID), ((StreamID rem 2) =:= 0)).
+-define(IS_CLIENT_LOCAL(StreamID), ((StreamID rem 2) =:= 1)).
+-define(IS_LOCAL(Mode, StreamID), (
+ ((Mode =:= server) andalso ?IS_SERVER_LOCAL(StreamID))
+ orelse
+ ((Mode =:= client) andalso ?IS_CLIENT_LOCAL(StreamID))
+)).
+
+-spec init(client | server, opts()) -> {ok, iodata(), http2_machine()}.
+init(client, Opts) ->
+ NextSettings = settings_init(Opts),
+ client_preface(#http2_machine{
+ mode=client,
+ opts=only_keep_relevant_opts(Opts),
+ next_settings=NextSettings,
+ local_streamid=1
+ });
+init(server, Opts) ->
+ NextSettings = settings_init(Opts),
+ common_preface(#http2_machine{
+ mode=server,
+ opts=only_keep_relevant_opts(Opts),
+ next_settings=NextSettings,
+ local_streamid=2
+ }).
+
+%% We remove the options that are part of SETTINGS or that are not known.
+only_keep_relevant_opts(Opts) ->
+ maps:with([
+ initial_connection_window_size,
+ max_encode_table_size,
+ max_frame_size_sent
+ ], Opts).
+
+client_preface(State0) ->
+ {ok, CommonPreface, State} = common_preface(State0),
+ {ok, [
+ <<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
+ CommonPreface
+ ], State}.
+
+%% We send next_settings and use defaults until we get an ack.
+%%
+%% We also send a WINDOW_UPDATE frame for the connection when
+%% the user specified an initial_connection_window_size.
+common_preface(State=#http2_machine{opts=Opts, next_settings=NextSettings}) ->
+ case maps:get(initial_connection_window_size, Opts, 65535) of
+ 65535 ->
+ {ok, cow_http2:settings(NextSettings), State};
+ Size ->
+ {ok, [
+ cow_http2:settings(NextSettings),
+ cow_http2:window_update(Size - 65535)
+ ], update_window(Size - 65535, State)}
+ end.
+
+settings_init(Opts) ->
+ S0 = setting_from_opt(#{}, Opts, max_decode_table_size,
+ header_table_size, 4096),
+ S1 = setting_from_opt(S0, Opts, max_concurrent_streams,
+ max_concurrent_streams, infinity),
+ S2 = setting_from_opt(S1, Opts, initial_stream_window_size,
+ initial_window_size, 65535),
+ S3 = setting_from_opt(S2, Opts, max_frame_size_received,
+ max_frame_size, 16384),
+ %% @todo max_header_list_size
+ setting_from_opt(S3, Opts, enable_connect_protocol,
+ enable_connect_protocol, false).
+
+setting_from_opt(Settings, Opts, OptName, SettingName, Default) ->
+ case maps:get(OptName, Opts, Default) of
+ Default -> Settings;
+ Value -> Settings#{SettingName => Value}
+ end.
+
+-spec init_upgrade_stream(binary(), State)
+ -> {ok, cow_http2:streamid(), State} when State::http2_machine().
+init_upgrade_stream(Method, State=#http2_machine{mode=server, remote_streamid=0,
+ local_settings=#{initial_window_size := RemoteWindow},
+ remote_settings=#{initial_window_size := LocalWindow}}) ->
+ Stream = #stream{id=1, method=Method,
+ remote=fin, remote_expected_size=0,
+ local_window=LocalWindow, remote_window=RemoteWindow, te=undefined},
+ {ok, 1, stream_store(Stream, State#http2_machine{remote_streamid=1})}.
+
+-spec frame(cow_http2:frame(), State)
+ -> {ok, State}
+ | {ok, {data, cow_http2:streamid(), cow_http2:fin(), binary()}, State}
+ | {ok, {headers, cow_http2:streamid(), cow_http2:fin(),
+ cow_http:headers(), pseudo_headers(), non_neg_integer() | undefined}, State}
+ | {ok, {trailers, cow_http2:streamid(), cow_http:headers()}, State}
+ | {ok, {rst_stream, cow_http2:streamid(), cow_http2:error()}, State}
+ | {ok, {push_promise, cow_http2:streamid(), cow_http2:streamid(),
+ cow_http:headers(), pseudo_headers()}, State}
+ | {ok, {goaway, cow_http2:streamid(), cow_http2:error(), binary()}, State}
+ | {send, [{cow_http2:streamid(), cow_http2:fin(),
+ [{data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}]}], State}
+ | {error, {stream_error, cow_http2:streamid(), cow_http2:error(), atom()}, State}
+ | {error, {connection_error, cow_http2:error(), atom()}, State}
+ when State::http2_machine().
+frame(Frame, State=#http2_machine{state=settings}) ->
+ settings_frame(Frame, State#http2_machine{state=normal});
+frame(Frame, State=#http2_machine{state={continuation, _, _}}) ->
+ continuation_frame(Frame, State);
+frame(settings_ack, State=#http2_machine{state=normal}) ->
+ settings_ack_frame(State);
+frame(Frame, State=#http2_machine{state=normal}) ->
+ case element(1, Frame) of
+ data -> data_frame(Frame, State);
+ headers -> headers_frame(Frame, State);
+ priority -> priority_frame(Frame, State);
+ rst_stream -> rst_stream_frame(Frame, State);
+ settings -> settings_frame(Frame, State);
+ push_promise -> push_promise_frame(Frame, State);
+ ping -> ping_frame(Frame, State);
+ ping_ack -> ping_ack_frame(Frame, State);
+ goaway -> goaway_frame(Frame, State);
+ window_update -> window_update_frame(Frame, State);
+ continuation -> unexpected_continuation_frame(Frame, State);
+ _ -> ignored_frame(State)
+ end.
+
+%% DATA frame.
+
+data_frame({data, StreamID, _, _}, State=#http2_machine{mode=Mode,
+ local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
+ when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
+ orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
+ {error, {connection_error, protocol_error,
+ 'DATA frame received on a stream in idle state. (RFC7540 5.1)'},
+ State};
+data_frame({data, _, _, Data}, State=#http2_machine{remote_window=ConnWindow})
+ when byte_size(Data) > ConnWindow ->
+ {error, {connection_error, flow_control_error,
+ 'DATA frame overflowed the connection flow control window. (RFC7540 6.9, RFC7540 6.9.1)'},
+ State};
+data_frame(Frame={data, StreamID, _, Data}, State0=#http2_machine{
+ remote_window=ConnWindow, lingering_streams=Lingering}) ->
+ DataLen = byte_size(Data),
+ State = State0#http2_machine{remote_window=ConnWindow - DataLen},
+ case stream_get(StreamID, State) of
+ #stream{remote_window=StreamWindow} when StreamWindow < DataLen ->
+ stream_reset(StreamID, State, flow_control_error,
+ 'DATA frame overflowed the stream flow control window. (RFC7540 6.9, RFC7540 6.9.1)');
+ Stream = #stream{remote=nofin} ->
+ data_frame(Frame, State, Stream, DataLen);
+ #stream{remote=idle} ->
+ stream_reset(StreamID, State, protocol_error,
+ 'DATA frame received before a HEADERS frame. (RFC7540 8.1, RFC7540 8.1.2.6)');
+ #stream{remote=fin} ->
+ stream_reset(StreamID, State, stream_closed,
+ 'DATA frame received for a half-closed (remote) stream. (RFC7540 5.1)');
+ undefined ->
+ %% After we send an RST_STREAM frame and terminate a stream,
+ %% the client still might be sending us some more frames
+ %% until it can process this RST_STREAM. We therefore ignore
+ %% DATA frames received for such lingering streams.
+ case lists:member(StreamID, Lingering) of
+ true ->
+ {ok, State0};
+ false ->
+ {error, {connection_error, stream_closed,
+ 'DATA frame received for a closed stream. (RFC7540 5.1)'},
+ State}
+ end
+ end.
+
+data_frame(Frame={data, _, IsFin, _}, State0, Stream0=#stream{id=StreamID,
+ remote_window=StreamWindow, remote_read_size=StreamRead}, DataLen) ->
+ Stream = Stream0#stream{remote=IsFin,
+ remote_window=StreamWindow - DataLen,
+ remote_read_size=StreamRead + DataLen},
+ State = stream_store(Stream, State0),
+ case is_body_size_valid(Stream) of
+ true ->
+ {ok, Frame, State};
+ false ->
+ stream_reset(StreamID, State, protocol_error,
+ 'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)')
+ end.
+
+%% It's always valid when no content-length header was specified.
+is_body_size_valid(#stream{remote_expected_size=undefined}) ->
+ true;
+%% We didn't finish reading the body but the size is already larger than expected.
+is_body_size_valid(#stream{remote=nofin, remote_expected_size=Expected,
+ remote_read_size=Read}) when Read > Expected ->
+ false;
+is_body_size_valid(#stream{remote=nofin}) ->
+ true;
+is_body_size_valid(#stream{remote=fin, remote_expected_size=Expected,
+ remote_read_size=Expected}) ->
+ true;
+%% We finished reading the body and the size read is not the one expected.
+is_body_size_valid(_) ->
+ false.
+
+%% HEADERS frame.
+%%
+%% We always close the connection when we detect errors before
+%% decoding the headers to not waste resources on non-compliant
+%% endpoints, making us stricter than the RFC requires.
+
+%% Convenience record to manipulate the tuple.
+%% The order of the fields matter.
+-record(headers, {
+ id :: cow_http2:streamid(),
+ fin :: cow_http2:fin(),
+ head :: cow_http2:head_fin(),
+ data :: binary()
+}).
+
+headers_frame(Frame=#headers{}, State=#http2_machine{mode=Mode}) ->
+ case Mode of
+ server -> server_headers_frame(Frame, State);
+ client -> client_headers_frame(Frame, State)
+ end;
+%% @todo Handle the PRIORITY data, but only if this returns an ok tuple.
+%% @todo Do not lose the PRIORITY information if CONTINUATION frames follow.
+headers_frame({headers, StreamID, IsFin, IsHeadFin,
+ _IsExclusive, _DepStreamID, _Weight, HeaderData},
+ State=#http2_machine{mode=Mode}) ->
+ HeadersFrame = #headers{id=StreamID, fin=IsFin, head=IsHeadFin, data=HeaderData},
+ case Mode of
+ server -> server_headers_frame(HeadersFrame, State);
+ client -> client_headers_frame(HeadersFrame, State)
+ end.
+
+%% Reject HEADERS frames with even-numbered streamid.
+server_headers_frame(#headers{id=StreamID}, State)
+ when ?IS_SERVER_LOCAL(StreamID) ->
+ {error, {connection_error, protocol_error,
+ 'HEADERS frame received with even-numbered streamid. (RFC7540 5.1.1)'},
+ State};
+%% HEADERS frame on an idle stream: new request.
+server_headers_frame(Frame=#headers{id=StreamID, head=IsHeadFin},
+ State=#http2_machine{mode=server, remote_streamid=RemoteStreamID})
+ when StreamID > RemoteStreamID ->
+ case IsHeadFin of
+ head_fin ->
+ headers_decode(Frame, State, request, undefined);
+ head_nofin ->
+ {ok, State#http2_machine{state={continuation, request, Frame}}}
+ end;
+%% Either a HEADERS frame received on (half-)closed stream,
+%% or a HEADERS frame containing the trailers.
+server_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin}, State) ->
+ case stream_get(StreamID, State) of
+ %% Trailers.
+ Stream = #stream{remote=nofin} when IsFin =:= fin ->
+ case IsHeadFin of
+ head_fin ->
+ headers_decode(Frame, State, trailers, Stream);
+ head_nofin ->
+ {ok, State#http2_machine{state={continuation, trailers, Frame}}}
+ end;
+ #stream{remote=nofin} ->
+ {error, {connection_error, protocol_error,
+ 'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'},
+ State};
+ _ ->
+ {error, {connection_error, stream_closed,
+ 'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'},
+ State}
+ end.
+
+%% Either a HEADERS frame received on an (half-)closed stream,
+%% or a HEADERS frame containing the response or the trailers.
+client_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin},
+ State=#http2_machine{local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
+ when (?IS_CLIENT_LOCAL(StreamID) andalso (StreamID < LocalStreamID))
+ orelse ((not ?IS_CLIENT_LOCAL(StreamID)) andalso (StreamID =< RemoteStreamID)) ->
+ case stream_get(StreamID, State) of
+ Stream = #stream{remote=idle} ->
+ case IsHeadFin of
+ head_fin ->
+ headers_decode(Frame, State, response, Stream);
+ head_nofin ->
+ {ok, State#http2_machine{state={continuation, response, Frame}}}
+ end;
+ Stream = #stream{remote=nofin} when IsFin =:= fin ->
+ case IsHeadFin of
+ head_fin ->
+ headers_decode(Frame, State, trailers, Stream);
+ head_nofin ->
+ {ok, State#http2_machine{state={continuation, trailers, Frame}}}
+ end;
+ #stream{remote=nofin} ->
+ {error, {connection_error, protocol_error,
+ 'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'},
+ State};
+ _ ->
+ {error, {connection_error, stream_closed,
+ 'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'},
+ State}
+ end;
+%% Reject HEADERS frames received on idle streams.
+client_headers_frame(_, State) ->
+ {error, {connection_error, protocol_error,
+ 'HEADERS frame received on an idle stream. (RFC7540 5.1.1)'},
+ State}.
+
+headers_decode(Frame=#headers{head=head_fin, data=HeaderData},
+ State=#http2_machine{decode_state=DecodeState0}, Type, Stream) ->
+ try cow_hpack:decode(HeaderData, DecodeState0) of
+ {Headers, DecodeState} when Type =:= request ->
+ headers_enforce_concurrency_limit(Frame,
+ State#http2_machine{decode_state=DecodeState}, Type, Stream, Headers);
+ {Headers, DecodeState} ->
+ headers_pseudo_headers(Frame,
+ State#http2_machine{decode_state=DecodeState}, Type, Stream, Headers)
+ catch _:_ ->
+ {error, {connection_error, compression_error,
+ 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'},
+ State}
+ end.
+
+headers_enforce_concurrency_limit(Frame=#headers{id=StreamID},
+ State=#http2_machine{local_settings=LocalSettings, streams=Streams},
+ Type, Stream, Headers) ->
+ MaxConcurrentStreams = maps:get(max_concurrent_streams, LocalSettings, infinity),
+ %% Using < is correct because this new stream is not included
+ %% in the Streams variable yet and so we'll end up with +1 stream.
+ case length(Streams) < MaxConcurrentStreams of
+ true ->
+ headers_pseudo_headers(Frame, State, Type, Stream, Headers);
+ false ->
+ {error, {stream_error, StreamID, refused_stream,
+ 'Maximum number of concurrent streams has been reached. (RFC7540 5.1.2)'},
+ State}
+ end.
+
+headers_pseudo_headers(Frame, State=#http2_machine{local_settings=LocalSettings},
+ Type=request, Stream, Headers0) ->
+ IsExtendedConnectEnabled = maps:get(enable_connect_protocol, LocalSettings, false),
+ case request_pseudo_headers(Headers0, #{}) of
+ %% Extended CONNECT method (RFC8441).
+ {ok, PseudoHeaders=#{method := <<"CONNECT">>, scheme := _,
+ authority := _, path := _, protocol := _}, Headers}
+ when IsExtendedConnectEnabled ->
+ headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
+ {ok, #{method := <<"CONNECT">>, scheme := _,
+ authority := _, path := _}, _}
+ when IsExtendedConnectEnabled ->
+ headers_malformed(Frame, State,
+ 'The :protocol pseudo-header MUST be sent with an extended CONNECT. (RFC8441 4)');
+ {ok, #{protocol := _}, _} ->
+ headers_malformed(Frame, State,
+ 'The :protocol pseudo-header is only defined for the extended CONNECT. (RFC8441 4)');
+ %% Normal CONNECT (no scheme/path).
+ {ok, PseudoHeaders=#{method := <<"CONNECT">>, authority := _}, Headers}
+ when map_size(PseudoHeaders) =:= 2 ->
+ headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
+ {ok, #{method := <<"CONNECT">>}, _} ->
+ headers_malformed(Frame, State,
+ 'CONNECT requests only use the :method and :authority pseudo-headers. (RFC7540 8.3)');
+ %% Other requests.
+ {ok, PseudoHeaders=#{method := _, scheme := _, authority := _, path := _}, Headers} ->
+ headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
+ {ok, _, _} ->
+ headers_malformed(Frame, State,
+ 'A required pseudo-header was not found. (RFC7540 8.1.2.3)');
+ {error, HumanReadable} ->
+ headers_malformed(Frame, State, HumanReadable)
+ end;
+headers_pseudo_headers(Frame=#headers{id=StreamID},
+ State, Type=response, Stream, Headers0) ->
+ case response_pseudo_headers(Headers0, #{}) of
+ {ok, PseudoHeaders=#{status := _}, Headers} ->
+ headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers);
+ {ok, _, _} ->
+ stream_reset(StreamID, State, protocol_error,
+ 'A required pseudo-header was not found. (RFC7540 8.1.2.4)');
+ {error, HumanReadable} ->
+ stream_reset(StreamID, State, protocol_error, HumanReadable)
+ end;
+headers_pseudo_headers(Frame=#headers{id=StreamID},
+ State, Type=trailers, Stream, Headers) ->
+ case trailers_contain_pseudo_headers(Headers) of
+ false ->
+ headers_regular_headers(Frame, State, Type, Stream, #{}, Headers);
+ true ->
+ stream_reset(StreamID, State, protocol_error,
+ 'Trailer header blocks must not contain pseudo-headers. (RFC7540 8.1.2.1)')
+ end.
+
+headers_malformed(#headers{id=StreamID}, State, HumanReadable) ->
+ {error, {stream_error, StreamID, protocol_error, HumanReadable}, State}.
+
+request_pseudo_headers([{<<":method">>, _}|_], #{method := _}) ->
+ {error, 'Multiple :method pseudo-headers were found. (RFC7540 8.1.2.3)'};
+request_pseudo_headers([{<<":method">>, Method}|Tail], PseudoHeaders) ->
+ request_pseudo_headers(Tail, PseudoHeaders#{method => Method});
+request_pseudo_headers([{<<":scheme">>, _}|_], #{scheme := _}) ->
+ {error, 'Multiple :scheme pseudo-headers were found. (RFC7540 8.1.2.3)'};
+request_pseudo_headers([{<<":scheme">>, Scheme}|Tail], PseudoHeaders) ->
+ request_pseudo_headers(Tail, PseudoHeaders#{scheme => Scheme});
+request_pseudo_headers([{<<":authority">>, _}|_], #{authority := _}) ->
+ {error, 'Multiple :authority pseudo-headers were found. (RFC7540 8.1.2.3)'};
+request_pseudo_headers([{<<":authority">>, Authority}|Tail], PseudoHeaders) ->
+ request_pseudo_headers(Tail, PseudoHeaders#{authority => Authority});
+request_pseudo_headers([{<<":path">>, _}|_], #{path := _}) ->
+ {error, 'Multiple :path pseudo-headers were found. (RFC7540 8.1.2.3)'};
+request_pseudo_headers([{<<":path">>, Path}|Tail], PseudoHeaders) ->
+ request_pseudo_headers(Tail, PseudoHeaders#{path => Path});
+request_pseudo_headers([{<<":protocol">>, _}|_], #{protocol := _}) ->
+ {error, 'Multiple :protocol pseudo-headers were found. (RFC7540 8.1.2.3)'};
+request_pseudo_headers([{<<":protocol">>, Protocol}|Tail], PseudoHeaders) ->
+ request_pseudo_headers(Tail, PseudoHeaders#{protocol => Protocol});
+request_pseudo_headers([{<<":", _/bits>>, _}|_], _) ->
+ {error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
+request_pseudo_headers(Headers, PseudoHeaders) ->
+ {ok, PseudoHeaders, Headers}.
+
+response_pseudo_headers([{<<":status">>, _}|_], #{status := _}) ->
+ {error, 'Multiple :status pseudo-headers were found. (RFC7540 8.1.2.3)'};
+response_pseudo_headers([{<<":status">>, Status}|Tail], PseudoHeaders) ->
+ try cow_http:status_to_integer(Status) of
+ IntStatus ->
+ response_pseudo_headers(Tail, PseudoHeaders#{status => IntStatus})
+ catch _:_ ->
+ {error, 'The :status pseudo-header value is invalid. (RFC7540 8.1.2.4)'}
+ end;
+response_pseudo_headers([{<<":", _/bits>>, _}|_], _) ->
+ {error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'};
+response_pseudo_headers(Headers, PseudoHeaders) ->
+ {ok, PseudoHeaders, Headers}.
+
+trailers_contain_pseudo_headers([]) ->
+ false;
+trailers_contain_pseudo_headers([{<<":", _/bits>>, _}|_]) ->
+ true;
+trailers_contain_pseudo_headers([_|Tail]) ->
+ trailers_contain_pseudo_headers(Tail).
+
+%% Rejecting invalid regular headers might be a bit too strong for clients.
+headers_regular_headers(Frame=#headers{id=StreamID},
+ State, Type, Stream, PseudoHeaders, Headers) ->
+ case regular_headers(Headers, Type) of
+ ok when Type =:= request ->
+ request_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers);
+ ok when Type =:= response ->
+ response_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers);
+ ok when Type =:= trailers ->
+ trailers_frame(Frame, State, Stream, Headers);
+ {error, HumanReadable} when Type =:= request ->
+ headers_malformed(Frame, State, HumanReadable);
+ {error, HumanReadable} ->
+ stream_reset(StreamID, State, protocol_error, HumanReadable)
+ end.
+
+regular_headers([{<<":", _/bits>>, _}|_], _) ->
+ {error, 'Pseudo-headers were found after regular headers. (RFC7540 8.1.2.1)'};
+regular_headers([{<<"connection">>, _}|_], _) ->
+ {error, 'The connection header is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"keep-alive">>, _}|_], _) ->
+ {error, 'The keep-alive header is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"proxy-authenticate">>, _}|_], _) ->
+ {error, 'The proxy-authenticate header is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"proxy-authorization">>, _}|_], _) ->
+ {error, 'The proxy-authorization header is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"transfer-encoding">>, _}|_], _) ->
+ {error, 'The transfer-encoding header is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"upgrade">>, _}|_], _) ->
+ {error, 'The upgrade header is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"te">>, Value}|_], request) when Value =/= <<"trailers">> ->
+ {error, 'The te header with a value other than "trailers" is not allowed. (RFC7540 8.1.2.2)'};
+regular_headers([{<<"te">>, _}|_], Type) when Type =/= request ->
+ {error, 'The te header is only allowed in request headers. (RFC7540 8.1.2.2)'};
+regular_headers([{Name, _}|Tail], Type) ->
+ Pattern = [
+ <<$A>>, <<$B>>, <<$C>>, <<$D>>, <<$E>>, <<$F>>, <<$G>>, <<$H>>, <<$I>>,
+ <<$J>>, <<$K>>, <<$L>>, <<$M>>, <<$N>>, <<$O>>, <<$P>>, <<$Q>>, <<$R>>,
+ <<$S>>, <<$T>>, <<$U>>, <<$V>>, <<$W>>, <<$X>>, <<$Y>>, <<$Z>>
+ ],
+ case binary:match(Name, Pattern) of
+ nomatch -> regular_headers(Tail, Type);
+ _ -> {error, 'Header names must be lowercase. (RFC7540 8.1.2)'}
+ end;
+regular_headers([], _) ->
+ ok.
+
+request_expected_size(Frame=#headers{fin=IsFin}, State, Type, Stream, PseudoHeaders, Headers) ->
+ case [CL || {<<"content-length">>, CL} <- Headers] of
+ [] when IsFin =:= fin ->
+ headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
+ [] ->
+ headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined);
+ [<<"0">>] when IsFin =:= fin ->
+ headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
+ [_] when IsFin =:= fin ->
+ headers_malformed(Frame, State,
+ 'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)');
+ [BinLen] ->
+ headers_parse_expected_size(Frame, State, Type, Stream,
+ PseudoHeaders, Headers, BinLen);
+ _ ->
+ headers_malformed(Frame, State,
+ 'Multiple content-length headers were received. (RFC7230 3.3.2)')
+ end.
+
+response_expected_size(Frame=#headers{id=StreamID, fin=IsFin}, State, Type,
+ Stream=#stream{method=Method}, PseudoHeaders=#{status := Status}, Headers) ->
+ case [CL || {<<"content-length">>, CL} <- Headers] of
+ [] when IsFin =:= fin ->
+ headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
+ [] ->
+ headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined);
+ [_] when Status >= 100, Status =< 199 ->
+ stream_reset(StreamID, State, protocol_error,
+ 'Content-length header received in a 1xx response. (RFC7230 3.3.2)');
+ [_] when Status =:= 204 ->
+ stream_reset(StreamID, State, protocol_error,
+ 'Content-length header received in a 204 response. (RFC7230 3.3.2)');
+ [_] when Status >= 200, Status =< 299, Method =:= <<"CONNECT">> ->
+ stream_reset(StreamID, State, protocol_error,
+ 'Content-length header received in a 2xx response to a CONNECT request. (RFC7230 3.3.2).');
+ %% Responses to HEAD requests, and 304 responses may contain
+ %% a content-length header that must be ignored. (RFC7230 3.3.2)
+ [_] when Method =:= <<"HEAD">> ->
+ headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
+ [_] when Status =:= 304 ->
+ headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
+ [<<"0">>] when IsFin =:= fin ->
+ headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0);
+ [_] when IsFin =:= fin ->
+ stream_reset(StreamID, State, protocol_error,
+ 'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)');
+ [BinLen] ->
+ headers_parse_expected_size(Frame, State, Type, Stream,
+ PseudoHeaders, Headers, BinLen);
+ _ ->
+ stream_reset(StreamID, State, protocol_error,
+ 'Multiple content-length headers were received. (RFC7230 3.3.2)')
+ end.
+
+headers_parse_expected_size(Frame=#headers{id=StreamID},
+ State, Type, Stream, PseudoHeaders, Headers, BinLen) ->
+ try cow_http_hd:parse_content_length(BinLen) of
+ Len ->
+ headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, Len)
+ catch
+ _:_ ->
+ HumanReadable = 'The content-length header is invalid. (RFC7230 3.3.2)',
+ case Type of
+ request -> headers_malformed(Frame, State, HumanReadable);
+ response -> stream_reset(StreamID, State, protocol_error, HumanReadable)
+ end
+ end.
+
+headers_frame(#headers{id=StreamID, fin=IsFin}, State0=#http2_machine{
+ local_settings=#{initial_window_size := RemoteWindow},
+ remote_settings=#{initial_window_size := LocalWindow}},
+ Type, Stream0, PseudoHeaders, Headers, Len) ->
+ Stream = case Stream0 of
+ undefined ->
+ TE = case lists:keyfind(<<"te">>, 1, Headers) of
+ {_, TE0} -> TE0;
+ false -> undefined
+ end,
+ #stream{id=StreamID, method=maps:get(method, PseudoHeaders),
+ remote=IsFin, remote_expected_size=Len,
+ local_window=LocalWindow, remote_window=RemoteWindow, te=TE};
+ _ ->
+ case {Type, PseudoHeaders} of
+ {response, #{status := Status}} when Status >= 100, Status =< 199 ->
+ Stream0;
+ _ ->
+ Stream0#stream{remote=IsFin, remote_expected_size=Len}
+ end
+ end,
+ State = stream_store(Stream, State0#http2_machine{remote_streamid=StreamID}),
+ {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, Len}, State}.
+
+trailers_frame(#headers{id=StreamID}, State0, Stream0, Headers) ->
+ Stream = Stream0#stream{remote=fin},
+ State = stream_store(Stream, State0),
+ case is_body_size_valid(Stream) of
+ true ->
+ {ok, {trailers, StreamID, Headers}, State};
+ false ->
+ stream_reset(StreamID, State, protocol_error,
+ 'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)')
+ end.
+
+%% PRIORITY frame.
+%%
+%% @todo Handle PRIORITY frames.
+
+priority_frame(_Frame, State) ->
+ {ok, State}.
+
+%% RST_STREAM frame.
+
+rst_stream_frame({rst_stream, StreamID, _}, State=#http2_machine{mode=Mode,
+ local_streamid=LocalStreamID, remote_streamid=RemoteStreamID})
+ when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID))
+ orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) ->
+ {error, {connection_error, protocol_error,
+ 'RST_STREAM frame received on a stream in idle state. (RFC7540 5.1)'},
+ State};
+rst_stream_frame({rst_stream, StreamID, Reason}, State=#http2_machine{
+ streams=Streams0, rst_lingering_streams=Lingering0}) ->
+ Streams = lists:keydelete(StreamID, #stream.id, Streams0),
+ %% We only keep up to 10 streams in this state. @todo Make it configurable?
+ Lingering = [StreamID|lists:sublist(Lingering0, 10 - 1)],
+ {ok, {rst_stream, StreamID, Reason},
+ State#http2_machine{streams=Streams, rst_lingering_streams=Lingering}}.
+
+%% SETTINGS frame.
+
+settings_frame({settings, Settings}, State0=#http2_machine{
+ opts=Opts, remote_settings=Settings0}) ->
+ State1 = State0#http2_machine{remote_settings=maps:merge(Settings0, Settings)},
+ State2 = maps:fold(fun
+ (header_table_size, NewSize, State=#http2_machine{encode_state=EncodeState0}) ->
+ MaxSize = maps:get(max_encode_table_size, Opts, 4096),
+ EncodeState = cow_hpack:set_max_size(min(NewSize, MaxSize), EncodeState0),
+ State#http2_machine{encode_state=EncodeState};
+ (initial_window_size, NewWindowSize, State) ->
+ OldWindowSize = maps:get(initial_window_size, Settings0, 65535),
+ streams_update_local_window(State, NewWindowSize - OldWindowSize);
+ (_, _, State) ->
+ State
+ end, State1, Settings),
+ case Settings of
+ #{initial_window_size := _} -> send_data(State2);
+ _ -> {ok, State2}
+ end;
+%% We expect to receive a SETTINGS frame as part of the preface.
+settings_frame(_F, State) ->
+ {error, {connection_error, protocol_error,
+ 'The preface sequence must be followed by a SETTINGS frame. (RFC7540 3.5)'},
+ State}.
+
+%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update
+%% the local stream windows for all active streams and perhaps
+%% resume sending data.
+streams_update_local_window(State=#http2_machine{streams=Streams0}, Increment) ->
+ Streams = [
+ S#stream{local_window=StreamWindow + Increment}
+ || S=#stream{local_window=StreamWindow} <- Streams0],
+ State#http2_machine{streams=Streams}.
+
+%% Ack for a previously sent SETTINGS frame.
+
+settings_ack_frame(State0=#http2_machine{local_settings=Local0, next_settings=NextSettings}) ->
+ Local = maps:merge(Local0, NextSettings),
+ State1 = State0#http2_machine{local_settings=Local, next_settings=#{}},
+ {ok, maps:fold(fun
+ (header_table_size, MaxSize, State=#http2_machine{decode_state=DecodeState0}) ->
+ DecodeState = cow_hpack:set_max_size(MaxSize, DecodeState0),
+ State#http2_machine{decode_state=DecodeState};
+ (initial_window_size, NewWindowSize, State) ->
+ OldWindowSize = maps:get(initial_window_size, Local0, 65535),
+ streams_update_remote_window(State, NewWindowSize - OldWindowSize);
+ (_, _, State) ->
+ State
+ end, State1, NextSettings)}.
+
+%% When we receive an ack to a SETTINGS frame we sent we need to update
+%% the remote stream windows for all active streams.
+streams_update_remote_window(State=#http2_machine{streams=Streams0}, Increment) ->
+ Streams = [
+ S#stream{remote_window=StreamWindow + Increment}
+ || S=#stream{remote_window=StreamWindow} <- Streams0],
+ State#http2_machine{streams=Streams}.
+
+%% PUSH_PROMISE frame.
+
+%% Convenience record to manipulate the tuple.
+%% The order of the fields matter.
+-record(push_promise, {
+ id :: cow_http2:streamid(),
+ head :: cow_http2:head_fin(),
+ promised_id :: cow_http2:streamid(),
+ data :: binary()
+}).
+
+push_promise_frame(_, State=#http2_machine{mode=server}) ->
+ {error, {connection_error, protocol_error,
+ 'PUSH_PROMISE frames MUST only be sent on a peer-initiated stream. (RFC7540 6.6)'},
+ State};
+push_promise_frame(_, State=#http2_machine{local_settings=#{enable_push := false}}) ->
+ {error, {connection_error, protocol_error,
+ 'PUSH_PROMISE frame received despite SETTINGS_ENABLE_PUSH set to 0. (RFC7540 6.6)'},
+ State};
+push_promise_frame(#push_promise{promised_id=PromisedStreamID},
+ State=#http2_machine{remote_streamid=RemoteStreamID})
+ when PromisedStreamID =< RemoteStreamID ->
+ {error, {connection_error, protocol_error,
+ 'PUSH_PROMISE frame received for a promised stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'},
+ State};
+push_promise_frame(#push_promise{id=StreamID}, State)
+ when not ?IS_CLIENT_LOCAL(StreamID) ->
+ {error, {connection_error, protocol_error,
+ 'PUSH_PROMISE frame received on a server-initiated stream. (RFC7540 6.6)'},
+ State};
+push_promise_frame(Frame=#push_promise{id=StreamID, head=IsHeadFin,
+ promised_id=PromisedStreamID, data=HeaderData}, State) ->
+ case stream_get(StreamID, State) of
+ #stream{remote=idle} ->
+ case IsHeadFin of
+ head_fin ->
+ %% @todo Gotta make sure the headers_* functions
+ %% will work properly for PUSH_PROMISE requests.
+ headers_decode(#headers{id=PromisedStreamID,
+ fin=fin, head=IsHeadFin, data=HeaderData},
+ State, push_promise, undefined);
+ head_nofin ->
+ {ok, State#http2_machine{state={continuation, push_promise, Frame}}}
+ end;
+ _ ->
+%% @todo Check if the stream is lingering. If it is, decode the frame
+%% and do what? That's the big question and why it's not implemented yet.
+% However, an endpoint that
+% has sent RST_STREAM on the associated stream MUST handle PUSH_PROMISE
+% frames that might have been created before the RST_STREAM frame is
+% received and processed. (RFC7540 6.6)
+ {error, {connection_error, stream_closed,
+ 'PUSH_PROMISE frame received on a stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'},
+ State}
+ end.
+
+%% PING frame.
+
+ping_frame({ping, _}, State) ->
+ {ok, State}.
+
+%% Ack for a previously sent PING frame.
+%%
+%% @todo Might want to check contents but probably a waste of time.
+
+ping_ack_frame({ping_ack, _}, State) ->
+ {ok, State}.
+
+%% GOAWAY frame.
+
+goaway_frame(Frame={goaway, _, _, _}, State) ->
+ {ok, Frame, State}.
+
+%% WINDOW_UPDATE frame.
+
+%% Connection-wide WINDOW_UPDATE frame.
+window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow})
+ when ConnWindow + Increment > 16#7fffffff ->
+ {error, {connection_error, flow_control_error,
+ 'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'},
+ State};
+window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow}) ->
+ send_data(State#http2_machine{local_window=ConnWindow + Increment});
+%% Stream-specific WINDOW_UPDATE frame.
+window_update_frame({window_update, StreamID, _},
+ State=#http2_machine{remote_streamid=RemoteStreamID})
+ when StreamID > RemoteStreamID ->
+ {error, {connection_error, protocol_error,
+ 'WINDOW_UPDATE frame received on a stream in idle state. (RFC7540 5.1)'},
+ State};
+window_update_frame({window_update, StreamID, Increment},
+ State0=#http2_machine{rst_lingering_streams=RstLingering}) ->
+ case stream_get(StreamID, State0) of
+ #stream{local_window=StreamWindow} when StreamWindow + Increment > 16#7fffffff ->
+ stream_reset(StreamID, State0, flow_control_error,
+ 'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)');
+ Stream0 = #stream{local_window=StreamWindow} ->
+ send_data(Stream0#stream{local_window=StreamWindow + Increment}, State0);
+ undefined ->
+ %% WINDOW_UPDATE frames may be received for a short period of time
+ %% after a stream is closed. They must be ignored.
+ case lists:member(StreamID, RstLingering) of
+ false -> {ok, State0};
+ true -> stream_reset(StreamID, State0, stream_closed,
+ 'WINDOW_UPDATE frame received after the stream was reset. (RFC7540 5.1)')
+ end
+ end.
+
+%% CONTINUATION frame.
+
+%% Convenience record to manipulate the tuple.
+%% The order of the fields matter.
+-record(continuation, {
+ id :: cow_http2:streamid(),
+ head :: cow_http2:head_fin(),
+ data :: binary()
+}).
+
+unexpected_continuation_frame(#continuation{}, State) ->
+ {error, {connection_error, protocol_error,
+ 'CONTINUATION frames MUST be preceded by a HEADERS or PUSH_PROMISE frame. (RFC7540 6.10)'},
+ State}.
+
+continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1},
+ State=#http2_machine{state={continuation, Type,
+ Frame=#headers{id=StreamID, data=HeaderFragment0}}}) ->
+ HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
+ headers_decode(Frame#headers{head=head_fin, data=HeaderData},
+ State#http2_machine{state=normal}, Type, stream_get(StreamID, State));
+continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1},
+ State=#http2_machine{state={continuation, Type, #push_promise{
+ id=StreamID, promised_id=PromisedStreamID, data=HeaderFragment0}}}) ->
+ HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
+ headers_decode(#headers{id=PromisedStreamID, fin=fin, head=head_fin, data=HeaderData},
+ State#http2_machine{state=normal}, Type, undefined);
+continuation_frame(#continuation{id=StreamID, data=HeaderFragment1},
+ State=#http2_machine{state={continuation, Type, ContinuedFrame0}})
+ when element(2, ContinuedFrame0) =:= StreamID ->
+ ContinuedFrame = case ContinuedFrame0 of
+ #headers{data=HeaderFragment0} ->
+ HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
+ ContinuedFrame0#headers{data=HeaderData};
+ #push_promise{data=HeaderFragment0} ->
+ HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>,
+ ContinuedFrame0#push_promise{data=HeaderData}
+ end,
+ {ok, State#http2_machine{state={continuation, Type, ContinuedFrame}}};
+continuation_frame(_F, State) ->
+ {error, {connection_error, protocol_error,
+ 'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'},
+ State}.
+
+%% Ignored frames.
+
+-spec ignored_frame(State)
+ -> {ok, State}
+ | {error, {connection_error, protocol_error, atom()}, State}
+ when State::http2_machine().
+ignored_frame(State=#http2_machine{state={continuation, _, _}}) ->
+ {error, {connection_error, protocol_error,
+ 'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'},
+ State};
+%% @todo It might be useful to error out when we receive
+%% too many unknown frames. (RFC7540 10.5)
+ignored_frame(State) ->
+ {ok, State}.
+
+%% Functions for sending a message header or body. Note that
+%% this module does not send data directly, instead it returns
+%% a value that can then be used to send the frames.
+
+-spec prepare_headers(cow_http2:streamid(), State, idle | cow_http2:fin(),
+ pseudo_headers(), cow_http:headers())
+ -> {ok, cow_http2:fin(), iodata(), State} when State::http2_machine().
+%% @todo Should handle the request case too.
+prepare_headers(StreamID, State=#http2_machine{encode_state=EncodeState0},
+ IsFin0, PseudoHeaders, Headers0) ->
+ Stream = #stream{method=Method, local=idle} = stream_get(StreamID, State),
+ IsFin = case {IsFin0, Method} of
+ {idle, _} -> nofin;
+ {_, <<"HEAD">>} -> fin;
+ _ -> IsFin0
+ end,
+ Headers = merge_pseudo_headers(PseudoHeaders, Headers0),
+ {HeaderBlock, EncodeState} = cow_hpack:encode(Headers, EncodeState0),
+ {ok, IsFin, HeaderBlock, stream_store(Stream#stream{local=IsFin0},
+ State#http2_machine{encode_state=EncodeState})}.
+
+-spec prepare_push_promise(cow_http2:streamid(), State, pseudo_headers(), cow_http:headers())
+ -> {ok, cow_http2:streamid(), iodata(), State}
+ | {error, no_push} when State::http2_machine().
+prepare_push_promise(_, #http2_machine{remote_settings=#{enable_push := false}}, _, _) ->
+ {error, no_push};
+prepare_push_promise(StreamID, State=#http2_machine{encode_state=EncodeState0,
+ local_settings=#{initial_window_size := RemoteWindow},
+ remote_settings=#{initial_window_size := LocalWindow},
+ local_streamid=LocalStreamID}, PseudoHeaders, Headers0) ->
+ #stream{local=idle} = stream_get(StreamID, State),
+ TE = case lists:keyfind(<<"te">>, 1, Headers0) of
+ {_, TE0} -> TE0;
+ false -> undefined
+ end,
+ Headers = merge_pseudo_headers(PseudoHeaders, Headers0),
+ {HeaderBlock, EncodeState} = cow_hpack:encode(Headers, EncodeState0),
+ {ok, LocalStreamID, HeaderBlock, stream_store(
+ #stream{id=LocalStreamID, method=maps:get(method, PseudoHeaders),
+ remote=fin, remote_expected_size=0,
+ local_window=LocalWindow, remote_window=RemoteWindow, te=TE},
+ State#http2_machine{encode_state=EncodeState, local_streamid=LocalStreamID + 2})}.
+
+merge_pseudo_headers(PseudoHeaders, Headers0) ->
+ lists:foldl(fun
+ ({status, Status}, Acc) when is_integer(Status) ->
+ [{<<":status">>, integer_to_binary(Status)}|Acc];
+ ({Name, Value}, Acc) ->
+ [{iolist_to_binary([$:, atom_to_binary(Name, latin1)]), Value}|Acc]
+ end, Headers0, maps:to_list(PseudoHeaders)).
+
+-spec prepare_trailers(cow_http2:streamid(), State, cow_http:headers())
+ -> {ok, iodata(), State} when State::http2_machine().
+prepare_trailers(StreamID, State=#http2_machine{encode_state=EncodeState0}, Trailers) ->
+ Stream = #stream{local=nofin} = stream_get(StreamID, State),
+ {HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0),
+ {ok, HeaderBlock, stream_store(Stream#stream{local=fin},
+ State#http2_machine{encode_state=EncodeState})}.
+
+-spec send_or_queue_data(cow_http2:streamid(), State, cow_http2:fin(), DataOrFileOrTrailers)
+ -> {ok, State}
+ | {send, [{cow_http2:streamid(), cow_http2:fin(), [DataOrFileOrTrailers]}], State}
+ when State::http2_machine(), DataOrFileOrTrailers::
+ {data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}.
+send_or_queue_data(StreamID, State0, IsFin0, DataOrFileOrTrailers0) ->
+ %% @todo Probably just ignore if the method was HEAD.
+ Stream0 = #stream{local=nofin, te=TE0} = stream_get(StreamID, State0),
+ DataOrFileOrTrailers = case DataOrFileOrTrailers0 of
+ {trailers, _} ->
+ %% We only accept TE headers containing exactly "trailers" (RFC7540 8.1.2.1).
+ TE = try cow_http_hd:parse_te(TE0) of
+ {trailers, []} -> trailers;
+ _ -> no_trailers
+ catch _:_ ->
+ %% If we can't parse the TE header, assume we can't send trailers.
+ no_trailers
+ end,
+ case TE of
+ trailers ->
+ DataOrFileOrTrailers0;
+ no_trailers ->
+ {data, <<>>}
+ end;
+ _ ->
+ DataOrFileOrTrailers0
+ end,
+ case send_or_queue_data(Stream0, State0, [], IsFin0, DataOrFileOrTrailers, in) of
+ {ok, Stream, State, []} ->
+ {ok, stream_store(Stream, State)};
+ {ok, Stream=#stream{local=IsFin}, State, SendData} ->
+ {send, [{StreamID, IsFin, lists:reverse(SendData)}], stream_store(Stream, State)}
+ end.
+
+%% Internal data sending/queuing functions.
+
+%% @todo Should we ever want to implement the PRIORITY mechanism,
+%% this would be the place to do it. Right now, we just go over
+%% all streams and send what we can until either everything is
+%% sent or we run out of space in the window.
+send_data(State0=#http2_machine{streams=Streams0}) ->
+ case send_data_for_all_streams(Streams0, State0, [], []) of
+ {ok, Streams, State, []} ->
+ {ok, State#http2_machine{streams=Streams}};
+ {ok, Streams, State, Send} ->
+ {send, Send, State#http2_machine{streams=Streams}}
+ end.
+
+send_data_for_all_streams([], State, Acc, Send) ->
+ {ok, lists:reverse(Acc), State, Send};
+%% While technically we should never get < 0 here, let's be on the safe side.
+send_data_for_all_streams(Tail, State=#http2_machine{local_window=ConnWindow}, Acc, Send)
+ when ConnWindow =< 0 ->
+ {ok, lists:reverse(Acc, Tail), State, Send};
+%% We rely on send_data_for_one_stream/3 to do all the necessary checks about the stream.
+send_data_for_all_streams([Stream0|Tail], State0, Acc, Send) ->
+ case send_data_for_one_stream(Stream0, State0, []) of
+ {ok, Stream, State, []} ->
+ send_data_for_all_streams(Tail, State, [Stream|Acc], Send);
+ %% We need to remove the stream here because we do not use stream_store/2.
+ {ok, #stream{id=StreamID, local=fin, remote=fin}, State, SendData} ->
+ send_data_for_all_streams(Tail, State, Acc,
+ [{StreamID, fin, SendData}|Send]);
+ {ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} ->
+ send_data_for_all_streams(Tail, State, [Stream|Acc],
+ [{StreamID, IsFin, SendData}|Send])
+ end.
+
+send_data(Stream0, State0) ->
+ case send_data_for_one_stream(Stream0, State0, []) of
+ {ok, Stream, State, []} ->
+ {ok, stream_store(Stream, State)};
+ {ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} ->
+ {send, [{StreamID, IsFin, SendData}], stream_store(Stream, State)}
+ end.
+
+send_data_for_one_stream(Stream=#stream{local=nofin, local_buffer_size=0,
+ local_trailers=Trailers}, State, SendAcc) when Trailers =/= undefined ->
+ {ok, Stream, State, lists:reverse([{trailers, Trailers}|SendAcc])};
+send_data_for_one_stream(Stream=#stream{local=IsFin, local_window=StreamWindow,
+ local_buffer_size=BufferSize}, State=#http2_machine{local_window=ConnWindow}, SendAcc)
+ when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 ->
+ {ok, Stream, State, lists:reverse(SendAcc)};
+send_data_for_one_stream(Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize},
+ State0, SendAcc0) ->
+ %% We know there is an item in the queue.
+ {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0),
+ Stream1 = Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize},
+ {ok, Stream, State, SendAcc}
+ = send_or_queue_data(Stream1, State0, SendAcc0, IsFin, Data, in_r),
+ send_data_for_one_stream(Stream, State, SendAcc).
+
+%% We can send trailers immediately if the queue is empty, otherwise we queue.
+%% We always send trailer frames even if the window is empty.
+send_or_queue_data(Stream=#stream{local_buffer_size=0},
+ State, SendAcc, fin, {trailers, Trailers}, _) ->
+ {ok, Stream, State, [{trailers, Trailers}|SendAcc]};
+send_or_queue_data(Stream, State, SendAcc, fin, {trailers, Trailers}, _) ->
+ {ok, Stream#stream{local_trailers=Trailers}, State, SendAcc};
+%% Send data immediately if we can, buffer otherwise.
+send_or_queue_data(Stream=#stream{local_window=StreamWindow},
+ State=#http2_machine{local_window=ConnWindow},
+ SendAcc, IsFin, Data, In)
+ when ConnWindow =< 0; StreamWindow =< 0 ->
+ {ok, queue_data(Stream, IsFin, Data, In), State, SendAcc};
+send_or_queue_data(Stream=#stream{local_window=StreamWindow},
+ State=#http2_machine{opts=Opts, remote_settings=RemoteSettings,
+ local_window=ConnWindow}, SendAcc, IsFin, Data, In) ->
+ RemoteMaxFrameSize = maps:get(max_frame_size, RemoteSettings, 16384),
+ ConfiguredMaxFrameSize = maps:get(max_frame_size_sent, Opts, infinity),
+ MaxSendSize = min(
+ min(ConnWindow, StreamWindow),
+ min(RemoteMaxFrameSize, ConfiguredMaxFrameSize)
+ ),
+ case Data of
+ File = #sendfile{bytes=Bytes} when Bytes =< MaxSendSize ->
+ {ok, Stream#stream{local=IsFin, local_window=StreamWindow - Bytes},
+ State#http2_machine{local_window=ConnWindow - Bytes},
+ [File|SendAcc]};
+ File = #sendfile{offset=Offset, bytes=Bytes} ->
+ send_or_queue_data(Stream#stream{local_window=StreamWindow - MaxSendSize},
+ State#http2_machine{local_window=ConnWindow - MaxSendSize},
+ [File#sendfile{bytes=MaxSendSize}|SendAcc], IsFin,
+ File#sendfile{offset=Offset + MaxSendSize, bytes=Bytes - MaxSendSize}, In);
+ {data, Iolist0} ->
+ IolistSize = iolist_size(Iolist0),
+ if
+ IolistSize =< MaxSendSize ->
+ {ok, Stream#stream{local=IsFin, local_window=StreamWindow - IolistSize},
+ State#http2_machine{local_window=ConnWindow - IolistSize},
+ [{data, Iolist0}|SendAcc]};
+ true ->
+ {Iolist, More} = cow_iolists:split(MaxSendSize, Iolist0),
+ send_or_queue_data(Stream#stream{local_window=StreamWindow - MaxSendSize},
+ State#http2_machine{local_window=ConnWindow - MaxSendSize},
+ [{data, Iolist}|SendAcc], IsFin, {data, More}, In)
+ end
+ end.
+
+queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) ->
+ DataSize = case Data of
+ {sendfile, _, Bytes, _} -> Bytes;
+ {data, Iolist} -> iolist_size(Iolist)
+ end,
+ Q = queue:In({IsFin, DataSize, Data}, Q0),
+ Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}.
+
+%% Public interface to update the flow control window.
+
+-spec update_window(0..16#7fffffff, State)
+ -> State when State::http2_machine().
+update_window(Size, State=#http2_machine{remote_window=RemoteWindow}) ->
+ State#http2_machine{remote_window=RemoteWindow + Size}.
+
+-spec update_window(cow_http2:streamid(), 0..16#7fffffff, State)
+ -> State when State::http2_machine().
+update_window(StreamID, Size, State) ->
+ Stream = #stream{remote_window=RemoteWindow} = stream_get(StreamID, State),
+ stream_store(Stream#stream{remote_window=RemoteWindow + Size}, State).
+
+%% Public interface to reset streams.
+
+-spec reset_stream(cow_http2:streamid(), State)
+ -> {ok, State} | {error, not_found} when State::http2_machine().
+reset_stream(StreamID, State=#http2_machine{streams=Streams0}) ->
+ case lists:keytake(StreamID, #stream.id, Streams0) of
+ {value, _, Streams} ->
+ {ok, stream_linger(StreamID, State#http2_machine{streams=Streams})};
+ false ->
+ {error, not_found}
+ end.
+
+%% Retrieve a setting value, or its default value if not set.
+
+-spec get_local_setting(atom(), http2_machine()) -> atom() | integer().
+get_local_setting(Key, #http2_machine{local_settings=Settings}) ->
+ maps:get(Key, Settings, default_setting_value(Key)).
+
+default_setting_value(header_table_size) -> 4096;
+default_setting_value(enable_push) -> true;
+default_setting_value(max_concurrent_streams) -> infinity;
+default_setting_value(initial_window_size) -> 65535;
+default_setting_value(max_frame_size) -> 16384;
+default_setting_value(max_header_list_size) -> infinity;
+default_setting_value(enable_connect_protocol) -> false.
+
+%% Function to obtain the last known streamid received
+%% for the purposes of sending a GOAWAY frame and closing the connection.
+
+-spec get_last_streamid(http2_machine()) -> cow_http2:streamid().
+get_last_streamid(#http2_machine{remote_streamid=RemoteStreamID}) ->
+ RemoteStreamID.
+
+%% Retrieve the local state for a stream, including the state in the queue.
+
+-spec get_stream_local_state(cow_http2:streamid(), http2_machine())
+ -> {ok, idle | cow_http2:fin(), empty | nofin | fin} | {error, not_found | closed}.
+get_stream_local_state(StreamID, State=#http2_machine{mode=Mode,
+ local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) ->
+ case stream_get(StreamID, State) of
+ #stream{local=IsFin, local_buffer=Q, local_trailers=undefined} ->
+ IsQueueFin = case queue:peek_r(Q) of
+ empty -> empty;
+ {value, {IsQueueFin0, _, _}} -> IsQueueFin0
+ end,
+ {ok, IsFin, IsQueueFin};
+ %% Trailers are queued so the local state is fin after the queue is drained.
+ #stream{local=IsFin} ->
+ {ok, IsFin, fin};
+ undefined when (?IS_LOCAL(Mode, StreamID) andalso (StreamID < LocalStreamID))
+ orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID =< RemoteStreamID)) ->
+ {error, closed};
+ undefined ->
+ {error, not_found}
+ end.
+
+%% Stream-related functions.
+
+stream_get(StreamID, #http2_machine{streams=Streams}) ->
+ case lists:keyfind(StreamID, #stream.id, Streams) of
+ false -> undefined;
+ Stream -> Stream
+ end.
+
+stream_store(#stream{id=StreamID, local=fin, remote=fin},
+ State=#http2_machine{streams=Streams0}) ->
+ Streams = lists:keydelete(StreamID, #stream.id, Streams0),
+ State#http2_machine{streams=Streams};
+stream_store(Stream=#stream{id=StreamID},
+ State=#http2_machine{streams=Streams0}) ->
+ Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream),
+ State#http2_machine{streams=Streams}.
+
+%% @todo Don't send an RST_STREAM if one was already sent.
+stream_reset(StreamID, State, Reason, HumanReadable) ->
+ {error, {stream_error, StreamID, Reason, HumanReadable},
+ stream_linger(StreamID, State)}.
+
+stream_linger(StreamID, State=#http2_machine{lingering_streams=Lingering0}) ->
+ %% We only keep up to 100 streams in this state. @todo Make it configurable?
+ Lingering = [StreamID|lists:sublist(Lingering0, 100 - 1)],
+ State#http2_machine{lingering_streams=Lingering}.