diff options
| author | Loïc Hoguin <[email protected]> | 2018-10-26 10:12:25 +0200 | 
|---|---|---|
| committer | Loïc Hoguin <[email protected]> | 2018-10-26 10:12:25 +0200 | 
| commit | b461b119e78e4e09bb28b186b09da7ed4a86a0dd (patch) | |
| tree | 7e3aff85603ea11450b9df2ba57df61927889ef0 /src | |
| parent | 078f855672fe8ad65d2b25b0a4843c0f5637f32c (diff) | |
| download | cowlib-b461b119e78e4e09bb28b186b09da7ed4a86a0dd.tar.gz cowlib-b461b119e78e4e09bb28b186b09da7ed4a86a0dd.tar.bz2 cowlib-b461b119e78e4e09bb28b186b09da7ed4a86a0dd.zip | |
Introduce cow_http2_machine, an HTTP/2 state machine
This is the result of a merge of the Cowboy and Gun HTTP/2 codes.
It can probably do a little more but it's at a point where Cowboy
works fine when using it so additional work will be done in other
commits.
The Gun code has not been switched to this module yet. I expect
for example the PUSH_PROMISE code to fail at this point. This will
be the next step.
Diffstat (limited to 'src')
| -rw-r--r-- | src/cow_http2.erl | 6 | ||||
| -rw-r--r-- | src/cow_http2_machine.erl | 1295 | 
2 files changed, 1301 insertions, 0 deletions
| diff --git a/src/cow_http2.erl b/src/cow_http2.erl index e6f7738..ec4aab9 100644 --- a/src/cow_http2.erl +++ b/src/cow_http2.erl @@ -36,8 +36,14 @@  -export([window_update/2]).  -type streamid() :: pos_integer(). +-export_type([streamid/0]). +  -type fin() :: fin | nofin. +-export_type([fin/0]). +  -type head_fin() :: head_fin | head_nofin. +-export_type([head_fin/0]). +  -type exclusive() :: exclusive | shared.  -type weight() :: 1..256.  -type settings() :: map(). diff --git a/src/cow_http2_machine.erl b/src/cow_http2_machine.erl new file mode 100644 index 0000000..b1cf1da --- /dev/null +++ b/src/cow_http2_machine.erl @@ -0,0 +1,1295 @@ +%% Copyright (c) 2018, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(cow_http2_machine). + +-export([init/2]). +-export([init_upgrade_stream/2]). +-export([frame/2]). +-export([ignored_frame/1]). +-export([prepare_headers/5]). +-export([prepare_push_promise/4]). +-export([prepare_trailers/3]). +-export([send_or_queue_data/4]). +-export([update_window/2]). +-export([update_window/3]). +-export([reset_stream/2]). +-export([get_local_setting/2]). +-export([get_last_streamid/1]). +-export([get_stream_local_state/2]). + +-type opts() :: #{ +	enable_connect_protocol => boolean(), +	initial_connection_window_size => 65535..16#7fffffff, +	initial_stream_window_size => 0..16#7fffffff, +	max_concurrent_streams => non_neg_integer() | infinity, +	max_decode_table_size => non_neg_integer(), +	max_encode_table_size => non_neg_integer(), +	max_frame_size_received => 16384..16777215, +	max_frame_size_sent => 16384..16777215 | infinity +}. +-export_type([opts/0]). + +%% The order of the fields is significant. +-record(sendfile, { +	offset :: non_neg_integer(), +	bytes :: pos_integer(), +	path :: file:name_all() +}). + +-record(stream, { +	id = undefined :: cow_http2:streamid(), + +	%% Request method. +	method = undefined :: binary(), + +	%% Whether we finished sending data. +	local = idle :: idle | cow_http2:fin(), + +	%% Local flow control window (how much we can send). +	local_window :: integer(), + +	%% Buffered data waiting for the flow control window to increase. +	local_buffer = queue:new() :: +		queue:queue({cow_http2:fin(), non_neg_integer(), {data, iodata()} | #sendfile{}}), +	local_buffer_size = 0 :: non_neg_integer(), +	local_trailers = undefined :: undefined | cow_http:headers(), + +	%% Whether we finished receiving data. +	remote = idle :: idle | cow_http2:fin(), + +	%% Remote flow control window (how much we accept to receive). +	remote_window :: integer(), + +	%% Size expected and read from the request body. +	remote_expected_size = undefined :: undefined | non_neg_integer(), +	remote_read_size = 0 :: non_neg_integer(), + +	%% Unparsed te header. Used to know if we can send trailers. +	te :: undefined | binary() +}). + +-type stream() :: #stream{}. + +-type continued_frame() :: +	{headers, cow_http2:streamid(), cow_http2:fin(), cow_http2:head_fin(), binary()} | +	{push_promise, cow_http2:streamid(), cow_http2:head_fin(), cow_http2:streamid(), binary()}. + +-record(http2_machine, { +	%% Whether the HTTP/2 endpoint is a client or a server. +	mode :: client | server, + +	%% HTTP/2 SETTINGS customization. +	opts = #{} :: opts(), + +	%% Connection-wide frame processing state. +	state = settings :: settings | normal +		| {continuation, request | response | trailers | push_promise, continued_frame()}, + +	%% Settings are separate for each endpoint. In addition, settings +	%% must be acknowledged before they can be expected to be applied. +	local_settings = #{ +%		header_table_size => 4096, +%		enable_push => true, +%		max_concurrent_streams => infinity, +		initial_window_size => 65535 +%		max_frame_size => 16384 +%		max_header_list_size => infinity +	} :: map(), +	next_settings = undefined :: undefined | map(), +	remote_settings = #{ +		initial_window_size => 65535 +	} :: map(), + +	%% Connection-wide flow control window. +	local_window = 65535 :: integer(), %% How much we can send. +	remote_window = 65535 :: integer(), %% How much we accept to receive. + +	%% Stream identifiers. +	local_streamid :: pos_integer(), %% The next streamid to be used. +	remote_streamid = 0 :: non_neg_integer(), %% The last streamid received. + +	%% Currently active HTTP/2 streams. Streams may be initiated either +	%% by the client or by the server through PUSH_PROMISE frames. +	streams = [] :: [stream()], + +	%% HTTP/2 streams that have been reset recently by the server. +	%% We are expected to keep receiving additional frames after +	%% sending an RST_STREAM. +	lingering_streams = [] :: [cow_http2:streamid()], + +	%% HTTP/2 streams that have been reset recently by the client. +	%% We keep a few of these around in order to reject subsequent +	%% frames on these streams. +	rst_lingering_streams = [] :: [cow_http2:streamid()], + +	%% HPACK decoding and encoding state. +	decode_state = cow_hpack:init() :: cow_hpack:state(), +	encode_state = cow_hpack:init() :: cow_hpack:state() +}). + +-opaque http2_machine() :: #http2_machine{}. +-export_type([http2_machine/0]). + +-type pseudo_headers() :: #{} %% Trailers +	| #{ %% Responses. +		status := cow_http:status() +	} | #{ %% Normal CONNECT requests. +		method := binary(), +		authority := binary() +	} | #{ %% Other requests and extended CONNECT requests. +		method := binary(), +		scheme := binary(), +		authority := binary(), +		path := binary(), +		protocol => binary() +	}. + +%% Returns true when the given StreamID is for a local-initiated stream. +-define(IS_SERVER_LOCAL(StreamID), ((StreamID rem 2) =:= 0)). +-define(IS_CLIENT_LOCAL(StreamID), ((StreamID rem 2) =:= 1)). +-define(IS_LOCAL(Mode, StreamID), ( +	((Mode =:= server) andalso ?IS_SERVER_LOCAL(StreamID)) +	orelse +	((Mode =:= client) andalso ?IS_CLIENT_LOCAL(StreamID)) +)). + +-spec init(client | server, opts()) -> {ok, iodata(), http2_machine()}. +init(client, Opts) -> +	NextSettings = settings_init(Opts), +	client_preface(#http2_machine{ +		mode=client, +		opts=only_keep_relevant_opts(Opts), +		next_settings=NextSettings, +		local_streamid=1 +	}); +init(server, Opts) -> +	NextSettings = settings_init(Opts), +	common_preface(#http2_machine{ +		mode=server, +		opts=only_keep_relevant_opts(Opts), +		next_settings=NextSettings, +		local_streamid=2 +	}). + +%% We remove the options that are part of SETTINGS or that are not known. +only_keep_relevant_opts(Opts) -> +	maps:with([ +		initial_connection_window_size, +		max_encode_table_size, +		max_frame_size_sent +	], Opts). + +client_preface(State0) -> +	{ok, CommonPreface, State} = common_preface(State0), +	{ok, [ +		<<"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>, +		CommonPreface +	], State}. + +%% We send next_settings and use defaults until we get an ack. +%% +%% We also send a WINDOW_UPDATE frame for the connection when +%% the user specified an initial_connection_window_size. +common_preface(State=#http2_machine{opts=Opts, next_settings=NextSettings}) -> +	case maps:get(initial_connection_window_size, Opts, 65535) of +		65535 -> +			{ok, cow_http2:settings(NextSettings), State}; +		Size -> +			{ok, [ +				cow_http2:settings(NextSettings), +				cow_http2:window_update(Size - 65535) +			], update_window(Size - 65535, State)} +	end. + +settings_init(Opts) -> +	S0 = setting_from_opt(#{}, Opts, max_decode_table_size, +		header_table_size, 4096), +	S1 = setting_from_opt(S0, Opts, max_concurrent_streams, +		max_concurrent_streams, infinity), +	S2 = setting_from_opt(S1, Opts, initial_stream_window_size, +		initial_window_size, 65535), +	S3 = setting_from_opt(S2, Opts, max_frame_size_received, +		max_frame_size, 16384), +	%% @todo max_header_list_size +	setting_from_opt(S3, Opts, enable_connect_protocol, +		enable_connect_protocol, false). + +setting_from_opt(Settings, Opts, OptName, SettingName, Default) -> +	case maps:get(OptName, Opts, Default) of +		Default -> Settings; +		Value -> Settings#{SettingName => Value} +	end. + +-spec init_upgrade_stream(binary(), State) +	-> {ok, cow_http2:streamid(), State} when State::http2_machine(). +init_upgrade_stream(Method, State=#http2_machine{mode=server, remote_streamid=0, +		local_settings=#{initial_window_size := RemoteWindow}, +		remote_settings=#{initial_window_size := LocalWindow}}) -> +	Stream = #stream{id=1, method=Method, +		remote=fin, remote_expected_size=0, +		local_window=LocalWindow, remote_window=RemoteWindow, te=undefined}, +	{ok, 1, stream_store(Stream, State#http2_machine{remote_streamid=1})}. + +-spec frame(cow_http2:frame(), State) +	-> {ok, State} +	| {ok, {data, cow_http2:streamid(), cow_http2:fin(), binary()}, State} +	| {ok, {headers, cow_http2:streamid(), cow_http2:fin(), +		cow_http:headers(), pseudo_headers(), non_neg_integer() | undefined}, State} +	| {ok, {trailers, cow_http2:streamid(), cow_http:headers()}, State} +	| {ok, {rst_stream, cow_http2:streamid(), cow_http2:error()}, State} +	| {ok, {push_promise, cow_http2:streamid(), cow_http2:streamid(), +		cow_http:headers(), pseudo_headers()}, State} +	| {ok, {goaway, cow_http2:streamid(), cow_http2:error(), binary()}, State} +	| {send, [{cow_http2:streamid(), cow_http2:fin(), +		[{data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}]}], State} +	| {error, {stream_error, cow_http2:streamid(), cow_http2:error(), atom()}, State} +	| {error, {connection_error, cow_http2:error(), atom()}, State} +	when State::http2_machine(). +frame(Frame, State=#http2_machine{state=settings}) -> +	settings_frame(Frame, State#http2_machine{state=normal}); +frame(Frame, State=#http2_machine{state={continuation, _, _}}) -> +	continuation_frame(Frame, State); +frame(settings_ack, State=#http2_machine{state=normal}) -> +	settings_ack_frame(State); +frame(Frame, State=#http2_machine{state=normal}) -> +	case element(1, Frame) of +		data -> data_frame(Frame, State); +		headers -> headers_frame(Frame, State); +		priority -> priority_frame(Frame, State); +		rst_stream -> rst_stream_frame(Frame, State); +		settings -> settings_frame(Frame, State); +		push_promise -> push_promise_frame(Frame, State); +		ping -> ping_frame(Frame, State); +		ping_ack -> ping_ack_frame(Frame, State); +		goaway -> goaway_frame(Frame, State); +		window_update -> window_update_frame(Frame, State); +		continuation -> unexpected_continuation_frame(Frame, State); +		_ -> ignored_frame(State) +	end. + +%% DATA frame. + +data_frame({data, StreamID, _, _}, State=#http2_machine{mode=Mode, +		local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) +		when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID)) +		orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) -> +	{error, {connection_error, protocol_error, +		'DATA frame received on a stream in idle state. (RFC7540 5.1)'}, +		State}; +data_frame({data, _, _, Data}, State=#http2_machine{remote_window=ConnWindow}) +		when byte_size(Data) > ConnWindow -> +	{error, {connection_error, flow_control_error, +		'DATA frame overflowed the connection flow control window. (RFC7540 6.9, RFC7540 6.9.1)'}, +		State}; +data_frame(Frame={data, StreamID, _, Data}, State0=#http2_machine{ +		remote_window=ConnWindow, lingering_streams=Lingering}) -> +	DataLen = byte_size(Data), +	State = State0#http2_machine{remote_window=ConnWindow - DataLen}, +	case stream_get(StreamID, State) of +		#stream{remote_window=StreamWindow} when StreamWindow < DataLen -> +			stream_reset(StreamID, State, flow_control_error, +				'DATA frame overflowed the stream flow control window. (RFC7540 6.9, RFC7540 6.9.1)'); +		Stream = #stream{remote=nofin} -> +			data_frame(Frame, State, Stream, DataLen); +		#stream{remote=idle} -> +			stream_reset(StreamID, State, protocol_error, +				'DATA frame received before a HEADERS frame. (RFC7540 8.1, RFC7540 8.1.2.6)'); +		#stream{remote=fin} -> +			stream_reset(StreamID, State, stream_closed, +				'DATA frame received for a half-closed (remote) stream. (RFC7540 5.1)'); +		undefined -> +			%% After we send an RST_STREAM frame and terminate a stream, +			%% the client still might be sending us some more frames +			%% until it can process this RST_STREAM. We therefore ignore +			%% DATA frames received for such lingering streams. +			case lists:member(StreamID, Lingering) of +				true -> +					{ok, State0}; +				false -> +					{error, {connection_error, stream_closed, +						'DATA frame received for a closed stream. (RFC7540 5.1)'}, +						State} +			end +	end. + +data_frame(Frame={data, _, IsFin, _}, State0, Stream0=#stream{id=StreamID, +		remote_window=StreamWindow, remote_read_size=StreamRead}, DataLen) -> +	Stream = Stream0#stream{remote=IsFin, +		remote_window=StreamWindow - DataLen, +		remote_read_size=StreamRead + DataLen}, +	State = stream_store(Stream, State0), +	case is_body_size_valid(Stream) of +		true -> +			{ok, Frame, State}; +		false -> +			stream_reset(StreamID, State, protocol_error, +				'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)') +	end. + +%% It's always valid when no content-length header was specified. +is_body_size_valid(#stream{remote_expected_size=undefined}) -> +	true; +%% We didn't finish reading the body but the size is already larger than expected. +is_body_size_valid(#stream{remote=nofin, remote_expected_size=Expected, +		remote_read_size=Read}) when Read > Expected -> +	false; +is_body_size_valid(#stream{remote=nofin}) -> +	true; +is_body_size_valid(#stream{remote=fin, remote_expected_size=Expected, +		remote_read_size=Expected}) -> +	true; +%% We finished reading the body and the size read is not the one expected. +is_body_size_valid(_) -> +	false. + +%% HEADERS frame. +%% +%% We always close the connection when we detect errors before +%% decoding the headers to not waste resources on non-compliant +%% endpoints, making us stricter than the RFC requires. + +%% Convenience record to manipulate the tuple. +%% The order of the fields matter. +-record(headers, { +	id :: cow_http2:streamid(), +	fin :: cow_http2:fin(), +	head :: cow_http2:head_fin(), +	data :: binary() +}). + +headers_frame(Frame=#headers{}, State=#http2_machine{mode=Mode}) -> +	case Mode of +		server -> server_headers_frame(Frame, State); +		client -> client_headers_frame(Frame, State) +	end; +%% @todo Handle the PRIORITY data, but only if this returns an ok tuple. +%% @todo Do not lose the PRIORITY information if CONTINUATION frames follow. +headers_frame({headers, StreamID, IsFin, IsHeadFin, +		_IsExclusive, _DepStreamID, _Weight, HeaderData}, +		State=#http2_machine{mode=Mode}) -> +	HeadersFrame = #headers{id=StreamID, fin=IsFin, head=IsHeadFin, data=HeaderData}, +	case Mode of +		server -> server_headers_frame(HeadersFrame, State); +		client -> client_headers_frame(HeadersFrame, State) +	end. + +%% Reject HEADERS frames with even-numbered streamid. +server_headers_frame(#headers{id=StreamID}, State) +		when ?IS_SERVER_LOCAL(StreamID) -> +	{error, {connection_error, protocol_error, +		'HEADERS frame received with even-numbered streamid. (RFC7540 5.1.1)'}, +		State}; +%% HEADERS frame on an idle stream: new request. +server_headers_frame(Frame=#headers{id=StreamID, head=IsHeadFin}, +		State=#http2_machine{mode=server, remote_streamid=RemoteStreamID}) +		when StreamID > RemoteStreamID -> +	case IsHeadFin of +		head_fin -> +			headers_decode(Frame, State, request, undefined); +		head_nofin -> +			{ok, State#http2_machine{state={continuation, request, Frame}}} +	end; +%% Either a HEADERS frame received on (half-)closed stream, +%% or a HEADERS frame containing the trailers. +server_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin}, State) -> +	case stream_get(StreamID, State) of +		%% Trailers. +		Stream = #stream{remote=nofin} when IsFin =:= fin -> +			case IsHeadFin of +				head_fin -> +					headers_decode(Frame, State, trailers, Stream); +				head_nofin -> +					{ok, State#http2_machine{state={continuation, trailers, Frame}}} +			end; +		#stream{remote=nofin} -> +			{error, {connection_error, protocol_error, +				'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'}, +				State}; +		_ -> +			{error, {connection_error, stream_closed, +				'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'}, +				State} +	end. + +%% Either a HEADERS frame received on an (half-)closed stream, +%% or a HEADERS frame containing the response or the trailers. +client_headers_frame(Frame=#headers{id=StreamID, fin=IsFin, head=IsHeadFin}, +		State=#http2_machine{local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) +		when (?IS_CLIENT_LOCAL(StreamID) andalso (StreamID < LocalStreamID)) +		orelse ((not ?IS_CLIENT_LOCAL(StreamID)) andalso (StreamID =< RemoteStreamID)) -> +	case stream_get(StreamID, State) of +		Stream = #stream{remote=idle} -> +			case IsHeadFin of +				head_fin -> +					headers_decode(Frame, State, response, Stream); +				head_nofin -> +					{ok, State#http2_machine{state={continuation, response, Frame}}} +			end; +		Stream = #stream{remote=nofin} when IsFin =:= fin -> +			case IsHeadFin of +				head_fin -> +					headers_decode(Frame, State, trailers, Stream); +				head_nofin -> +					{ok, State#http2_machine{state={continuation, trailers, Frame}}} +			end; +		#stream{remote=nofin} -> +			{error, {connection_error, protocol_error, +				'Trailing HEADERS frame received without the END_STREAM flag set. (RFC7540 8.1, RFC7540 8.1.2.6)'}, +				State}; +		_ -> +			{error, {connection_error, stream_closed, +				'HEADERS frame received on a stream in closed or half-closed state. (RFC7540 5.1)'}, +				State} +	end; +%% Reject HEADERS frames received on idle streams. +client_headers_frame(_, State) -> +	{error, {connection_error, protocol_error, +		'HEADERS frame received on an idle stream. (RFC7540 5.1.1)'}, +		State}. + +headers_decode(Frame=#headers{head=head_fin, data=HeaderData}, +		State=#http2_machine{decode_state=DecodeState0}, Type, Stream) -> +	try cow_hpack:decode(HeaderData, DecodeState0) of +		{Headers, DecodeState} when Type =:= request -> +			headers_enforce_concurrency_limit(Frame, +				State#http2_machine{decode_state=DecodeState}, Type, Stream, Headers); +		{Headers, DecodeState} -> +			headers_pseudo_headers(Frame, +				State#http2_machine{decode_state=DecodeState}, Type, Stream, Headers) +	catch _:_ -> +		{error, {connection_error, compression_error, +			'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}, +			State} +	end. + +headers_enforce_concurrency_limit(Frame=#headers{id=StreamID}, +		State=#http2_machine{local_settings=LocalSettings, streams=Streams}, +		Type, Stream, Headers) -> +	MaxConcurrentStreams = maps:get(max_concurrent_streams, LocalSettings, infinity), +	%% Using < is correct because this new stream is not included +	%% in the Streams variable yet and so we'll end up with +1 stream. +	case length(Streams) < MaxConcurrentStreams of +		true -> +			headers_pseudo_headers(Frame, State, Type, Stream, Headers); +		false -> +			{error, {stream_error, StreamID, refused_stream, +				'Maximum number of concurrent streams has been reached. (RFC7540 5.1.2)'}, +				State} +	end. + +headers_pseudo_headers(Frame, State=#http2_machine{local_settings=LocalSettings}, +		Type=request, Stream, Headers0) -> +	IsExtendedConnectEnabled = maps:get(enable_connect_protocol, LocalSettings, false), +	case request_pseudo_headers(Headers0, #{}) of +		%% Extended CONNECT method (RFC8441). +		{ok, PseudoHeaders=#{method := <<"CONNECT">>, scheme := _, +			authority := _, path := _, protocol := _}, Headers} +			when IsExtendedConnectEnabled -> +			headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers); +		{ok, #{method := <<"CONNECT">>, scheme := _, +			authority := _, path := _}, _} +			when IsExtendedConnectEnabled -> +			headers_malformed(Frame, State, +				'The :protocol pseudo-header MUST be sent with an extended CONNECT. (RFC8441 4)'); +		{ok, #{protocol := _}, _} -> +			headers_malformed(Frame, State, +				'The :protocol pseudo-header is only defined for the extended CONNECT. (RFC8441 4)'); +		%% Normal CONNECT (no scheme/path). +		{ok, PseudoHeaders=#{method := <<"CONNECT">>, authority := _}, Headers} +				when map_size(PseudoHeaders) =:= 2 -> +			headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers); +		{ok, #{method := <<"CONNECT">>}, _} -> +			headers_malformed(Frame, State, +				'CONNECT requests only use the :method and :authority pseudo-headers. (RFC7540 8.3)'); +		%% Other requests. +		{ok, PseudoHeaders=#{method := _, scheme := _, authority := _, path := _}, Headers} -> +			headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers); +		{ok, _, _} -> +			headers_malformed(Frame, State, +				'A required pseudo-header was not found. (RFC7540 8.1.2.3)'); +		{error, HumanReadable} -> +			headers_malformed(Frame, State, HumanReadable) +	end; +headers_pseudo_headers(Frame=#headers{id=StreamID}, +		State, Type=response, Stream, Headers0) -> +	case response_pseudo_headers(Headers0, #{}) of +		{ok, PseudoHeaders=#{status := _}, Headers} -> +			headers_regular_headers(Frame, State, Type, Stream, PseudoHeaders, Headers); +		{ok, _, _} -> +			stream_reset(StreamID, State, protocol_error, +				'A required pseudo-header was not found. (RFC7540 8.1.2.4)'); +		{error, HumanReadable} -> +			stream_reset(StreamID, State, protocol_error, HumanReadable) +	end; +headers_pseudo_headers(Frame=#headers{id=StreamID}, +		State, Type=trailers, Stream, Headers) -> +	case trailers_contain_pseudo_headers(Headers) of +		false -> +			headers_regular_headers(Frame, State, Type, Stream, #{}, Headers); +		true -> +			stream_reset(StreamID, State, protocol_error, +				'Trailer header blocks must not contain pseudo-headers. (RFC7540 8.1.2.1)') +	end. + +headers_malformed(#headers{id=StreamID}, State, HumanReadable) -> +	{error, {stream_error, StreamID, protocol_error, HumanReadable}, State}. + +request_pseudo_headers([{<<":method">>, _}|_], #{method := _}) -> +	{error, 'Multiple :method pseudo-headers were found. (RFC7540 8.1.2.3)'}; +request_pseudo_headers([{<<":method">>, Method}|Tail], PseudoHeaders) -> +	request_pseudo_headers(Tail, PseudoHeaders#{method => Method}); +request_pseudo_headers([{<<":scheme">>, _}|_], #{scheme := _}) -> +	{error, 'Multiple :scheme pseudo-headers were found. (RFC7540 8.1.2.3)'}; +request_pseudo_headers([{<<":scheme">>, Scheme}|Tail], PseudoHeaders) -> +	request_pseudo_headers(Tail, PseudoHeaders#{scheme => Scheme}); +request_pseudo_headers([{<<":authority">>, _}|_], #{authority := _}) -> +	{error, 'Multiple :authority pseudo-headers were found. (RFC7540 8.1.2.3)'}; +request_pseudo_headers([{<<":authority">>, Authority}|Tail], PseudoHeaders) -> +	request_pseudo_headers(Tail, PseudoHeaders#{authority => Authority}); +request_pseudo_headers([{<<":path">>, _}|_], #{path := _}) -> +	{error, 'Multiple :path pseudo-headers were found. (RFC7540 8.1.2.3)'}; +request_pseudo_headers([{<<":path">>, Path}|Tail], PseudoHeaders) -> +	request_pseudo_headers(Tail, PseudoHeaders#{path => Path}); +request_pseudo_headers([{<<":protocol">>, _}|_], #{protocol := _}) -> +	{error, 'Multiple :protocol pseudo-headers were found. (RFC7540 8.1.2.3)'}; +request_pseudo_headers([{<<":protocol">>, Protocol}|Tail], PseudoHeaders) -> +	request_pseudo_headers(Tail, PseudoHeaders#{protocol => Protocol}); +request_pseudo_headers([{<<":", _/bits>>, _}|_], _) -> +	{error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'}; +request_pseudo_headers(Headers, PseudoHeaders) -> +	{ok, PseudoHeaders, Headers}. + +response_pseudo_headers([{<<":status">>, _}|_], #{status := _}) -> +	{error, 'Multiple :status pseudo-headers were found. (RFC7540 8.1.2.3)'}; +response_pseudo_headers([{<<":status">>, Status}|Tail], PseudoHeaders) -> +	try cow_http:status_to_integer(Status) of +		IntStatus -> +			response_pseudo_headers(Tail, PseudoHeaders#{status => IntStatus}) +	catch _:_ -> +		{error, 'The :status pseudo-header value is invalid. (RFC7540 8.1.2.4)'} +	end; +response_pseudo_headers([{<<":", _/bits>>, _}|_], _) -> +	{error, 'An unknown or invalid pseudo-header was found. (RFC7540 8.1.2.1)'}; +response_pseudo_headers(Headers, PseudoHeaders) -> +	{ok, PseudoHeaders, Headers}. + +trailers_contain_pseudo_headers([]) -> +	false; +trailers_contain_pseudo_headers([{<<":", _/bits>>, _}|_]) -> +	true; +trailers_contain_pseudo_headers([_|Tail]) -> +	trailers_contain_pseudo_headers(Tail). + +%% Rejecting invalid regular headers might be a bit too strong for clients. +headers_regular_headers(Frame=#headers{id=StreamID}, +		State, Type, Stream, PseudoHeaders, Headers) -> +	case regular_headers(Headers, Type) of +		ok when Type =:= request -> +			request_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers); +		ok when Type =:= response -> +			response_expected_size(Frame, State, Type, Stream, PseudoHeaders, Headers); +		ok when Type =:= trailers -> +			trailers_frame(Frame, State, Stream, Headers); +		{error, HumanReadable} when Type =:= request -> +			headers_malformed(Frame, State, HumanReadable); +		{error, HumanReadable} -> +			stream_reset(StreamID, State, protocol_error, HumanReadable) +	end. + +regular_headers([{<<":", _/bits>>, _}|_], _) -> +	{error, 'Pseudo-headers were found after regular headers. (RFC7540 8.1.2.1)'}; +regular_headers([{<<"connection">>, _}|_], _) -> +	{error, 'The connection header is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"keep-alive">>, _}|_], _) -> +	{error, 'The keep-alive header is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"proxy-authenticate">>, _}|_], _) -> +	{error, 'The proxy-authenticate header is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"proxy-authorization">>, _}|_], _) -> +	{error, 'The proxy-authorization header is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"transfer-encoding">>, _}|_], _) -> +	{error, 'The transfer-encoding header is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"upgrade">>, _}|_], _) -> +	{error, 'The upgrade header is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"te">>, Value}|_], request) when Value =/= <<"trailers">> -> +	{error, 'The te header with a value other than "trailers" is not allowed. (RFC7540 8.1.2.2)'}; +regular_headers([{<<"te">>, _}|_], Type) when Type =/= request -> +	{error, 'The te header is only allowed in request headers. (RFC7540 8.1.2.2)'}; +regular_headers([{Name, _}|Tail], Type) -> +	Pattern = [ +		<<$A>>, <<$B>>, <<$C>>, <<$D>>, <<$E>>, <<$F>>, <<$G>>, <<$H>>, <<$I>>, +		<<$J>>, <<$K>>, <<$L>>, <<$M>>, <<$N>>, <<$O>>, <<$P>>, <<$Q>>, <<$R>>, +		<<$S>>, <<$T>>, <<$U>>, <<$V>>, <<$W>>, <<$X>>, <<$Y>>, <<$Z>> +	], +	case binary:match(Name, Pattern) of +		nomatch -> regular_headers(Tail, Type); +		_ -> {error, 'Header names must be lowercase. (RFC7540 8.1.2)'} +	end; +regular_headers([], _) -> +	ok. + +request_expected_size(Frame=#headers{fin=IsFin}, State, Type, Stream, PseudoHeaders, Headers) -> +	case [CL || {<<"content-length">>, CL} <- Headers] of +		[] when IsFin =:= fin -> +			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0); +		[] -> +			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined); +		[<<"0">>] when IsFin =:= fin -> +			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0); +		[_] when IsFin =:= fin -> +			headers_malformed(Frame, State, +				'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)'); +		[BinLen] -> +			headers_parse_expected_size(Frame, State, Type, Stream, +				PseudoHeaders, Headers, BinLen); +		_ -> +			headers_malformed(Frame, State, +				'Multiple content-length headers were received. (RFC7230 3.3.2)') +	end. + +response_expected_size(Frame=#headers{id=StreamID, fin=IsFin}, State, Type, +		Stream=#stream{method=Method}, PseudoHeaders=#{status := Status}, Headers) -> +	case [CL || {<<"content-length">>, CL} <- Headers] of +		[] when IsFin =:= fin -> +			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0); +		[] -> +			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, undefined); +		[_] when Status >= 100, Status =< 199 -> +			stream_reset(StreamID, State, protocol_error, +				'Content-length header received in a 1xx response. (RFC7230 3.3.2)'); +		[_] when Status =:= 204 -> +			stream_reset(StreamID, State, protocol_error, +				'Content-length header received in a 204 response. (RFC7230 3.3.2)'); +		[_] when Status >= 200, Status =< 299, Method =:= <<"CONNECT">> -> +			stream_reset(StreamID, State, protocol_error, +				'Content-length header received in a 2xx response to a CONNECT request. (RFC7230 3.3.2).'); +		%% Responses to HEAD requests, and 304 responses may contain +		%% a content-length header that must be ignored. (RFC7230 3.3.2) +		[_] when Method =:= <<"HEAD">> -> +			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0); +		[_] when Status =:= 304 -> +			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0); +		[<<"0">>] when IsFin =:= fin -> +			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, 0); +		[_] when IsFin =:= fin -> +			stream_reset(StreamID, State, protocol_error, +				'HEADERS frame with the END_STREAM flag contains a non-zero content-length. (RFC7540 8.1.2.6)'); +		[BinLen] -> +			headers_parse_expected_size(Frame, State, Type, Stream, +				PseudoHeaders, Headers, BinLen); +		_ -> +			stream_reset(StreamID, State, protocol_error, +				'Multiple content-length headers were received. (RFC7230 3.3.2)') +	end. + +headers_parse_expected_size(Frame=#headers{id=StreamID}, +		State, Type, Stream, PseudoHeaders, Headers, BinLen) -> +	try cow_http_hd:parse_content_length(BinLen) of +		Len -> +			headers_frame(Frame, State, Type, Stream, PseudoHeaders, Headers, Len) +	catch +		_:_ -> +			HumanReadable = 'The content-length header is invalid. (RFC7230 3.3.2)', +			case Type of +				request -> headers_malformed(Frame, State, HumanReadable); +				response -> stream_reset(StreamID, State, protocol_error, HumanReadable) +			end +	end. + +headers_frame(#headers{id=StreamID, fin=IsFin}, State0=#http2_machine{ +		local_settings=#{initial_window_size := RemoteWindow}, +		remote_settings=#{initial_window_size := LocalWindow}}, +		Type, Stream0, PseudoHeaders, Headers, Len) -> +	Stream = case Stream0 of +		undefined -> +			TE = case lists:keyfind(<<"te">>, 1, Headers) of +				{_, TE0} -> TE0; +				false -> undefined +			end, +			#stream{id=StreamID, method=maps:get(method, PseudoHeaders), +				remote=IsFin, remote_expected_size=Len, +				local_window=LocalWindow, remote_window=RemoteWindow, te=TE}; +		_ -> +			case {Type, PseudoHeaders} of +				{response, #{status := Status}} when Status >= 100, Status =< 199 -> +					Stream0; +				_ -> +					Stream0#stream{remote=IsFin, remote_expected_size=Len} +			end +	end, +	State = stream_store(Stream, State0#http2_machine{remote_streamid=StreamID}), +	{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, Len}, State}. + +trailers_frame(#headers{id=StreamID}, State0, Stream0, Headers) -> +	Stream = Stream0#stream{remote=fin}, +	State = stream_store(Stream, State0), +	case is_body_size_valid(Stream) of +		true -> +			{ok, {trailers, StreamID, Headers}, State}; +		false -> +			stream_reset(StreamID, State, protocol_error, +				'The total size of DATA frames is different than the content-length. (RFC7540 8.1.2.6)') +	end. + +%% PRIORITY frame. +%% +%% @todo Handle PRIORITY frames. + +priority_frame(_Frame, State) -> +	{ok, State}. + +%% RST_STREAM frame. + +rst_stream_frame({rst_stream, StreamID, _}, State=#http2_machine{mode=Mode, +		local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) +		when (?IS_LOCAL(Mode, StreamID) andalso (StreamID >= LocalStreamID)) +		orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID > RemoteStreamID)) -> +	{error, {connection_error, protocol_error, +		'RST_STREAM frame received on a stream in idle state. (RFC7540 5.1)'}, +		State}; +rst_stream_frame({rst_stream, StreamID, Reason}, State=#http2_machine{ +		streams=Streams0, rst_lingering_streams=Lingering0}) -> +	Streams = lists:keydelete(StreamID, #stream.id, Streams0), +	%% We only keep up to 10 streams in this state. @todo Make it configurable? +	Lingering = [StreamID|lists:sublist(Lingering0, 10 - 1)], +	{ok, {rst_stream, StreamID, Reason}, +		State#http2_machine{streams=Streams, rst_lingering_streams=Lingering}}. + +%% SETTINGS frame. + +settings_frame({settings, Settings}, State0=#http2_machine{ +		opts=Opts, remote_settings=Settings0}) -> +	State1 = State0#http2_machine{remote_settings=maps:merge(Settings0, Settings)}, +	State2 = maps:fold(fun +		(header_table_size, NewSize, State=#http2_machine{encode_state=EncodeState0}) -> +			MaxSize = maps:get(max_encode_table_size, Opts, 4096), +			EncodeState = cow_hpack:set_max_size(min(NewSize, MaxSize), EncodeState0), +			State#http2_machine{encode_state=EncodeState}; +		(initial_window_size, NewWindowSize, State) -> +			OldWindowSize = maps:get(initial_window_size, Settings0, 65535), +			streams_update_local_window(State, NewWindowSize - OldWindowSize); +		(_, _, State) -> +			State +	end, State1, Settings), +	case Settings of +		#{initial_window_size := _} -> send_data(State2); +		_ -> {ok, State2} +	end; +%% We expect to receive a SETTINGS frame as part of the preface. +settings_frame(_F, State) -> +	{error, {connection_error, protocol_error, +		'The preface sequence must be followed by a SETTINGS frame. (RFC7540 3.5)'}, +		State}. + +%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update +%% the local stream windows for all active streams and perhaps +%% resume sending data. +streams_update_local_window(State=#http2_machine{streams=Streams0}, Increment) -> +	Streams = [ +		S#stream{local_window=StreamWindow + Increment} +	|| S=#stream{local_window=StreamWindow} <- Streams0], +	State#http2_machine{streams=Streams}. + +%% Ack for a previously sent SETTINGS frame. + +settings_ack_frame(State0=#http2_machine{local_settings=Local0, next_settings=NextSettings}) -> +	Local = maps:merge(Local0, NextSettings), +	State1 = State0#http2_machine{local_settings=Local, next_settings=#{}}, +	{ok, maps:fold(fun +		(header_table_size, MaxSize, State=#http2_machine{decode_state=DecodeState0}) -> +			DecodeState = cow_hpack:set_max_size(MaxSize, DecodeState0), +			State#http2_machine{decode_state=DecodeState}; +		(initial_window_size, NewWindowSize, State) -> +			OldWindowSize = maps:get(initial_window_size, Local0, 65535), +			streams_update_remote_window(State, NewWindowSize - OldWindowSize); +		(_, _, State) -> +			State +	end, State1, NextSettings)}. + +%% When we receive an ack to a SETTINGS frame we sent we need to update +%% the remote stream windows for all active streams. +streams_update_remote_window(State=#http2_machine{streams=Streams0}, Increment) -> +	Streams = [ +		S#stream{remote_window=StreamWindow + Increment} +	|| S=#stream{remote_window=StreamWindow} <- Streams0], +	State#http2_machine{streams=Streams}. + +%% PUSH_PROMISE frame. + +%% Convenience record to manipulate the tuple. +%% The order of the fields matter. +-record(push_promise, { +	id :: cow_http2:streamid(), +	head :: cow_http2:head_fin(), +	promised_id :: cow_http2:streamid(), +	data :: binary() +}). + +push_promise_frame(_, State=#http2_machine{mode=server}) -> +	{error, {connection_error, protocol_error, +		'PUSH_PROMISE frames MUST only be sent on a peer-initiated stream. (RFC7540 6.6)'}, +		State}; +push_promise_frame(_, State=#http2_machine{local_settings=#{enable_push := false}}) -> +	{error, {connection_error, protocol_error, +		'PUSH_PROMISE frame received despite SETTINGS_ENABLE_PUSH set to 0. (RFC7540 6.6)'}, +		State}; +push_promise_frame(#push_promise{promised_id=PromisedStreamID}, +		State=#http2_machine{remote_streamid=RemoteStreamID}) +		when PromisedStreamID =< RemoteStreamID -> +	{error, {connection_error, protocol_error, +		'PUSH_PROMISE frame received for a promised stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'}, +		State}; +push_promise_frame(#push_promise{id=StreamID}, State) +		when not ?IS_CLIENT_LOCAL(StreamID) -> +	{error, {connection_error, protocol_error, +		'PUSH_PROMISE frame received on a server-initiated stream. (RFC7540 6.6)'}, +		State}; +push_promise_frame(Frame=#push_promise{id=StreamID, head=IsHeadFin, +		promised_id=PromisedStreamID, data=HeaderData}, State) -> +	case stream_get(StreamID, State) of +		#stream{remote=idle} -> +			case IsHeadFin of +				head_fin -> +					%% @todo Gotta make sure the headers_* functions +					%% will work properly for PUSH_PROMISE requests. +					headers_decode(#headers{id=PromisedStreamID, +						fin=fin, head=IsHeadFin, data=HeaderData}, +						State, push_promise, undefined); +				head_nofin -> +					{ok, State#http2_machine{state={continuation, push_promise, Frame}}} +			end; +		_ -> +%% @todo Check if the stream is lingering. If it is, decode the frame +%% and do what? That's the big question and why it's not implemented yet. +%   However, an endpoint that +%   has sent RST_STREAM on the associated stream MUST handle PUSH_PROMISE +%   frames that might have been created before the RST_STREAM frame is +%   received and processed. (RFC7540 6.6) +			{error, {connection_error, stream_closed, +				'PUSH_PROMISE frame received on a stream in closed or half-closed state. (RFC7540 5.1, RFC7540 6.6)'}, +				State} +	end. + +%% PING frame. + +ping_frame({ping, _}, State) -> +	{ok, State}. + +%% Ack for a previously sent PING frame. +%% +%% @todo Might want to check contents but probably a waste of time. + +ping_ack_frame({ping_ack, _}, State) -> +	{ok, State}. + +%% GOAWAY frame. + +goaway_frame(Frame={goaway, _, _, _}, State) -> +	{ok, Frame, State}. + +%% WINDOW_UPDATE frame. + +%% Connection-wide WINDOW_UPDATE frame. +window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow}) +		when ConnWindow + Increment > 16#7fffffff -> +	{error, {connection_error, flow_control_error, +		'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'}, +		State}; +window_update_frame({window_update, Increment}, State=#http2_machine{local_window=ConnWindow}) -> +	send_data(State#http2_machine{local_window=ConnWindow + Increment}); +%% Stream-specific WINDOW_UPDATE frame. +window_update_frame({window_update, StreamID, _}, +		State=#http2_machine{remote_streamid=RemoteStreamID}) +		when StreamID > RemoteStreamID -> +	{error, {connection_error, protocol_error, +		'WINDOW_UPDATE frame received on a stream in idle state. (RFC7540 5.1)'}, +		State}; +window_update_frame({window_update, StreamID, Increment}, +		State0=#http2_machine{rst_lingering_streams=RstLingering}) -> +	case stream_get(StreamID, State0) of +		#stream{local_window=StreamWindow} when StreamWindow + Increment > 16#7fffffff -> +			stream_reset(StreamID, State0, flow_control_error, +				'The flow control window must not be greater than 2^31-1. (RFC7540 6.9.1)'); +		Stream0 = #stream{local_window=StreamWindow} -> +			send_data(Stream0#stream{local_window=StreamWindow + Increment}, State0); +		undefined -> +			%% WINDOW_UPDATE frames may be received for a short period of time +			%% after a stream is closed. They must be ignored. +			case lists:member(StreamID, RstLingering) of +				false -> {ok, State0}; +				true -> stream_reset(StreamID, State0, stream_closed, +					'WINDOW_UPDATE frame received after the stream was reset. (RFC7540 5.1)') +			end +	end. + +%% CONTINUATION frame. + +%% Convenience record to manipulate the tuple. +%% The order of the fields matter. +-record(continuation, { +	id :: cow_http2:streamid(), +	head :: cow_http2:head_fin(), +	data :: binary() +}). + +unexpected_continuation_frame(#continuation{}, State) -> +	{error, {connection_error, protocol_error, +		'CONTINUATION frames MUST be preceded by a HEADERS or PUSH_PROMISE frame. (RFC7540 6.10)'}, +		State}. + +continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1}, +		State=#http2_machine{state={continuation, Type, +			Frame=#headers{id=StreamID, data=HeaderFragment0}}}) -> +	HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>, +	headers_decode(Frame#headers{head=head_fin, data=HeaderData}, +		State#http2_machine{state=normal}, Type, stream_get(StreamID, State)); +continuation_frame(#continuation{id=StreamID, head=head_fin, data=HeaderFragment1}, +		State=#http2_machine{state={continuation, Type, #push_promise{ +			id=StreamID, promised_id=PromisedStreamID, data=HeaderFragment0}}}) -> +	HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>, +	headers_decode(#headers{id=PromisedStreamID, fin=fin, head=head_fin, data=HeaderData}, +		State#http2_machine{state=normal}, Type, undefined); +continuation_frame(#continuation{id=StreamID, data=HeaderFragment1}, +		State=#http2_machine{state={continuation, Type, ContinuedFrame0}}) +		when element(2, ContinuedFrame0) =:= StreamID -> +	ContinuedFrame = case ContinuedFrame0 of +		#headers{data=HeaderFragment0} -> +			HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>, +			ContinuedFrame0#headers{data=HeaderData}; +		#push_promise{data=HeaderFragment0} -> +			HeaderData = <<HeaderFragment0/binary, HeaderFragment1/binary>>, +			ContinuedFrame0#push_promise{data=HeaderData} +	end, +	{ok, State#http2_machine{state={continuation, Type, ContinuedFrame}}}; +continuation_frame(_F, State) -> +	{error, {connection_error, protocol_error, +		'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'}, +		State}. + +%% Ignored frames. + +-spec ignored_frame(State) +	-> {ok, State} +	| {error, {connection_error, protocol_error, atom()}, State} +	when State::http2_machine(). +ignored_frame(State=#http2_machine{state={continuation, _, _}}) -> +	{error, {connection_error, protocol_error, +		'An invalid frame was received in the middle of a header block. (RFC7540 6.2)'}, +		State}; +%% @todo It might be useful to error out when we receive +%% too many unknown frames. (RFC7540 10.5) +ignored_frame(State) -> +	{ok, State}. + +%% Functions for sending a message header or body. Note that +%% this module does not send data directly, instead it returns +%% a value that can then be used to send the frames. + +-spec prepare_headers(cow_http2:streamid(), State, idle | cow_http2:fin(), +	pseudo_headers(), cow_http:headers()) +	-> {ok, cow_http2:fin(), iodata(), State} when State::http2_machine(). +%% @todo Should handle the request case too. +prepare_headers(StreamID, State=#http2_machine{encode_state=EncodeState0}, +		IsFin0, PseudoHeaders, Headers0) -> +	Stream = #stream{method=Method, local=idle} = stream_get(StreamID, State), +	IsFin = case {IsFin0, Method} of +		{idle, _} -> nofin; +		{_, <<"HEAD">>} -> fin; +		_ -> IsFin0 +	end, +	Headers = merge_pseudo_headers(PseudoHeaders, Headers0), +	{HeaderBlock, EncodeState} = cow_hpack:encode(Headers, EncodeState0), +	{ok, IsFin, HeaderBlock, stream_store(Stream#stream{local=IsFin0}, +		State#http2_machine{encode_state=EncodeState})}. + +-spec prepare_push_promise(cow_http2:streamid(), State, pseudo_headers(), cow_http:headers()) +	-> {ok, cow_http2:streamid(), iodata(), State} +	| {error, no_push} when State::http2_machine(). +prepare_push_promise(_, #http2_machine{remote_settings=#{enable_push := false}}, _, _) -> +	{error, no_push}; +prepare_push_promise(StreamID, State=#http2_machine{encode_state=EncodeState0, +		local_settings=#{initial_window_size := RemoteWindow}, +		remote_settings=#{initial_window_size := LocalWindow}, +		local_streamid=LocalStreamID}, PseudoHeaders, Headers0) -> +	#stream{local=idle} = stream_get(StreamID, State), +	TE = case lists:keyfind(<<"te">>, 1, Headers0) of +		{_, TE0} -> TE0; +		false -> undefined +	end, +	Headers = merge_pseudo_headers(PseudoHeaders, Headers0), +	{HeaderBlock, EncodeState} = cow_hpack:encode(Headers, EncodeState0), +	{ok, LocalStreamID, HeaderBlock, stream_store( +		#stream{id=LocalStreamID, method=maps:get(method, PseudoHeaders), +			remote=fin, remote_expected_size=0, +			local_window=LocalWindow, remote_window=RemoteWindow, te=TE}, +		State#http2_machine{encode_state=EncodeState, local_streamid=LocalStreamID + 2})}. + +merge_pseudo_headers(PseudoHeaders, Headers0) -> +	lists:foldl(fun +		({status, Status}, Acc) when is_integer(Status) -> +			[{<<":status">>, integer_to_binary(Status)}|Acc]; +		({Name, Value}, Acc) -> +			[{iolist_to_binary([$:, atom_to_binary(Name, latin1)]), Value}|Acc] +		end, Headers0, maps:to_list(PseudoHeaders)). + +-spec prepare_trailers(cow_http2:streamid(), State, cow_http:headers()) +	-> {ok, iodata(), State} when State::http2_machine(). +prepare_trailers(StreamID, State=#http2_machine{encode_state=EncodeState0}, Trailers) -> +	Stream = #stream{local=nofin} = stream_get(StreamID, State), +	{HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0), +	{ok, HeaderBlock, stream_store(Stream#stream{local=fin}, +		State#http2_machine{encode_state=EncodeState})}. + +-spec send_or_queue_data(cow_http2:streamid(), State, cow_http2:fin(), DataOrFileOrTrailers) +	-> {ok, State} +	| {send, [{cow_http2:streamid(), cow_http2:fin(), [DataOrFileOrTrailers]}], State} +	when State::http2_machine(), DataOrFileOrTrailers:: +		{data, iodata()} | #sendfile{} | {trailers, cow_http:headers()}. +send_or_queue_data(StreamID, State0, IsFin0, DataOrFileOrTrailers0) -> +	%% @todo Probably just ignore if the method was HEAD. +	Stream0 = #stream{local=nofin, te=TE0} = stream_get(StreamID, State0), +	DataOrFileOrTrailers = case DataOrFileOrTrailers0 of +		{trailers, _} -> +			%% We only accept TE headers containing exactly "trailers" (RFC7540 8.1.2.1). +			TE = try cow_http_hd:parse_te(TE0) of +				{trailers, []} -> trailers; +				_ -> no_trailers +			catch _:_ -> +				%% If we can't parse the TE header, assume we can't send trailers. +				no_trailers +			end, +			case TE of +				trailers -> +					DataOrFileOrTrailers0; +				no_trailers -> +					{data, <<>>} +			end; +		_ -> +			DataOrFileOrTrailers0 +	end, +	case send_or_queue_data(Stream0, State0, [], IsFin0, DataOrFileOrTrailers, in) of +		{ok, Stream, State, []} -> +			{ok, stream_store(Stream, State)}; +		{ok, Stream=#stream{local=IsFin}, State, SendData} -> +			{send, [{StreamID, IsFin, lists:reverse(SendData)}], stream_store(Stream, State)} +	end. + +%% Internal data sending/queuing functions. + +%% @todo Should we ever want to implement the PRIORITY mechanism, +%% this would be the place to do it. Right now, we just go over +%% all streams and send what we can until either everything is +%% sent or we run out of space in the window. +send_data(State0=#http2_machine{streams=Streams0}) -> +	case send_data_for_all_streams(Streams0, State0, [], []) of +		{ok, Streams, State, []} -> +			{ok, State#http2_machine{streams=Streams}}; +		{ok, Streams, State, Send} -> +			{send, Send, State#http2_machine{streams=Streams}} +	end. + +send_data_for_all_streams([], State, Acc, Send) -> +	{ok, lists:reverse(Acc), State, Send}; +%% While technically we should never get < 0 here, let's be on the safe side. +send_data_for_all_streams(Tail, State=#http2_machine{local_window=ConnWindow}, Acc, Send) +		when ConnWindow =< 0 -> +	{ok, lists:reverse(Acc, Tail), State, Send}; +%% We rely on send_data_for_one_stream/3 to do all the necessary checks about the stream. +send_data_for_all_streams([Stream0|Tail], State0, Acc, Send) -> +	case send_data_for_one_stream(Stream0, State0, []) of +		{ok, Stream, State, []} -> +			send_data_for_all_streams(Tail, State, [Stream|Acc], Send); +		%% We need to remove the stream here because we do not use stream_store/2. +		{ok, #stream{id=StreamID, local=fin, remote=fin}, State, SendData} -> +			send_data_for_all_streams(Tail, State, Acc, +				[{StreamID, fin, SendData}|Send]); +		{ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} -> +			send_data_for_all_streams(Tail, State, [Stream|Acc], +				[{StreamID, IsFin, SendData}|Send]) +	end. + +send_data(Stream0, State0) -> +	case send_data_for_one_stream(Stream0, State0, []) of +		{ok, Stream, State, []} -> +			{ok, stream_store(Stream, State)}; +		{ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} -> +			{send, [{StreamID, IsFin, SendData}], stream_store(Stream, State)} +	end. + +send_data_for_one_stream(Stream=#stream{local=nofin, local_buffer_size=0, +		local_trailers=Trailers}, State, SendAcc) when Trailers =/= undefined -> +	{ok, Stream, State, lists:reverse([{trailers, Trailers}|SendAcc])}; +send_data_for_one_stream(Stream=#stream{local=IsFin, local_window=StreamWindow, +		local_buffer_size=BufferSize}, State=#http2_machine{local_window=ConnWindow}, SendAcc) +		when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 -> +	{ok, Stream, State, lists:reverse(SendAcc)}; +send_data_for_one_stream(Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}, +		State0, SendAcc0) -> +	%% We know there is an item in the queue. +	{{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0), +	Stream1 = Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize}, +	{ok, Stream, State, SendAcc} +		= send_or_queue_data(Stream1, State0, SendAcc0, IsFin, Data, in_r), +	send_data_for_one_stream(Stream, State, SendAcc). + +%% We can send trailers immediately if the queue is empty, otherwise we queue. +%% We always send trailer frames even if the window is empty. +send_or_queue_data(Stream=#stream{local_buffer_size=0}, +		State, SendAcc, fin, {trailers, Trailers}, _) -> +	{ok, Stream, State, [{trailers, Trailers}|SendAcc]}; +send_or_queue_data(Stream, State, SendAcc, fin, {trailers, Trailers}, _) -> +	{ok, Stream#stream{local_trailers=Trailers}, State, SendAcc}; +%% Send data immediately if we can, buffer otherwise. +send_or_queue_data(Stream=#stream{local_window=StreamWindow}, +		State=#http2_machine{local_window=ConnWindow}, +		SendAcc, IsFin, Data, In) +		when ConnWindow =< 0; StreamWindow =< 0 -> +	{ok, queue_data(Stream, IsFin, Data, In), State, SendAcc}; +send_or_queue_data(Stream=#stream{local_window=StreamWindow}, +		State=#http2_machine{opts=Opts, remote_settings=RemoteSettings, +		local_window=ConnWindow}, SendAcc, IsFin, Data, In) -> +	RemoteMaxFrameSize = maps:get(max_frame_size, RemoteSettings, 16384), +	ConfiguredMaxFrameSize = maps:get(max_frame_size_sent, Opts, infinity), +	MaxSendSize = min( +		min(ConnWindow, StreamWindow), +		min(RemoteMaxFrameSize, ConfiguredMaxFrameSize) +	), +	case Data of +		File = #sendfile{bytes=Bytes} when Bytes =< MaxSendSize -> +			{ok, Stream#stream{local=IsFin, local_window=StreamWindow - Bytes}, +				State#http2_machine{local_window=ConnWindow - Bytes}, +				[File|SendAcc]}; +		File = #sendfile{offset=Offset, bytes=Bytes} -> +			send_or_queue_data(Stream#stream{local_window=StreamWindow - MaxSendSize}, +				State#http2_machine{local_window=ConnWindow - MaxSendSize}, +				[File#sendfile{bytes=MaxSendSize}|SendAcc], IsFin, +				File#sendfile{offset=Offset + MaxSendSize, bytes=Bytes - MaxSendSize}, In); +		{data, Iolist0} -> +			IolistSize = iolist_size(Iolist0), +			if +				IolistSize =< MaxSendSize -> +					{ok, Stream#stream{local=IsFin, local_window=StreamWindow - IolistSize}, +						State#http2_machine{local_window=ConnWindow - IolistSize}, +						[{data, Iolist0}|SendAcc]}; +				true -> +					{Iolist, More} = cow_iolists:split(MaxSendSize, Iolist0), +					send_or_queue_data(Stream#stream{local_window=StreamWindow - MaxSendSize}, +						State#http2_machine{local_window=ConnWindow - MaxSendSize}, +						[{data, Iolist}|SendAcc], IsFin, {data, More}, In) +			end +	end. + +queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) -> +	DataSize = case Data of +		{sendfile, _, Bytes, _} -> Bytes; +		{data, Iolist} -> iolist_size(Iolist) +	end, +	Q = queue:In({IsFin, DataSize, Data}, Q0), +	Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}. + +%% Public interface to update the flow control window. + +-spec update_window(0..16#7fffffff, State) +	-> State when State::http2_machine(). +update_window(Size, State=#http2_machine{remote_window=RemoteWindow}) -> +	State#http2_machine{remote_window=RemoteWindow + Size}. + +-spec update_window(cow_http2:streamid(), 0..16#7fffffff, State) +	-> State when State::http2_machine(). +update_window(StreamID, Size, State) -> +	Stream = #stream{remote_window=RemoteWindow} = stream_get(StreamID, State), +	stream_store(Stream#stream{remote_window=RemoteWindow + Size}, State). + +%% Public interface to reset streams. + +-spec reset_stream(cow_http2:streamid(), State) +	-> {ok, State} | {error, not_found} when State::http2_machine(). +reset_stream(StreamID, State=#http2_machine{streams=Streams0}) -> +	case lists:keytake(StreamID, #stream.id, Streams0) of +		{value, _, Streams} -> +			{ok, stream_linger(StreamID, State#http2_machine{streams=Streams})}; +		false -> +			{error, not_found} +	end. + +%% Retrieve a setting value, or its default value if not set. + +-spec get_local_setting(atom(), http2_machine()) -> atom() | integer(). +get_local_setting(Key, #http2_machine{local_settings=Settings}) -> +	maps:get(Key, Settings, default_setting_value(Key)). + +default_setting_value(header_table_size) -> 4096; +default_setting_value(enable_push) -> true; +default_setting_value(max_concurrent_streams) -> infinity; +default_setting_value(initial_window_size) -> 65535; +default_setting_value(max_frame_size) -> 16384; +default_setting_value(max_header_list_size) -> infinity; +default_setting_value(enable_connect_protocol) -> false. + +%% Function to obtain the last known streamid received +%% for the purposes of sending a GOAWAY frame and closing the connection. + +-spec get_last_streamid(http2_machine()) -> cow_http2:streamid(). +get_last_streamid(#http2_machine{remote_streamid=RemoteStreamID}) -> +	RemoteStreamID. + +%% Retrieve the local state for a stream, including the state in the queue. + +-spec get_stream_local_state(cow_http2:streamid(), http2_machine()) +	-> {ok, idle | cow_http2:fin(), empty | nofin | fin} | {error, not_found | closed}. +get_stream_local_state(StreamID, State=#http2_machine{mode=Mode, +		local_streamid=LocalStreamID, remote_streamid=RemoteStreamID}) -> +	case stream_get(StreamID, State) of +		#stream{local=IsFin, local_buffer=Q, local_trailers=undefined} -> +			IsQueueFin = case queue:peek_r(Q) of +				empty -> empty; +				{value, {IsQueueFin0, _, _}} -> IsQueueFin0 +			end, +			{ok, IsFin, IsQueueFin}; +		%% Trailers are queued so the local state is fin after the queue is drained. +		#stream{local=IsFin} -> +			{ok, IsFin, fin}; +		undefined when (?IS_LOCAL(Mode, StreamID) andalso (StreamID < LocalStreamID)) +				orelse ((not ?IS_LOCAL(Mode, StreamID)) andalso (StreamID =< RemoteStreamID)) -> +			{error, closed}; +		undefined -> +			{error, not_found} +	end. + +%% Stream-related functions. + +stream_get(StreamID, #http2_machine{streams=Streams}) -> +	case lists:keyfind(StreamID, #stream.id, Streams) of +		false -> undefined; +		Stream -> Stream +	end. + +stream_store(#stream{id=StreamID, local=fin, remote=fin}, +		State=#http2_machine{streams=Streams0}) -> +	Streams = lists:keydelete(StreamID, #stream.id, Streams0), +	State#http2_machine{streams=Streams}; +stream_store(Stream=#stream{id=StreamID}, +		State=#http2_machine{streams=Streams0}) -> +	Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream), +	State#http2_machine{streams=Streams}. + +%% @todo Don't send an RST_STREAM if one was already sent. +stream_reset(StreamID, State, Reason, HumanReadable) -> +	{error, {stream_error, StreamID, Reason, HumanReadable}, +		stream_linger(StreamID, State)}. + +stream_linger(StreamID, State=#http2_machine{lingering_streams=Lingering0}) -> +	%% We only keep up to 100 streams in this state. @todo Make it configurable? +	Lingering = [StreamID|lists:sublist(Lingering0, 100 - 1)], +	State#http2_machine{lingering_streams=Lingering}. | 
