From b461b119e78e4e09bb28b186b09da7ed4a86a0dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 26 Oct 2018 10:12:25 +0200 Subject: Introduce cow_http2_machine, an HTTP/2 state machine This is the result of a merge of the Cowboy and Gun HTTP/2 codes. It can probably do a little more but it's at a point where Cowboy works fine when using it so additional work will be done in other commits. The Gun code has not been switched to this module yet. I expect for example the PUSH_PROMISE code to fail at this point. This will be the next step. --- src/cow_http2.erl | 6 + src/cow_http2_machine.erl | 1295 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 1301 insertions(+) create mode 100644 src/cow_http2_machine.erl (limited to 'src') diff --git a/src/cow_http2.erl b/src/cow_http2.erl index e6f7738..ec4aab9 100644 --- a/src/cow_http2.erl +++ b/src/cow_http2.erl @@ -36,8 +36,14 @@ -export([window_update/2]). -type streamid() :: pos_integer(). +-export_type([streamid/0]). + -type fin() :: fin | nofin. +-export_type([fin/0]). + -type head_fin() :: head_fin | head_nofin. +-export_type([head_fin/0]). + -type exclusive() :: exclusive | shared. -type weight() :: 1..256. -type settings() :: map(). diff --git a/src/cow_http2_machine.erl b/src/cow_http2_machine.erl new file mode 100644 index 0000000..b1cf1da --- /dev/null +++ b/src/cow_http2_machine.erl @@ -0,0 +1,1295 @@ +%% Copyright (c) 2018, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(cow_http2_machine). + +-export([init/2]). +-export([init_upgrade_stream/2]). +-export([frame/2]). +-export([ignored_frame/1]). +-export([prepare_headers/5]). +-export([prepare_push_promise/4]). +-export([prepare_trailers/3]). +-export([send_or_queue_data/4]). +-export([update_window/2]). +-export([update_window/3]). +-export([reset_stream/2]). +-export([get_local_setting/2]). +-export([get_last_streamid/1]). +-export([get_stream_local_state/2]). + +-type opts() :: #{ + enable_connect_protocol => boolean(), + initial_connection_window_size => 65535..16#7fffffff, + initial_stream_window_size => 0..16#7fffffff, + max_concurrent_streams => non_neg_integer() | infinity, + max_decode_table_size => non_neg_integer(), + max_encode_table_size => non_neg_integer(), + max_frame_size_received => 16384..16777215, + max_frame_size_sent => 16384..16777215 | infinity +}. +-export_type([opts/0]). + +%% The order of the fields is significant. +-record(sendfile, { + offset :: non_neg_integer(), + bytes :: pos_integer(), + path :: file:name_all() +}). + +-record(stream, { + id = undefined :: cow_http2:streamid(), + + %% Request method. + method = undefined :: binary(), + + %% Whether we finished sending data. + local = idle :: idle | cow_http2:fin(), + + %% Local flow control window (how much we can send). + local_window :: integer(), + + %% Buffered data waiting for the flow control window to increase. + local_buffer = queue:new() :: + queue:queue({cow_http2:fin(), non_neg_integer(), {data, iodata()} | #sendfile{}}), + local_buffer_size = 0 :: non_neg_integer(), + local_trailers = undefined :: undefined | cow_http:headers(), + + %% Whether we finished receiving data. + remote = idle :: idle | cow_http2:fin(), + + %% Remote flow control window (how much we accept to receive). + remote_window :: integer(), + + %% Size expected and read from the request body. + remote_expected_size = undefined :: undefined | non_neg_integer(), + remote_read_size = 0 :: non_neg_integer(), + + %% Unparsed te header. Used to know if we can send trailers. + te :: undefined | binary() +}). + +-type stream() :: #stream{}. + +-type continued_frame() :: + {headers, cow_http2:streamid(), cow_http2:fin(), cow_http2:head_fin(), binary()} | + {push_promise, cow_http2:streamid(), cow_http2:head_fin(), cow_http2:streamid(), binary()}. + +-record(http2_machine, { + %% Whether the HTTP/2 endpoint is a client or a server. + mode :: client | server, + + %% HTTP/2 SETTINGS customization. + opts = #{} :: opts(), + + %% Connection-wide frame processing state. + state = settings :: settings | normal + | {continuation, request | response | trailers | push_promise, continued_frame()}, + + %% Settings are separate for each endpoint. In addition, settings + %% must be acknowledged before they can be expected to be applied. + local_settings = #{ +% header_table_size => 4096, +% enable_push => true, +% max_concurrent_streams => infinity, + initial_window_size => 65535 +% max_frame_size => 16384 +% max_header_list_size => infinity + } :: map(), + next_settings = undefined :: undefined | map(), + remote_settings = #{ + initial_window_size => 65535 + } :: map(), + + %% Connection-wide flow control window. + local_window = 65535 :: integer(), %% How much we can send. + remote_window = 65535 :: integer(), %% How much we accept to receive. + + %% Stream identifiers. + local_streamid :: pos_integer(), %% The next streamid to be used. + remote_streamid = 0 :: non_neg_integer(), %% The last streamid received. + + %% Currently active HTTP/2 streams. Streams may be initiated either + %% by the client or by the server through PUSH_PROMISE frames. + streams = [] :: [stream()], + + %% HTTP/2 streams that have been reset recently by the server. + %% We are expected to keep receiving additional frames after + %% sending an RST_STREAM. + lingering_streams = [] :: [cow_http2:streamid()], + + %% HTTP/2 streams that have been reset recently by the client. + %% We keep a few of these around in order to reject subsequent + %% frames on these streams. + rst_lingering_streams = [] :: [cow_http2:streamid()], + + %% HPACK decoding and encoding state. + decode_state = cow_hpack:init() :: cow_hpack:state(), + encode_state = cow_hpack:init() :: cow_hpack:state() +}). + +-opaque http2_machine() :: #http2_machine{}. +-export_type([http2_machine/0]). + +-type pseudo_headers() :: #{} %% Trailers + | #{ %% Responses. + status := cow_http:status() + } | #{ %% Normal CONNECT requests. + method := binary(), + authority := binary() + } | #{ %% Other requests and extended CONNECT requests. + method := binary(), + scheme := binary(), + authority := binary(), + path := binary(), + protocol => binary() + }. + +%% Returns true when the given StreamID is for a local-initiated stream. +-define(IS_SERVER_LOCAL(StreamID), ((StreamID rem 2) =:= 0)). +-define(IS_CLIENT_LOCAL(StreamID), ((StreamID rem 2) =:= 1)). +-define(IS_LOCAL(Mode, StreamID), ( + ((Mode =:= server) andalso ?IS_SERVER_LOCAL(StreamID)) + orelse + ((Mode =:= client) andalso ?IS_CLIENT_LOCAL(StreamID)) +)). + +-spec init(client | server, opts()) -> {ok, iodata(), http2_machine()}. +init(client, Opts) -> + NextSettings = settings_init(Opts), + client_preface(#http2_machine{ + mode=client, + opts=only_keep_relevant_opts(Opts), + next_settings=NextSettings, + local_streamid=1 + }); +init(server, Opts) -> + NextSettings = settings_init(Opts), + common_preface(#http2_machine{ + mode=server, + opts=only_keep_relevant_opts(Opts), + next_settings=NextSettings, + local_streamid=2 + }). + +%% We remove the options that are part of SETTINGS or that are not known. +only_keep_relevant_opts(Opts) -> + maps:with([ + initial_connection_window_size, + max_encode_table_size, + max_frame_size_sent + ], Opts). + +client_preface(State0) -> + {ok, CommonPreface, State} = common_preface(State0), + {ok, [ + <<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>, + CommonPreface + ], State}. + +%% We send next_settings and use defaults until we get an ack. +%% +%% We also send a WINDOW_UPDATE frame for the connection when +%% the user specified an initial_connection_window_size. +common_preface(State=#http2_machine{opts=Opts, next_settings=NextSettings}) -> + case maps:get(initial_connection_window_size, Opts, 65535) of + 65535 -> + {ok, cow_http2:settings(NextSettings), State}; + Size -> + {ok, [ + cow_http2:settings(NextSettings), + cow_http2:window_update(Size - 65535) + ], update_window(Size - 65535, State)} + end. + +settings_init(Opts) -> + S0 = setting_from_opt(#{}, Opts, max_decode_table_size, + header_table_size, 4096), + S1 = setting_from_opt(S0, Opts, max_concurrent_streams, + max_concurrent_streams, infinity), + S2 = setting_from_opt(S1, Opts, initial_stream_window_size, + initial_window_size, 65535), + S3 = setting_from_opt(S2, Opts, max_frame_size_received, + max_frame_size, 16384), + %% @todo max_header_list_size + setting_from_opt(S3, Opts, enable_connect_protocol, + enable_connect_protocol, false). + +setting_from_opt(Settings, Opts, OptName, SettingName, Default) -> + case maps:get(OptName, Opts, Default) of + Default -> Settings; + Value -> Settings#{SettingName => Value} + end. + +-spec init_upgrade_stream(binary(), State) + -> {ok, cow_http2:streamid(), State} when State::http2_machine(). +init_upgrade_stream(Method, State=#http2_machine{mode=server, remote_streamid=0, + local_settings=#{initial_window_size := RemoteWindow}, + remote_settings=#{initial_window_size := LocalWindow}}) -> + Stream = #stream{id=1, method=Method, + remote=fin, remote_expected_size=0, + local_window=LocalWindow, remote_window=RemoteWindow, te=undefined}, + {ok, 1, stream_store(Stream, State#http2_machine{remote_streamid=1})}. + +-spec frame(cow_http2:frame(), State) + -> {ok, State} + | {ok, {data, cow_http2:streamid(), cow_http2:fin(), binary()}, State} + | {ok, {headers, cow_http2:streamid(), cow_http2:fin(), + cow_http:headers(), pseudo_headers(), non_neg_integer() | undefined}, State} + | {ok, {trailers, cow_http2:streamid(), cow_http:headers()}, State} + | {ok, {rst_stream, cow_http2:streamid(), cow_http2:error()}, State} + | {ok, {push_promise, cow_http2:streamid(), cow_http2:streamid(), + cow_http:headers(), pseudo_headers()}, State} + | {ok, {goaway, cow_http2:streamid(), cow_http2:error(), binary()}, State} + | {send, [{cow_http2:streamid(), cow_http2:fin(), + [{data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}]}], State} + | {error, {stream_error, cow_http2:streamid(), cow_http2:error(), atom()}, State} + | {error, {connection_error, cow_http2:error(), atom()}, State} + when State::http2_machine(). +frame(Frame, State=#http2_machine{state=settings}) -> + settings_frame(Frame, State#http2_machine{state=normal}); +frame(Frame, State=#http2_machine{state={continuation, _, _}}) -> + continuation_frame(Frame, State); +frame(settings_ack, State=#http2_machine{state=normal}) -> + settings_ack_frame(State); +frame(Frame, State=#http2_machine{state=normal}) -> + case element(1, Frame) of + data -> data_frame(Frame, State); + headers -> headers_frame(Frame, State); + priority -> priority_frame(Frame, State); + rst_stream -> rst_stream_frame(Frame, State); + settings -> settings_frame(Frame, State); + push_promise -> push_promise_frame(Frame, State); + ping -> ping_frame(Frame, State); + ping_ack -> ping_ack_frame(Frame, State); + goaway -> goaway_frame(Frame, State); + window_update -> window_update_frame(Frame, State); + continuation -> unexpected_continuation_frame(Frame, State); + _ -> ignored_frame(State) + end. + +%% DATA frame. + +data_frame({data, StreamID, _, _}, State=#http2_machine{mode=Mode, + local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) + when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID)) + orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) -> + {error, {connection_error, protocol_error, + 'DATA frame received on a stream in idle state. (RFC7540 5.1)'}, + State}; +data_frame({data, _, _, Data}, State=#http2_machine{remote_window=ConnWindow}) + when byte_size(Data) > ConnWindow -> + {error, {connection_error, flow_control_error, + 'DATA frame overflowed the connection flow control window. (RFC7540 6.9, RFC7540 6.9.1)'}, + State}; +data_frame(Frame={data, StreamID, _, Data}, State0=#http2_machine{ + remote_window=ConnWindow, lingering_streams=Lingering}) -> + DataLen = byte_size(Data), + State = State0#http2_machine{remote_window=ConnWindow - DataLen}, + case stream_get(StreamID, State) of + #stream{remote_window=StreamWindow} when StreamWindow < DataLen -> + stream_reset(StreamID, State, flow_control_error, + 'DATA frame overflowed the stream flow control window. (RFC7540 6.9, RFC7540 6.9.1)'); + Stream = #stream{remote=nofin} -> + data_frame(Frame, State, Stream, DataLen); + #stream{remote=idle} -> + stream_reset(StreamID, State, protocol_error, + 'DATA frame received before a HEADERS frame. (RFC7540 8.1, RFC7540 8.1.2.6)'); + #stream{remote=fin} -> + stream_reset(StreamID, State, stream_closed, + 'DATA frame received for a half-closed (remote) stream. (RFC7540 5.1)'); + undefined -> + %% After we send an RST_STREAM frame and terminate a stream, + %% the client still might be sending us some more frames + %% until it can process this RST_STREAM. We therefore ignore + %% DATA frames received for such lingering streams. + case lists:member(StreamID, Lingering) of + true -> + {ok, State0}; + false -> + {error, {connection_error, stream_closed, + 'DATA frame received for a closed stream. (RFC7540 5.1)'}, + State} + end + end. + +data_frame(Frame={data, _, IsFin, _}, State0, Stream0=#stream{id=StreamID, + remote_window=StreamWindow, remote_read_size=StreamRead}, DataLen) -> + Stream = Stream0#stream{remote=IsFin, + remote_window=StreamWindow - DataLen, + remote_read_size=StreamRead + DataLen}, + State = stream_store(Stream, State0), + case is_body_size_valid(Stream) of + true -> + {ok, Frame, State}; + false -> + stream_reset(StreamID, State, protocol_error, + 'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)') + end. + +%% It's always valid when no content-length header was specified. +is_body_size_valid(#stream{remote_expected_size=undefined}) -> + true; +%% We didn't finish reading the body but the size is already larger than expected. +is_body_size_valid(#stream{remote=nofin, remote_expected_size=Expected, + remote_read_size=Read}) when Read > Expected -> + false; +is_body_size_valid(#stream{remote=nofin}) -> + true; +is_body_size_valid(#stream{remote=fin, remote_expected_size=Expected, + remote_read_size=Expected}) -> + true; +%% We finished reading the body and the size read is not the one expected. +is_body_size_valid(_) -> + false. + +%% HEADERS frame. +%% +%% We always close the connection when we detect errors before +%% decoding the headers to not waste resources on non-compliant +%% endpoints, making us stricter than the RFC requires. + +%% Convenience record to manipulate the tuple. +%% The order of the fields matter. +-record(headers, { + id :: cow_http2:streamid(), + fin :: cow_http2:fin(), + head :: cow_http2:head_fin(), + data :: binary() +}). + +headers_frame(Frame=#headers{}, State=#http2_machine{mode=Mode}) -> + case Mode of + server -> server_headers_frame(Frame, State); + client -> client_headers_frame(Frame, State) + end; +%% @todo Handle the PRIORITY data, but only if this returns an ok tuple. +%% @todo Do not lose the PRIORITY information if CONTINUATION frames follow. +headers_frame({headers, StreamID, IsFin, IsHeadFin, + _IsExclusive, _DepStreamID, _Weight, HeaderData}, + State=#http2_machine{mode=Mode}) -> + HeadersFrame = #headers{id=StreamID, fin=IsFin, head=IsHeadFin, data=HeaderData}, + case Mode of + server -> server_headers_frame(HeadersFrame, State); + client -> client_headers_frame(HeadersFrame, State) + end. + +%% Reject HEADERS frames with even-numbered streamid. +server_headers_frame(#headers{id=StreamID}, State) + when ?IS_SERVER_LOCAL(StreamID) -> + {error, {connection_error, protocol_error, + 'HEADERS frame received with even-numbered streamid. (RFC7540 5.1.1)'}, + State}; +%% HEADERS frame on an idle stream: new request. +server_headers_frame(Frame=#headers{id=StreamID, head=IsHeadFin}, + State=#http2_machine{mode=server, remote_streamid=RemoteStreamID}) + when StreamID > RemoteStreamID -> + case IsHeadFin of + head_fin -> + headers_decode(Frame, State, request, undefined); + head_nofin -> + {ok, State#http2_machine{state={continuation, request, Frame}}} + end; +%% Either a HEADERS frame received on (half-)closed stream, +%% or a HEADERS frame containing the trailers. +server_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin}, State) -> + case stream_get(StreamID, State) of + %% Trailers. + Stream = #stream{remote=nofin} when IsFin =:= fin -> + case IsHeadFin of + head_fin -> + headers_decode(Frame, State, trailers, Stream); + head_nofin -> + {ok, State#http2_machine{state={continuation, trailers, Frame}}} + end; + #stream{remote=nofin} -> + {error, {connection_error, protocol_error, + 'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'}, + State}; + _ -> + {error, {connection_error, stream_closed, + 'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'}, + State} + end. + +%% Either a HEADERS frame received on an (half-)closed stream, +%% or a HEADERS frame containing the response or the trailers. +client_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin}, + State=#http2_machine{local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) + when (?IS_CLIENT_LOCAL(StreamID) andalso (StreamID < LocalStreamID)) + orelse ((not ?IS_CLIENT_LOCAL(StreamID)) andalso (StreamID =< RemoteStreamID)) -> + case stream_get(StreamID, State) of + Stream = #stream{remote=idle} -> + case IsHeadFin of + head_fin -> + headers_decode(Frame, State, response, Stream); + head_nofin -> + {ok, State#http2_machine{state={continuation, response, Frame}}} + end; + Stream = #stream{remote=nofin} when IsFin =:= fin -> + case IsHeadFin of + head_fin -> + headers_decode(Frame, State, trailers, Stream); + head_nofin -> + {ok, State#http2_machine{state={continuation, trailers, Frame}}} + end; + #stream{remote=nofin} -> + {error, {connection_error, protocol_error, + 'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'}, + State}; + _ -> + {error, {connection_error, stream_closed, + 'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'}, + State} + end; +%% Reject HEADERS frames received on idle streams. +client_headers_frame(_, State) -> + {error, {connection_error, protocol_error, + 'HEADERS frame received on an idle stream. (RFC7540 5.1.1)'}, + State}. + +headers_decode(Frame=#headers{head=head_fin, data=HeaderData}, + State=#http2_machine{decode_state=DecodeState0}, Type, Stream) -> + try cow_hpack:decode(HeaderData, DecodeState0) of + {Headers, DecodeState} when Type =:= request -> + headers_enforce_concurrency_limit(Frame, + State#http2_machine{decode_state=DecodeState}, Type, Stream, Headers); + {Headers, DecodeState} -> + headers_pseudo_headers(Frame, + State#http2_machine{decode_state=DecodeState}, Type, Stream, Headers) + catch _:_ -> + {error, {connection_error, compression_error, + 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}, + State} + end. + +headers_enforce_concurrency_limit(Frame=#headers{id=StreamID}, + State=#http2_machine{local_settings=LocalSettings, streams=Streams}, + Type, Stream, Headers) -> + MaxConcurrentStreams = maps:get(max_concurrent_streams, LocalSettings, infinity), + %% Using < is correct because this new stream is not included + %% in the Streams variable yet and so we'll end up with +1 stream. + case length(Streams) < MaxConcurrentStreams of + true -> + headers_pseudo_headers(Frame, State, Type, Stream, Headers); + false -> + {error, {stream_error, StreamID, refused_stream, + 'Maximum number of concurrent streams has been reached. (RFC7540 5.1.2)'}, + State} + end. + +headers_pseudo_headers(Frame, State=#http2_machine{local_settings=LocalSettings}, + Type=request, Stream, Headers0) -> + IsExtendedConnectEnabled = maps:get(enable_connect_protocol, LocalSettings, false), + case request_pseudo_headers(Headers0, #{}) of + %% Extended CONNECT method (RFC8441). + {ok, PseudoHeaders=#{method := <<"CONNECT">>, scheme := _, + authority := _, path := _, protocol := _}, Headers} + when IsExtendedConnectEnabled -> + headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers); + {ok, #{method := <<"CONNECT">>, scheme := _, + authority := _, path := _}, _} + when IsExtendedConnectEnabled -> + headers_malformed(Frame, State, + 'The :protocol pseudo-header MUST be sent with an extended CONNECT. (RFC8441 4)'); + {ok, #{protocol := _}, _} -> + headers_malformed(Frame, State, + 'The :protocol pseudo-header is only defined for the extended CONNECT. (RFC8441 4)'); + %% Normal CONNECT (no scheme/path). + {ok, PseudoHeaders=#{method := <<"CONNECT">>, authority := _}, Headers} + when map_size(PseudoHeaders) =:= 2 -> + headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers); + {ok, #{method := <<"CONNECT">>}, _} -> + headers_malformed(Frame, State, + 'CONNECT requests only use the :method and :authority pseudo-headers. (RFC7540 8.3)'); + %% Other requests. + {ok, PseudoHeaders=#{method := _, scheme := _, authority := _, path := _}, Headers} -> + headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers); + {ok, _, _} -> + headers_malformed(Frame, State, + 'A required pseudo-header was not found. (RFC7540 8.1.2.3)'); + {error, HumanReadable} -> + headers_malformed(Frame, State, HumanReadable) + end; +headers_pseudo_headers(Frame=#headers{id=StreamID}, + State, Type=response, Stream, Headers0) -> + case response_pseudo_headers(Headers0, #{}) of + {ok, PseudoHeaders=#{status := _}, Headers} -> + headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers); + {ok, _, _} -> + stream_reset(StreamID, State, protocol_error, + 'A required pseudo-header was not found. (RFC7540 8.1.2.4)'); + {error, HumanReadable} -> + stream_reset(StreamID, State, protocol_error, HumanReadable) + end; +headers_pseudo_headers(Frame=#headers{id=StreamID}, + State, Type=trailers, Stream, Headers) -> + case trailers_contain_pseudo_headers(Headers) of + false -> + headers_regular_headers(Frame, State, Type, Stream, #{}, Headers); + true -> + stream_reset(StreamID, State, protocol_error, + 'Trailer header blocks must not contain pseudo-headers. (RFC7540 8.1.2.1)') + end. + +headers_malformed(#headers{id=StreamID}, State, HumanReadable) -> + {error, {stream_error, StreamID, protocol_error, HumanReadable}, State}. + +request_pseudo_headers([{<<":method">>, _}|_], #{method := _}) -> + {error, 'Multiple :method pseudo-headers were found. (RFC7540 8.1.2.3)'}; +request_pseudo_headers([{<<":method">>, Method}|Tail], PseudoHeaders) -> + request_pseudo_headers(Tail, PseudoHeaders#{method => Method}); +request_pseudo_headers([{<<":scheme">>, _}|_], #{scheme := _}) -> + {error, 'Multiple :scheme pseudo-headers were found. (RFC7540 8.1.2.3)'}; +request_pseudo_headers([{<<":scheme">>, Scheme}|Tail], PseudoHeaders) -> + request_pseudo_headers(Tail, PseudoHeaders#{scheme => Scheme}); +request_pseudo_headers([{<<":authority">>, _}|_], #{authority := _}) -> + {error, 'Multiple :authority pseudo-headers were found. (RFC7540 8.1.2.3)'}; +request_pseudo_headers([{<<":authority">>, Authority}|Tail], PseudoHeaders) -> + request_pseudo_headers(Tail, PseudoHeaders#{authority => Authority}); +request_pseudo_headers([{<<":path">>, _}|_], #{path := _}) -> + {error, 'Multiple :path pseudo-headers were found. (RFC7540 8.1.2.3)'}; +request_pseudo_headers([{<<":path">>, Path}|Tail], PseudoHeaders) -> + request_pseudo_headers(Tail, PseudoHeaders#{path => Path}); +request_pseudo_headers([{<<":protocol">>, _}|_], #{protocol := _}) -> + {error, 'Multiple :protocol pseudo-headers were found. (RFC7540 8.1.2.3)'}; +request_pseudo_headers([{<<":protocol">>, Protocol}|Tail], PseudoHeaders) -> + request_pseudo_headers(Tail, PseudoHeaders#{protocol => Protocol}); +request_pseudo_headers([{<<":", _/bits>>, _}|_], _) -> + {error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'}; +request_pseudo_headers(Headers, PseudoHeaders) -> + {ok, PseudoHeaders, Headers}. + +response_pseudo_headers([{<<":status">>, _}|_], #{status := _}) -> + {error, 'Multiple :status pseudo-headers were found. (RFC7540 8.1.2.3)'}; +response_pseudo_headers([{<<":status">>, Status}|Tail], PseudoHeaders) -> + try cow_http:status_to_integer(Status) of + IntStatus -> + response_pseudo_headers(Tail, PseudoHeaders#{status => IntStatus}) + catch _:_ -> + {error, 'The :status pseudo-header value is invalid. (RFC7540 8.1.2.4)'} + end; +response_pseudo_headers([{<<":", _/bits>>, _}|_], _) -> + {error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'}; +response_pseudo_headers(Headers, PseudoHeaders) -> + {ok, PseudoHeaders, Headers}. + +trailers_contain_pseudo_headers([]) -> + false; +trailers_contain_pseudo_headers([{<<":", _/bits>>, _}|_]) -> + true; +trailers_contain_pseudo_headers([_|Tail]) -> + trailers_contain_pseudo_headers(Tail). + +%% Rejecting invalid regular headers might be a bit too strong for clients. +headers_regular_headers(Frame=#headers{id=StreamID}, + State, Type, Stream, PseudoHeaders, Headers) -> + case regular_headers(Headers, Type) of + ok when Type =:= request -> + request_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers); + ok when Type =:= response -> + response_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers); + ok when Type =:= trailers -> + trailers_frame(Frame, State, Stream, Headers); + {error, HumanReadable} when Type =:= request -> + headers_malformed(Frame, State, HumanReadable); + {error, HumanReadable} -> + stream_reset(StreamID, State, protocol_error, HumanReadable) + end. + +regular_headers([{<<":", _/bits>>, _}|_], _) -> + {error, 'Pseudo-headers were found after regular headers. (RFC7540 8.1.2.1)'}; +regular_headers([{<<"connection">>, _}|_], _) -> + {error, 'The connection header is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"keep-alive">>, _}|_], _) -> + {error, 'The keep-alive header is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"proxy-authenticate">>, _}|_], _) -> + {error, 'The proxy-authenticate header is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"proxy-authorization">>, _}|_], _) -> + {error, 'The proxy-authorization header is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"transfer-encoding">>, _}|_], _) -> + {error, 'The transfer-encoding header is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"upgrade">>, _}|_], _) -> + {error, 'The upgrade header is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"te">>, Value}|_], request) when Value =/= <<"trailers">> -> + {error, 'The te header with a value other than "trailers" is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"te">>, _}|_], Type) when Type =/= request -> + {error, 'The te header is only allowed in request headers. (RFC7540 8.1.2.2)'}; +regular_headers([{Name, _}|Tail], Type) -> + Pattern = [ + <<$A>>, <<$B>>, <<$C>>, <<$D>>, <<$E>>, <<$F>>, <<$G>>, <<$H>>, <<$I>>, + <<$J>>, <<$K>>, <<$L>>, <<$M>>, <<$N>>, <<$O>>, <<$P>>, <<$Q>>, <<$R>>, + <<$S>>, <<$T>>, <<$U>>, <<$V>>, <<$W>>, <<$X>>, <<$Y>>, <<$Z>> + ], + case binary:match(Name, Pattern) of + nomatch -> regular_headers(Tail, Type); + _ -> {error, 'Header names must be lowercase. (RFC7540 8.1.2)'} + end; +regular_headers([], _) -> + ok. + +request_expected_size(Frame=#headers{fin=IsFin}, State, Type, Stream, PseudoHeaders, Headers) -> + case [CL || {<<"content-length">>, CL} <- Headers] of + [] when IsFin =:= fin -> + headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0); + [] -> + headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined); + [<<"0">>] when IsFin =:= fin -> + headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0); + [_] when IsFin =:= fin -> + headers_malformed(Frame, State, + 'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)'); + [BinLen] -> + headers_parse_expected_size(Frame, State, Type, Stream, + PseudoHeaders, Headers, BinLen); + _ -> + headers_malformed(Frame, State, + 'Multiple content-length headers were received. (RFC7230 3.3.2)') + end. + +response_expected_size(Frame=#headers{id=StreamID, fin=IsFin}, State, Type, + Stream=#stream{method=Method}, PseudoHeaders=#{status := Status}, Headers) -> + case [CL || {<<"content-length">>, CL} <- Headers] of + [] when IsFin =:= fin -> + headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0); + [] -> + headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined); + [_] when Status >= 100, Status =< 199 -> + stream_reset(StreamID, State, protocol_error, + 'Content-length header received in a 1xx response. (RFC7230 3.3.2)'); + [_] when Status =:= 204 -> + stream_reset(StreamID, State, protocol_error, + 'Content-length header received in a 204 response. (RFC7230 3.3.2)'); + [_] when Status >= 200, Status =< 299, Method =:= <<"CONNECT">> -> + stream_reset(StreamID, State, protocol_error, + 'Content-length header received in a 2xx response to a CONNECT request. (RFC7230 3.3.2).'); + %% Responses to HEAD requests, and 304 responses may contain + %% a content-length header that must be ignored. (RFC7230 3.3.2) + [_] when Method =:= <<"HEAD">> -> + headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0); + [_] when Status =:= 304 -> + headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0); + [<<"0">>] when IsFin =:= fin -> + headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0); + [_] when IsFin =:= fin -> + stream_reset(StreamID, State, protocol_error, + 'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)'); + [BinLen] -> + headers_parse_expected_size(Frame, State, Type, Stream, + PseudoHeaders, Headers, BinLen); + _ -> + stream_reset(StreamID, State, protocol_error, + 'Multiple content-length headers were received. (RFC7230 3.3.2)') + end. + +headers_parse_expected_size(Frame=#headers{id=StreamID}, + State, Type, Stream, PseudoHeaders, Headers, BinLen) -> + try cow_http_hd:parse_content_length(BinLen) of + Len -> + headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, Len) + catch + _:_ -> + HumanReadable = 'The content-length header is invalid. (RFC7230 3.3.2)', + case Type of + request -> headers_malformed(Frame, State, HumanReadable); + response -> stream_reset(StreamID, State, protocol_error, HumanReadable) + end + end. + +headers_frame(#headers{id=StreamID, fin=IsFin}, State0=#http2_machine{ + local_settings=#{initial_window_size := RemoteWindow}, + remote_settings=#{initial_window_size := LocalWindow}}, + Type, Stream0, PseudoHeaders, Headers, Len) -> + Stream = case Stream0 of + undefined -> + TE = case lists:keyfind(<<"te">>, 1, Headers) of + {_, TE0} -> TE0; + false -> undefined + end, + #stream{id=StreamID, method=maps:get(method, PseudoHeaders), + remote=IsFin, remote_expected_size=Len, + local_window=LocalWindow, remote_window=RemoteWindow, te=TE}; + _ -> + case {Type, PseudoHeaders} of + {response, #{status := Status}} when Status >= 100, Status =< 199 -> + Stream0; + _ -> + Stream0#stream{remote=IsFin, remote_expected_size=Len} + end + end, + State = stream_store(Stream, State0#http2_machine{remote_streamid=StreamID}), + {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, Len}, State}. + +trailers_frame(#headers{id=StreamID}, State0, Stream0, Headers) -> + Stream = Stream0#stream{remote=fin}, + State = stream_store(Stream, State0), + case is_body_size_valid(Stream) of + true -> + {ok, {trailers, StreamID, Headers}, State}; + false -> + stream_reset(StreamID, State, protocol_error, + 'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)') + end. + +%% PRIORITY frame. +%% +%% @todo Handle PRIORITY frames. + +priority_frame(_Frame, State) -> + {ok, State}. + +%% RST_STREAM frame. + +rst_stream_frame({rst_stream, StreamID, _}, State=#http2_machine{mode=Mode, + local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) + when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID)) + orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) -> + {error, {connection_error, protocol_error, + 'RST_STREAM frame received on a stream in idle state. (RFC7540 5.1)'}, + State}; +rst_stream_frame({rst_stream, StreamID, Reason}, State=#http2_machine{ + streams=Streams0, rst_lingering_streams=Lingering0}) -> + Streams = lists:keydelete(StreamID, #stream.id, Streams0), + %% We only keep up to 10 streams in this state. @todo Make it configurable? + Lingering = [StreamID|lists:sublist(Lingering0, 10 - 1)], + {ok, {rst_stream, StreamID, Reason}, + State#http2_machine{streams=Streams, rst_lingering_streams=Lingering}}. + +%% SETTINGS frame. + +settings_frame({settings, Settings}, State0=#http2_machine{ + opts=Opts, remote_settings=Settings0}) -> + State1 = State0#http2_machine{remote_settings=maps:merge(Settings0, Settings)}, + State2 = maps:fold(fun + (header_table_size, NewSize, State=#http2_machine{encode_state=EncodeState0}) -> + MaxSize = maps:get(max_encode_table_size, Opts, 4096), + EncodeState = cow_hpack:set_max_size(min(NewSize, MaxSize), EncodeState0), + State#http2_machine{encode_state=EncodeState}; + (initial_window_size, NewWindowSize, State) -> + OldWindowSize = maps:get(initial_window_size, Settings0, 65535), + streams_update_local_window(State, NewWindowSize - OldWindowSize); + (_, _, State) -> + State + end, State1, Settings), + case Settings of + #{initial_window_size := _} -> send_data(State2); + _ -> {ok, State2} + end; +%% We expect to receive a SETTINGS frame as part of the preface. +settings_frame(_F, State) -> + {error, {connection_error, protocol_error, + 'The preface sequence must be followed by a SETTINGS frame. (RFC7540 3.5)'}, + State}. + +%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update +%% the local stream windows for all active streams and perhaps +%% resume sending data. +streams_update_local_window(State=#http2_machine{streams=Streams0}, Increment) -> + Streams = [ + S#stream{local_window=StreamWindow + Increment} + || S=#stream{local_window=StreamWindow} <- Streams0], + State#http2_machine{streams=Streams}. + +%% Ack for a previously sent SETTINGS frame. + +settings_ack_frame(State0=#http2_machine{local_settings=Local0, next_settings=NextSettings}) -> + Local = maps:merge(Local0, NextSettings), + State1 = State0#http2_machine{local_settings=Local, next_settings=#{}}, + {ok, maps:fold(fun + (header_table_size, MaxSize, State=#http2_machine{decode_state=DecodeState0}) -> + DecodeState = cow_hpack:set_max_size(MaxSize, DecodeState0), + State#http2_machine{decode_state=DecodeState}; + (initial_window_size, NewWindowSize, State) -> + OldWindowSize = maps:get(initial_window_size, Local0, 65535), + streams_update_remote_window(State, NewWindowSize - OldWindowSize); + (_, _, State) -> + State + end, State1, NextSettings)}. + +%% When we receive an ack to a SETTINGS frame we sent we need to update +%% the remote stream windows for all active streams. +streams_update_remote_window(State=#http2_machine{streams=Streams0}, Increment) -> + Streams = [ + S#stream{remote_window=StreamWindow + Increment} + || S=#stream{remote_window=StreamWindow} <- Streams0], + State#http2_machine{streams=Streams}. + +%% PUSH_PROMISE frame. + +%% Convenience record to manipulate the tuple. +%% The order of the fields matter. +-record(push_promise, { + id :: cow_http2:streamid(), + head :: cow_http2:head_fin(), + promised_id :: cow_http2:streamid(), + data :: binary() +}). + +push_promise_frame(_, State=#http2_machine{mode=server}) -> + {error, {connection_error, protocol_error, + 'PUSH_PROMISE frames MUST only be sent on a peer-initiated stream. (RFC7540 6.6)'}, + State}; +push_promise_frame(_, State=#http2_machine{local_settings=#{enable_push := false}}) -> + {error, {connection_error, protocol_error, + 'PUSH_PROMISE frame received despite SETTINGS_ENABLE_PUSH set to 0. (RFC7540 6.6)'}, + State}; +push_promise_frame(#push_promise{promised_id=PromisedStreamID}, + State=#http2_machine{remote_streamid=RemoteStreamID}) + when PromisedStreamID =< RemoteStreamID -> + {error, {connection_error, protocol_error, + 'PUSH_PROMISE frame received for a promised stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'}, + State}; +push_promise_frame(#push_promise{id=StreamID}, State) + when not ?IS_CLIENT_LOCAL(StreamID) -> + {error, {connection_error, protocol_error, + 'PUSH_PROMISE frame received on a server-initiated stream. (RFC7540 6.6)'}, + State}; +push_promise_frame(Frame=#push_promise{id=StreamID, head=IsHeadFin, + promised_id=PromisedStreamID, data=HeaderData}, State) -> + case stream_get(StreamID, State) of + #stream{remote=idle} -> + case IsHeadFin of + head_fin -> + %% @todo Gotta make sure the headers_* functions + %% will work properly for PUSH_PROMISE requests. + headers_decode(#headers{id=PromisedStreamID, + fin=fin, head=IsHeadFin, data=HeaderData}, + State, push_promise, undefined); + head_nofin -> + {ok, State#http2_machine{state={continuation, push_promise, Frame}}} + end; + _ -> +%% @todo Check if the stream is lingering. If it is, decode the frame +%% and do what? That's the big question and why it's not implemented yet. +% However, an endpoint that +% has sent RST_STREAM on the associated stream MUST handle PUSH_PROMISE +% frames that might have been created before the RST_STREAM frame is +% received and processed. (RFC7540 6.6) + {error, {connection_error, stream_closed, + 'PUSH_PROMISE frame received on a stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'}, + State} + end. + +%% PING frame. + +ping_frame({ping, _}, State) -> + {ok, State}. + +%% Ack for a previously sent PING frame. +%% +%% @todo Might want to check contents but probably a waste of time. + +ping_ack_frame({ping_ack, _}, State) -> + {ok, State}. + +%% GOAWAY frame. + +goaway_frame(Frame={goaway, _, _, _}, State) -> + {ok, Frame, State}. + +%% WINDOW_UPDATE frame. + +%% Connection-wide WINDOW_UPDATE frame. +window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow}) + when ConnWindow + Increment > 16#7fffffff -> + {error, {connection_error, flow_control_error, + 'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'}, + State}; +window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow}) -> + send_data(State#http2_machine{local_window=ConnWindow + Increment}); +%% Stream-specific WINDOW_UPDATE frame. +window_update_frame({window_update, StreamID, _}, + State=#http2_machine{remote_streamid=RemoteStreamID}) + when StreamID > RemoteStreamID -> + {error, {connection_error, protocol_error, + 'WINDOW_UPDATE frame received on a stream in idle state. (RFC7540 5.1)'}, + State}; +window_update_frame({window_update, StreamID, Increment}, + State0=#http2_machine{rst_lingering_streams=RstLingering}) -> + case stream_get(StreamID, State0) of + #stream{local_window=StreamWindow} when StreamWindow + Increment > 16#7fffffff -> + stream_reset(StreamID, State0, flow_control_error, + 'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'); + Stream0 = #stream{local_window=StreamWindow} -> + send_data(Stream0#stream{local_window=StreamWindow + Increment}, State0); + undefined -> + %% WINDOW_UPDATE frames may be received for a short period of time + %% after a stream is closed. They must be ignored. + case lists:member(StreamID, RstLingering) of + false -> {ok, State0}; + true -> stream_reset(StreamID, State0, stream_closed, + 'WINDOW_UPDATE frame received after the stream was reset. (RFC7540 5.1)') + end + end. + +%% CONTINUATION frame. + +%% Convenience record to manipulate the tuple. +%% The order of the fields matter. +-record(continuation, { + id :: cow_http2:streamid(), + head :: cow_http2:head_fin(), + data :: binary() +}). + +unexpected_continuation_frame(#continuation{}, State) -> + {error, {connection_error, protocol_error, + 'CONTINUATION frames MUST be preceded by a HEADERS or PUSH_PROMISE frame. (RFC7540 6.10)'}, + State}. + +continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1}, + State=#http2_machine{state={continuation, Type, + Frame=#headers{id=StreamID, data=HeaderFragment0}}}) -> + HeaderData = <>, + headers_decode(Frame#headers{head=head_fin, data=HeaderData}, + State#http2_machine{state=normal}, Type, stream_get(StreamID, State)); +continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1}, + State=#http2_machine{state={continuation, Type, #push_promise{ + id=StreamID, promised_id=PromisedStreamID, data=HeaderFragment0}}}) -> + HeaderData = <>, + headers_decode(#headers{id=PromisedStreamID, fin=fin, head=head_fin, data=HeaderData}, + State#http2_machine{state=normal}, Type, undefined); +continuation_frame(#continuation{id=StreamID, data=HeaderFragment1}, + State=#http2_machine{state={continuation, Type, ContinuedFrame0}}) + when element(2, ContinuedFrame0) =:= StreamID -> + ContinuedFrame = case ContinuedFrame0 of + #headers{data=HeaderFragment0} -> + HeaderData = <>, + ContinuedFrame0#headers{data=HeaderData}; + #push_promise{data=HeaderFragment0} -> + HeaderData = <>, + ContinuedFrame0#push_promise{data=HeaderData} + end, + {ok, State#http2_machine{state={continuation, Type, ContinuedFrame}}}; +continuation_frame(_F, State) -> + {error, {connection_error, protocol_error, + 'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'}, + State}. + +%% Ignored frames. + +-spec ignored_frame(State) + -> {ok, State} + | {error, {connection_error, protocol_error, atom()}, State} + when State::http2_machine(). +ignored_frame(State=#http2_machine{state={continuation, _, _}}) -> + {error, {connection_error, protocol_error, + 'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'}, + State}; +%% @todo It might be useful to error out when we receive +%% too many unknown frames. (RFC7540 10.5) +ignored_frame(State) -> + {ok, State}. + +%% Functions for sending a message header or body. Note that +%% this module does not send data directly, instead it returns +%% a value that can then be used to send the frames. + +-spec prepare_headers(cow_http2:streamid(), State, idle | cow_http2:fin(), + pseudo_headers(), cow_http:headers()) + -> {ok, cow_http2:fin(), iodata(), State} when State::http2_machine(). +%% @todo Should handle the request case too. +prepare_headers(StreamID, State=#http2_machine{encode_state=EncodeState0}, + IsFin0, PseudoHeaders, Headers0) -> + Stream = #stream{method=Method, local=idle} = stream_get(StreamID, State), + IsFin = case {IsFin0, Method} of + {idle, _} -> nofin; + {_, <<"HEAD">>} -> fin; + _ -> IsFin0 + end, + Headers = merge_pseudo_headers(PseudoHeaders, Headers0), + {HeaderBlock, EncodeState} = cow_hpack:encode(Headers, EncodeState0), + {ok, IsFin, HeaderBlock, stream_store(Stream#stream{local=IsFin0}, + State#http2_machine{encode_state=EncodeState})}. + +-spec prepare_push_promise(cow_http2:streamid(), State, pseudo_headers(), cow_http:headers()) + -> {ok, cow_http2:streamid(), iodata(), State} + | {error, no_push} when State::http2_machine(). +prepare_push_promise(_, #http2_machine{remote_settings=#{enable_push := false}}, _, _) -> + {error, no_push}; +prepare_push_promise(StreamID, State=#http2_machine{encode_state=EncodeState0, + local_settings=#{initial_window_size := RemoteWindow}, + remote_settings=#{initial_window_size := LocalWindow}, + local_streamid=LocalStreamID}, PseudoHeaders, Headers0) -> + #stream{local=idle} = stream_get(StreamID, State), + TE = case lists:keyfind(<<"te">>, 1, Headers0) of + {_, TE0} -> TE0; + false -> undefined + end, + Headers = merge_pseudo_headers(PseudoHeaders, Headers0), + {HeaderBlock, EncodeState} = cow_hpack:encode(Headers, EncodeState0), + {ok, LocalStreamID, HeaderBlock, stream_store( + #stream{id=LocalStreamID, method=maps:get(method, PseudoHeaders), + remote=fin, remote_expected_size=0, + local_window=LocalWindow, remote_window=RemoteWindow, te=TE}, + State#http2_machine{encode_state=EncodeState, local_streamid=LocalStreamID + 2})}. + +merge_pseudo_headers(PseudoHeaders, Headers0) -> + lists:foldl(fun + ({status, Status}, Acc) when is_integer(Status) -> + [{<<":status">>, integer_to_binary(Status)}|Acc]; + ({Name, Value}, Acc) -> + [{iolist_to_binary([$:, atom_to_binary(Name, latin1)]), Value}|Acc] + end, Headers0, maps:to_list(PseudoHeaders)). + +-spec prepare_trailers(cow_http2:streamid(), State, cow_http:headers()) + -> {ok, iodata(), State} when State::http2_machine(). +prepare_trailers(StreamID, State=#http2_machine{encode_state=EncodeState0}, Trailers) -> + Stream = #stream{local=nofin} = stream_get(StreamID, State), + {HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0), + {ok, HeaderBlock, stream_store(Stream#stream{local=fin}, + State#http2_machine{encode_state=EncodeState})}. + +-spec send_or_queue_data(cow_http2:streamid(), State, cow_http2:fin(), DataOrFileOrTrailers) + -> {ok, State} + | {send, [{cow_http2:streamid(), cow_http2:fin(), [DataOrFileOrTrailers]}], State} + when State::http2_machine(), DataOrFileOrTrailers:: + {data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}. +send_or_queue_data(StreamID, State0, IsFin0, DataOrFileOrTrailers0) -> + %% @todo Probably just ignore if the method was HEAD. + Stream0 = #stream{local=nofin, te=TE0} = stream_get(StreamID, State0), + DataOrFileOrTrailers = case DataOrFileOrTrailers0 of + {trailers, _} -> + %% We only accept TE headers containing exactly "trailers" (RFC7540 8.1.2.1). + TE = try cow_http_hd:parse_te(TE0) of + {trailers, []} -> trailers; + _ -> no_trailers + catch _:_ -> + %% If we can't parse the TE header, assume we can't send trailers. + no_trailers + end, + case TE of + trailers -> + DataOrFileOrTrailers0; + no_trailers -> + {data, <<>>} + end; + _ -> + DataOrFileOrTrailers0 + end, + case send_or_queue_data(Stream0, State0, [], IsFin0, DataOrFileOrTrailers, in) of + {ok, Stream, State, []} -> + {ok, stream_store(Stream, State)}; + {ok, Stream=#stream{local=IsFin}, State, SendData} -> + {send, [{StreamID, IsFin, lists:reverse(SendData)}], stream_store(Stream, State)} + end. + +%% Internal data sending/queuing functions. + +%% @todo Should we ever want to implement the PRIORITY mechanism, +%% this would be the place to do it. Right now, we just go over +%% all streams and send what we can until either everything is +%% sent or we run out of space in the window. +send_data(State0=#http2_machine{streams=Streams0}) -> + case send_data_for_all_streams(Streams0, State0, [], []) of + {ok, Streams, State, []} -> + {ok, State#http2_machine{streams=Streams}}; + {ok, Streams, State, Send} -> + {send, Send, State#http2_machine{streams=Streams}} + end. + +send_data_for_all_streams([], State, Acc, Send) -> + {ok, lists:reverse(Acc), State, Send}; +%% While technically we should never get < 0 here, let's be on the safe side. +send_data_for_all_streams(Tail, State=#http2_machine{local_window=ConnWindow}, Acc, Send) + when ConnWindow =< 0 -> + {ok, lists:reverse(Acc, Tail), State, Send}; +%% We rely on send_data_for_one_stream/3 to do all the necessary checks about the stream. +send_data_for_all_streams([Stream0|Tail], State0, Acc, Send) -> + case send_data_for_one_stream(Stream0, State0, []) of + {ok, Stream, State, []} -> + send_data_for_all_streams(Tail, State, [Stream|Acc], Send); + %% We need to remove the stream here because we do not use stream_store/2. + {ok, #stream{id=StreamID, local=fin, remote=fin}, State, SendData} -> + send_data_for_all_streams(Tail, State, Acc, + [{StreamID, fin, SendData}|Send]); + {ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} -> + send_data_for_all_streams(Tail, State, [Stream|Acc], + [{StreamID, IsFin, SendData}|Send]) + end. + +send_data(Stream0, State0) -> + case send_data_for_one_stream(Stream0, State0, []) of + {ok, Stream, State, []} -> + {ok, stream_store(Stream, State)}; + {ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} -> + {send, [{StreamID, IsFin, SendData}], stream_store(Stream, State)} + end. + +send_data_for_one_stream(Stream=#stream{local=nofin, local_buffer_size=0, + local_trailers=Trailers}, State, SendAcc) when Trailers =/= undefined -> + {ok, Stream, State, lists:reverse([{trailers, Trailers}|SendAcc])}; +send_data_for_one_stream(Stream=#stream{local=IsFin, local_window=StreamWindow, + local_buffer_size=BufferSize}, State=#http2_machine{local_window=ConnWindow}, SendAcc) + when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 -> + {ok, Stream, State, lists:reverse(SendAcc)}; +send_data_for_one_stream(Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}, + State0, SendAcc0) -> + %% We know there is an item in the queue. + {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0), + Stream1 = Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize}, + {ok, Stream, State, SendAcc} + = send_or_queue_data(Stream1, State0, SendAcc0, IsFin, Data, in_r), + send_data_for_one_stream(Stream, State, SendAcc). + +%% We can send trailers immediately if the queue is empty, otherwise we queue. +%% We always send trailer frames even if the window is empty. +send_or_queue_data(Stream=#stream{local_buffer_size=0}, + State, SendAcc, fin, {trailers, Trailers}, _) -> + {ok, Stream, State, [{trailers, Trailers}|SendAcc]}; +send_or_queue_data(Stream, State, SendAcc, fin, {trailers, Trailers}, _) -> + {ok, Stream#stream{local_trailers=Trailers}, State, SendAcc}; +%% Send data immediately if we can, buffer otherwise. +send_or_queue_data(Stream=#stream{local_window=StreamWindow}, + State=#http2_machine{local_window=ConnWindow}, + SendAcc, IsFin, Data, In) + when ConnWindow =< 0; StreamWindow =< 0 -> + {ok, queue_data(Stream, IsFin, Data, In), State, SendAcc}; +send_or_queue_data(Stream=#stream{local_window=StreamWindow}, + State=#http2_machine{opts=Opts, remote_settings=RemoteSettings, + local_window=ConnWindow}, SendAcc, IsFin, Data, In) -> + RemoteMaxFrameSize = maps:get(max_frame_size, RemoteSettings, 16384), + ConfiguredMaxFrameSize = maps:get(max_frame_size_sent, Opts, infinity), + MaxSendSize = min( + min(ConnWindow, StreamWindow), + min(RemoteMaxFrameSize, ConfiguredMaxFrameSize) + ), + case Data of + File = #sendfile{bytes=Bytes} when Bytes =< MaxSendSize -> + {ok, Stream#stream{local=IsFin, local_window=StreamWindow - Bytes}, + State#http2_machine{local_window=ConnWindow - Bytes}, + [File|SendAcc]}; + File = #sendfile{offset=Offset, bytes=Bytes} -> + send_or_queue_data(Stream#stream{local_window=StreamWindow - MaxSendSize}, + State#http2_machine{local_window=ConnWindow - MaxSendSize}, + [File#sendfile{bytes=MaxSendSize}|SendAcc], IsFin, + File#sendfile{offset=Offset + MaxSendSize, bytes=Bytes - MaxSendSize}, In); + {data, Iolist0} -> + IolistSize = iolist_size(Iolist0), + if + IolistSize =< MaxSendSize -> + {ok, Stream#stream{local=IsFin, local_window=StreamWindow - IolistSize}, + State#http2_machine{local_window=ConnWindow - IolistSize}, + [{data, Iolist0}|SendAcc]}; + true -> + {Iolist, More} = cow_iolists:split(MaxSendSize, Iolist0), + send_or_queue_data(Stream#stream{local_window=StreamWindow - MaxSendSize}, + State#http2_machine{local_window=ConnWindow - MaxSendSize}, + [{data, Iolist}|SendAcc], IsFin, {data, More}, In) + end + end. + +queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) -> + DataSize = case Data of + {sendfile, _, Bytes, _} -> Bytes; + {data, Iolist} -> iolist_size(Iolist) + end, + Q = queue:In({IsFin, DataSize, Data}, Q0), + Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}. + +%% Public interface to update the flow control window. + +-spec update_window(0..16#7fffffff, State) + -> State when State::http2_machine(). +update_window(Size, State=#http2_machine{remote_window=RemoteWindow}) -> + State#http2_machine{remote_window=RemoteWindow + Size}. + +-spec update_window(cow_http2:streamid(), 0..16#7fffffff, State) + -> State when State::http2_machine(). +update_window(StreamID, Size, State) -> + Stream = #stream{remote_window=RemoteWindow} = stream_get(StreamID, State), + stream_store(Stream#stream{remote_window=RemoteWindow + Size}, State). + +%% Public interface to reset streams. + +-spec reset_stream(cow_http2:streamid(), State) + -> {ok, State} | {error, not_found} when State::http2_machine(). +reset_stream(StreamID, State=#http2_machine{streams=Streams0}) -> + case lists:keytake(StreamID, #stream.id, Streams0) of + {value, _, Streams} -> + {ok, stream_linger(StreamID, State#http2_machine{streams=Streams})}; + false -> + {error, not_found} + end. + +%% Retrieve a setting value, or its default value if not set. + +-spec get_local_setting(atom(), http2_machine()) -> atom() | integer(). +get_local_setting(Key, #http2_machine{local_settings=Settings}) -> + maps:get(Key, Settings, default_setting_value(Key)). + +default_setting_value(header_table_size) -> 4096; +default_setting_value(enable_push) -> true; +default_setting_value(max_concurrent_streams) -> infinity; +default_setting_value(initial_window_size) -> 65535; +default_setting_value(max_frame_size) -> 16384; +default_setting_value(max_header_list_size) -> infinity; +default_setting_value(enable_connect_protocol) -> false. + +%% Function to obtain the last known streamid received +%% for the purposes of sending a GOAWAY frame and closing the connection. + +-spec get_last_streamid(http2_machine()) -> cow_http2:streamid(). +get_last_streamid(#http2_machine{remote_streamid=RemoteStreamID}) -> + RemoteStreamID. + +%% Retrieve the local state for a stream, including the state in the queue. + +-spec get_stream_local_state(cow_http2:streamid(), http2_machine()) + -> {ok, idle | cow_http2:fin(), empty | nofin | fin} | {error, not_found | closed}. +get_stream_local_state(StreamID, State=#http2_machine{mode=Mode, + local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) -> + case stream_get(StreamID, State) of + #stream{local=IsFin, local_buffer=Q, local_trailers=undefined} -> + IsQueueFin = case queue:peek_r(Q) of + empty -> empty; + {value, {IsQueueFin0, _, _}} -> IsQueueFin0 + end, + {ok, IsFin, IsQueueFin}; + %% Trailers are queued so the local state is fin after the queue is drained. + #stream{local=IsFin} -> + {ok, IsFin, fin}; + undefined when (?IS_LOCAL(Mode, StreamID) andalso (StreamID < LocalStreamID)) + orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID =< RemoteStreamID)) -> + {error, closed}; + undefined -> + {error, not_found} + end. + +%% Stream-related functions. + +stream_get(StreamID, #http2_machine{streams=Streams}) -> + case lists:keyfind(StreamID, #stream.id, Streams) of + false -> undefined; + Stream -> Stream + end. + +stream_store(#stream{id=StreamID, local=fin, remote=fin}, + State=#http2_machine{streams=Streams0}) -> + Streams = lists:keydelete(StreamID, #stream.id, Streams0), + State#http2_machine{streams=Streams}; +stream_store(Stream=#stream{id=StreamID}, + State=#http2_machine{streams=Streams0}) -> + Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream), + State#http2_machine{streams=Streams}. + +%% @todo Don't send an RST_STREAM if one was already sent. +stream_reset(StreamID, State, Reason, HumanReadable) -> + {error, {stream_error, StreamID, Reason, HumanReadable}, + stream_linger(StreamID, State)}. + +stream_linger(StreamID, State=#http2_machine{lingering_streams=Lingering0}) -> + %% We only keep up to 100 streams in this state. @todo Make it configurable? + Lingering = [StreamID|lists:sublist(Lingering0, 100 - 1)], + State#http2_machine{lingering_streams=Lingering}. -- cgit v1.2.3