%% Copyright (c) 2016, Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-module(gun_http2).
-export([check_options/1]).
-export([name/0]).
-export([init/4]).
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
-export([request/8]).
-export([request/9]).
-export([data/5]).
-export([cancel/3]).
-export([down/1]).
%% State kept for every stream currently tracked by the connection.
-record(stream, {
%% HTTP/2 stream identifier used on the wire.
id :: non_neg_integer(),
%% Reference included in the messages sent to reply_to for this stream.
ref :: reference(),
%% Process that receives the messages related to this stream.
reply_to :: pid(),
%% Whether we finished sending data.
local = nofin :: cowboy_stream:fin(),
%% Whether we finished receiving data.
remote = nofin :: cowboy_stream:fin(),
%% Content handlers state. It stays undefined until a response
%% that is expected to carry a body has been received.
handler_state :: undefined | gun_content_handler:state()
}).
%% Connection-wide state of the HTTP/2 protocol.
-record(http2_state, {
%% Owner process given to init/4.
owner :: pid(),
%% Socket and transport module used to send frames to the server.
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
%% Content handlers option, used to initialize the handler state
%% of a stream when its response headers are received.
content_handlers :: gun_content_handler:opt(),
%% Received data that does not form a complete frame yet.
buffer = <<>> :: binary(),
%% @todo local_settings, next_settings, remote_settings
%% Streams currently tracked by the connection.
streams = [] :: [#stream{}],
%% Identifier given to the next stream created by a request.
%% It is incremented by 2 after each request.
stream_id = 1 :: non_neg_integer(),
%% HPACK decoding and encoding state.
decode_state = cow_hpack:init() :: cow_hpack:state(),
encode_state = cow_hpack:init() :: cow_hpack:state()
}).
%% Validate the protocol options given as a map.
%%
%% The map is converted to a list of {Key, Value} pairs and each
%% pair is checked in turn by do_check_options/1, whose result is
%% returned unchanged.
check_options(Opts) ->
	OptsList = maps:to_list(Opts),
	do_check_options(OptsList).
%% Check the options one at a time.
%%
%% Returns ok when all options are valid, or
%% {error, {options, {http2, Opt}}} for the first invalid option.
%% All errors are tagged with http2, including an invalid
%% content_handlers option, so that the caller can tell which
%% protocol's options were rejected.
do_check_options([]) ->
	ok;
%% The keepalive option must be a strictly positive integer.
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
	do_check_options(Opts);
%% The content handlers are validated by gun_content_handler.
do_check_options([Opt={content_handlers, Handlers}|Opts]) ->
	case gun_content_handler:check_option(Handlers) of
		ok -> do_check_options(Opts);
		error -> {error, {options, {http2, Opt}}}
	end;
%% Anything else is either unknown or has an invalid value.
do_check_options([Opt|_]) ->
	{error, {options, {http2, Opt}}}.
%% Name of the protocol implemented by this module.
name() ->
	http2.
%% Initialize the protocol state for a newly established connection.
%%
%% The HTTP/2 connection preface, followed by a SETTINGS frame, is
%% sent immediately. The content handlers are taken from the options
%% and default to [gun_data].
init(Owner, Socket, Transport, Opts) ->
	Preface = [
		<< "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
		cow_http2:settings(#{}) %% @todo Settings.
	],
	Transport:send(Socket, Preface),
	ContentHandlers = maps:get(content_handlers, Opts, [gun_data]),
	#http2_state{
		owner=Owner,
		socket=Socket,
		transport=Transport,
		content_handlers=ContentHandlers
	}.
%% Handle data received from the socket.
%%
%% The data is appended to whatever was left unparsed previously,
%% the buffer is emptied and parsing resumes on the combined data.
handle(Data, State0=#http2_state{buffer=Buffer}) ->
	State = State0#http2_state{buffer= <<>>},
	Pending = << Buffer/binary, Data/binary >>,
	parse(Pending, State).
%% Parse and process as many complete frames as the data contains.
%%
%% Returns the updated state, with any incomplete trailing frame kept
%% in the buffer, or the atom close when the connection must be
%% terminated.
parse(Data0, State0=#http2_state{buffer=Buffer}) ->
	%% @todo Parse states: Preface. Continuation.
	Data = << Buffer/binary, Data0/binary >>,
	%% The buffer is now part of Data. Clear it so that it can never
	%% be prepended a second time when we recurse on the remainder.
	State = State0#http2_state{buffer= <<>>},
	case cow_http2:parse(Data) of
		{ok, Frame, Rest} ->
			%% Processing a frame may terminate the connection, in which
			%% case frame/2 returns close instead of a state. We must stop
			%% parsing then; the remaining data is of no use anymore.
			case frame(Frame, State) of
				close -> close;
				State1 -> parse(Rest, State1)
			end;
		{stream_error, StreamID, Reason, Human, Rest} ->
			parse(Rest, stream_reset(State, StreamID, {stream_error, Reason, Human}));
		Error = {connection_error, _, _} ->
			terminate(State, Error);
		more ->
			State#http2_state{buffer=Data}
	end.
%% Process a single frame received from the server.
%%
%% Returns the updated state, or the atom close (by way of
%% terminate/2,3) when the connection must be terminated.

%% DATA frame.
frame({data, StreamID, IsFin, Data}, State) ->
	case get_stream_by_id(StreamID, State) of
		Stream = #stream{remote=nofin, handler_state=Handlers0} ->
			Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
			remote_fin(Stream#stream{handler_state=Handlers}, State, IsFin);
		_ ->
			%% @todo protocol_error if not existing
			stream_reset(State, StreamID, {stream_error, stream_closed,
				'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'})
	end;
%% Single HEADERS frame headers block.
frame({headers, StreamID, IsFin, head_fin, HeaderBlock},
		State=#http2_state{decode_state=DecodeState0, content_handlers=Handlers0}) ->
	case get_stream_by_id(StreamID, State) of
		Stream = #stream{ref=StreamRef, reply_to=ReplyTo, remote=nofin} ->
			try cow_hpack:decode(HeaderBlock, DecodeState0) of
				{Headers0, DecodeState} ->
					%% The HPACK state is shared by the whole connection. It must
					%% be kept even when this particular response is rejected.
					State1 = State#http2_state{decode_state=DecodeState},
					case lists:keytake(<<":status">>, 1, Headers0) of
						{value, {_, Status}, Headers} ->
							%% The same integer status is given to both the
							%% owner process and the content handlers.
							IntStatus = parse_status(Status),
							ReplyTo ! {gun_response, self(), StreamRef, IsFin, IntStatus, Headers},
							Handlers = case IsFin of
								fin -> undefined;
								nofin ->
									gun_content_handler:init(ReplyTo, StreamRef,
										IntStatus, Headers, Handlers0)
							end,
							remote_fin(Stream#stream{handler_state=Handlers}, State1, IsFin);
						false ->
							stream_reset(State1, StreamID, {stream_error, protocol_error,
								'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'})
					end
			catch _:_ ->
				terminate(State, StreamID, {connection_error, compression_error,
					'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
			end;
		_ ->
			stream_reset(State, StreamID, {stream_error, stream_closed,
				'HEADERS frame received for a closed or non-existent stream. (RFC7540 5.1)'})
	end;
%% @todo HEADERS frame starting a headers block. Enter continuation mode.
%frame(State, {headers, StreamID, IsFin, head_nofin, HeaderBlockFragment}) ->
%	State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}};
%% @todo Single HEADERS frame headers block with priority.
%frame(State, {headers, StreamID, IsFin, head_fin,
%		_IsExclusive, _DepStreamID, _Weight, HeaderBlock}) ->
%	%% @todo Handle priority.
%	stream_init(State, StreamID, IsFin, HeaderBlock);
%% @todo HEADERS frame starting a headers block. Enter continuation mode.
%frame(State, {headers, StreamID, IsFin, head_nofin,
%		_IsExclusive, _DepStreamID, _Weight, HeaderBlockFragment}) ->
%	%% @todo Handle priority.
%	State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}};
%% @todo PRIORITY frame.
%frame(State, {priority, _StreamID, _IsExclusive, _DepStreamID, _Weight}) ->
%	%% @todo Validate StreamID?
%	%% @todo Handle priority.
%	State;
%% RST_STREAM frame.
%%
%% The stream is forgotten and the process it replies to is notified.
%% Nothing is sent back to the server: an endpoint must not send
%% a RST_STREAM in response to a RST_STREAM frame. (RFC7540 5.4.2)
frame({rst_stream, StreamID, Reason}, State=#http2_state{streams=Streams0}) ->
	case lists:keytake(StreamID, #stream.id, Streams0) of
		{value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} ->
			ReplyTo ! {gun_error, self(), StreamRef,
				{stream_error, Reason, 'Stream reset by server.'}},
			State#http2_state{streams=Streams};
		false ->
			State
	end;
%% SETTINGS frame.
frame({settings, _Settings}, State=#http2_state{socket=Socket, transport=Transport}) ->
	%% @todo Apply SETTINGS.
	Transport:send(Socket, cow_http2:settings_ack()),
	State;
%% Ack for a previously sent SETTINGS frame.
frame(settings_ack, State) -> %% @todo =#http2_state{next_settings=_NextSettings}) ->
	%% @todo Apply SETTINGS that require synchronization.
	State;
%% PUSH_PROMISE frame.
%% @todo Continuation.
frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock},
		State=#http2_state{decode_state=DecodeState0}) ->
	case get_stream_by_id(PromisedStreamID, State) of
		false ->
			case get_stream_by_id(StreamID, State) of
				#stream{ref=StreamRef, reply_to=ReplyTo} ->
					try cow_hpack:decode(HeaderBlock, DecodeState0) of
						{Headers0, DecodeState} ->
							push_promise(StreamID, StreamRef, ReplyTo, PromisedStreamID,
								Headers0, State#http2_state{decode_state=DecodeState})
					catch _:_ ->
						terminate(State, {connection_error, compression_error,
							'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
					end;
				_ ->
					stream_reset(State, StreamID, {stream_error, stream_closed,
						'PUSH_PROMISE frame received for a closed or non-existent stream. (RFC7540 6.6)'})
			end;
		_ ->
			%% The promised stream identifier is already in use.
			%% NOTE(review): todo is a placeholder error reason and is
			%% presumably not a valid HTTP/2 error code; confirm how
			%% cow_http2:rst_stream/2 handles it before relying on this path.
			stream_reset(State, StreamID, {stream_error, todo, ''})
	end;
%% PING frame.
frame({ping, Opaque}, State=#http2_state{socket=Socket, transport=Transport}) ->
	Transport:send(Socket, cow_http2:ping_ack(Opaque)),
	State;
%% Ack for a previously sent PING frame.
%%
%% @todo Might want to check contents but probably a waste of time.
frame({ping_ack, _Opaque}, State) ->
	State;
%% GOAWAY frame.
frame(Frame={goaway, StreamID, _, _}, State) ->
	terminate(State, StreamID, {stop, Frame, 'Server is going away.'});
%% Connection-wide WINDOW_UPDATE frame.
frame({window_update, _Increment}, State) ->
	%% @todo control flow
	State;
%% Stream-specific WINDOW_UPDATE frame.
frame({window_update, _StreamID, _Increment}, State) ->
	%% @todo stream-specific control flow
	State;
%% Unexpected CONTINUATION frame.
frame({continuation, StreamID, _, _}, State) ->
	terminate(State, StreamID, {connection_error, protocol_error,
		'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}).

%% Announce a pushed resource and start tracking the promised stream.
%%
%% When one of the request pseudo-headers is missing the push promise
%% is malformed; the stream is reset and no new stream is created.
push_promise(StreamID, StreamRef, ReplyTo, PromisedStreamID, Headers0, State) ->
	PseudoHeaders = [<<":method">>, <<":scheme">>, <<":authority">>, <<":path">>],
	case take_pseudo_headers(PseudoHeaders, Headers0, []) of
		{ok, [Method, Scheme, Authority, Path], Headers} ->
			NewStreamRef = make_ref(),
			ReplyTo ! {gun_push, self(), StreamRef, NewStreamRef, Method,
				iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers},
			new_stream(PromisedStreamID, NewStreamRef, ReplyTo, nofin, fin, State);
		error ->
			stream_reset(State, StreamID, {stream_error, protocol_error,
				'Malformed push promise; missing pseudo-header field. (RFC7540 8.1.2.3)'})
	end.

%% Remove the named headers from the list of headers.
%%
%% Returns {ok, Values, Rest} with the values in the order the names
%% were given, or error as soon as one of the names cannot be found.
take_pseudo_headers([], Headers, Acc) ->
	{ok, lists:reverse(Acc), Headers};
take_pseudo_headers([Name|Tail], Headers0, Acc) ->
	case lists:keytake(Name, 1, Headers0) of
		{value, {_, Value}, Headers} ->
			take_pseudo_headers(Tail, Headers, [Value|Acc]);
		false ->
			error
	end.
%% Convert the value of the :status pseudo-header to an integer.
%%
%% Only the first three bytes are considered; anything that
%% follows them is ignored.
parse_status(Status) ->
	<< Digits:3/binary, _/bits >> = Status,
	binary_to_integer(Digits).
%% The connection was closed. An error is reported for every
%% stream still tracked by the connection.
close(#http2_state{streams=OpenStreams}) ->
	close_streams(OpenStreams).
%% Send a closed error for each of the given streams, in order,
%% to the process the stream replies to. Always returns ok.
close_streams(Streams) ->
	lists:foreach(fun(#stream{ref=StreamRef, reply_to=ReplyTo}) ->
		ReplyTo ! {gun_error, self(), StreamRef, {closed,
			"The connection was lost."}}
	end, Streams).
%% Send a PING frame to the server. The state is returned unchanged.
keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
	Ping = cow_http2:ping(0),
	Transport:send(Socket, Ping),
	State.
%% Send a request without providing a body up front.
%%
%% A body is expected to follow, through data/5, only when the headers
%% include content-type or content-length. Otherwise the HEADERS frame
%% also ends the local side of the stream.
request(State=#http2_state{socket=Socket, transport=Transport,
		encode_state=EncodeState0, stream_id=StreamID},
		StreamRef, ReplyTo, Method, Host, Port, Path, Headers) ->
	{HeaderBlock, EncodeState} = prepare_headers(EncodeState0,
		Transport, Method, Host, Port, Path, Headers),
	ExpectsBody = lists:keymember(<<"content-type">>, 1, Headers)
		orelse lists:keymember(<<"content-length">>, 1, Headers),
	IsFin = if
		ExpectsBody -> nofin;
		true -> fin
	end,
	HeadersFrame = cow_http2:headers(StreamID, IsFin, HeaderBlock),
	Transport:send(Socket, HeadersFrame),
	%% Identifiers of the streams we create go up by 2 each time.
	NextState = State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState},
	new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin, NextState).
%% Send a request along with its full body.
%%
%% The content-length header is set, or replaced, based on the size
%% of the body. The local side of the stream ends with the body.
%%
%% @todo Handle Body > 16MB. (split it out into many frames)
request(State=#http2_state{socket=Socket, transport=Transport,
		encode_state=EncodeState0, stream_id=StreamID},
		StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body) ->
	ContentLength = integer_to_binary(iolist_size(Body)),
	Headers = lists:keystore(<<"content-length">>, 1, Headers0,
		{<<"content-length">>, ContentLength}),
	{HeaderBlock, EncodeState} = prepare_headers(EncodeState0,
		Transport, Method, Host, Port, Path, Headers),
	HeadersFrame = cow_http2:headers(StreamID, nofin, HeaderBlock),
	Transport:send(Socket, HeadersFrame),
	%% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE.
	%% Use the length set by the server instead, if any.
	%% @todo Would be better if we didn't have to convert to binary.
	BodyBin = iolist_to_binary(Body),
	send_data(Socket, Transport, StreamID, fin, BodyBin, 16384),
	%% Identifiers of the streams we create go up by 2 each time.
	NextState = State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState},
	new_stream(StreamID, StreamRef, ReplyTo, nofin, fin, NextState).
%% Build and HPACK-encode the header block of a request.
%%
%% The authority is the value of the host header when there is one,
%% and Host:Port otherwise. The host header and the connection-specific
%% headers are removed, then the request pseudo-headers are put in front.
%% Returns whatever cow_hpack:encode/2 returns.
prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) ->
	Authority = case lists:keyfind(<<"host">>, 1, Headers0) of
		{_, Host} -> Host;
		_ -> [Host0, $:, integer_to_binary(Port)]
	end,
	%% @todo We also must remove any header found in the connection header.
	Stripped = [
		<<"upgrade">>,
		<<"transfer-encoding">>,
		<<"proxy-connection">>,
		<<"keep-alive">>,
		<<"connection">>,
		<<"host">>
	],
	Headers1 = lists:foldl(fun(Name, Acc) ->
		lists:keydelete(Name, 1, Acc)
	end, Headers0, Stripped),
	Scheme = case Transport:secure() of
		true -> <<"https">>;
		false -> <<"http">>
	end,
	PseudoHeaders = [
		{<<":method">>, Method},
		{<<":scheme">>, Scheme},
		{<<":authority">>, Authority},
		{<<":path">>, Path}
	],
	cow_hpack:encode(PseudoHeaders ++ Headers1, EncodeState).
%% Send body data for a stream previously created by a request.
%%
%% An error is sent to ReplyTo when the stream cannot be found or
%% when we already finished sending data for it.
data(State=#http2_state{socket=Socket, transport=Transport},
		StreamRef, ReplyTo, IsFin, Data) ->
	case get_stream_by_ref(StreamRef, State) of
		false ->
			error_stream_not_found(State, StreamRef, ReplyTo);
		#stream{local=fin} ->
			error_stream_closed(State, StreamRef, ReplyTo);
		Stream = #stream{id=StreamID} ->
			%% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE.
			%% Use the length set by the server instead, if any.
			%% @todo Would be better if we didn't have to convert to binary.
			Payload = iolist_to_binary(Data),
			send_data(Socket, Transport, StreamID, IsFin, Payload, 16384),
			local_fin(Stream, State, IsFin)
	end.
%% Send the data in DATA frames carrying at most Length bytes each.
%% Only the last frame is sent with the given IsFin flag; all the
%% frames before it are sent with nofin.
%%
%% This same function is found in cowboy_http2.
send_data(Socket, Transport, StreamID, IsFin, Data, Length)
		when Length < byte_size(Data) ->
	<< Chunk:Length/binary, Remainder/bits >> = Data,
	Transport:send(Socket, cow_http2:data(StreamID, nofin, Chunk)),
	send_data(Socket, Transport, StreamID, IsFin, Remainder, Length);
send_data(Socket, Transport, StreamID, IsFin, Data, _Length) ->
	Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)).
%% Cancel a stream: a RST_STREAM frame with the cancel reason is sent
%% and the stream is forgotten. An error is sent to ReplyTo when the
%% stream cannot be found.
cancel(State=#http2_state{socket=Socket, transport=Transport},
		StreamRef, ReplyTo) ->
	case get_stream_by_ref(StreamRef, State) of
		false ->
			error_stream_not_found(State, StreamRef, ReplyTo);
		#stream{id=StreamID} ->
			RstStream = cow_http2:rst_stream(StreamID, cancel),
			Transport:send(Socket, RstStream),
			delete_stream(StreamID, State)
	end.
%% Return the references of all the streams tracked by the connection
%% as the first element of the tuple. The second element is always
%% an empty list for now.
%%
%% @todo Add unprocessed streams when GOAWAY handling is done.
down(#http2_state{streams=Streams}) ->
	Killed = [StreamRef || #stream{ref=StreamRef} <- Streams],
	Unprocessed = [],
	{Killed, Unprocessed}.
%% Terminate the connection because of an error that is not tied
%% to a stream we know about. Always returns close.
terminate(#http2_state{streams=Streams}, Reason) ->
	%% Because a particular stream is unknown,
	%% we're sending the error message to all streams.
	Recipients = [ReplyTo || #stream{reply_to=ReplyTo} <- Streams],
	Message = {gun_error, self(), Reason},
	lists:foreach(fun(ReplyTo) -> ReplyTo ! Message end, Recipients),
	%% @todo Send GOAWAY frame.
	%% @todo LastGoodStreamID
	close.
%% Terminate the connection because of an error related to a stream.
%%
%% Only the process the stream replies to is notified when the stream
%% is known. Otherwise this falls back to terminate/2, which notifies
%% all streams. Always returns close.
terminate(State, StreamID, Reason) ->
	case get_stream_by_id(StreamID, State) of
		#stream{reply_to=ReplyTo} ->
			Message = {gun_error, self(), Reason},
			ReplyTo ! Message,
			%% @todo Send GOAWAY frame.
			%% @todo LastGoodStreamID
			close;
		_ ->
			terminate(State, Reason)
	end.
%% Reset a stream: a RST_STREAM frame is always sent to the server.
%% When the stream is known it is also forgotten and the error is
%% sent to the process the stream replies to.
stream_reset(State=#http2_state{socket=Socket, transport=Transport, streams=Streams0},
		StreamID, StreamError={stream_error, Reason, _}) ->
	RstStream = cow_http2:rst_stream(StreamID, Reason),
	Transport:send(Socket, RstStream),
	case lists:keytake(StreamID, #stream.id, Streams0) of
		false ->
			%% @todo Unknown stream. Not sure what to do here. Check again once all
			%% terminate calls have been written.
			State;
		{value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} ->
			ReplyTo ! {gun_error, self(), StreamRef, StreamError},
			State#http2_state{streams=Streams}
	end.
%% Tell ReplyTo that the operation was refused because we already
%% finished sending data for the stream. The state is returned unchanged.
error_stream_closed(State, StreamRef, ReplyTo) ->
	Error = {badstate, "The stream has already been closed."},
	ReplyTo ! {gun_error, self(), StreamRef, Error},
	State.
%% Tell ReplyTo that the operation was refused because no stream
%% matches the reference. The state is returned unchanged.
error_stream_not_found(State, StreamRef, ReplyTo) ->
	Error = {badstate, "The stream cannot be found."},
	ReplyTo ! {gun_error, self(), StreamRef, Error},
	State.
%% Streams.

%% Start tracking a new stream. It is added in front of the list
%% of streams. Remote and Local tell whether the corresponding side
%% of the stream has already finished sending data.
%%
%% @todo probably change order of args and have state first?
new_stream(StreamID, StreamRef, ReplyTo, Remote, Local,
		State=#http2_state{streams=Streams}) ->
	Stream = #stream{
		id=StreamID,
		ref=StreamRef,
		reply_to=ReplyTo,
		remote=Remote,
		local=Local
	},
	State#http2_state{streams=[Stream|Streams]}.
%% Find a stream using its HTTP/2 stream identifier.
%% Returns the stream record, or false when there is none.
get_stream_by_id(StreamID, #http2_state{streams=Known}) ->
	IDField = #stream.id,
	lists:keyfind(StreamID, IDField, Known).
%% Find a stream using its reference.
%% Returns the stream record, or false when there is none.
get_stream_by_ref(StreamRef, #http2_state{streams=Known}) ->
	RefField = #stream.ref,
	lists:keyfind(StreamRef, RefField, Known).
%% Stop tracking the stream with the given identifier, if any.
delete_stream(StreamID, State=#http2_state{streams=Streams0}) ->
	State#http2_state{
		streams=lists:keydelete(StreamID, #stream.id, Streams0)
	}.
%% Update the stream after receiving from the server.
%%
%% The stream is complete, and can be forgotten, once both sides
%% have finished sending data.
remote_fin(#stream{id=StreamID, local=fin}, State, fin) ->
	delete_stream(StreamID, State);
%% We always replace the stream in the state because
%% the content handler state has changed.
remote_fin(Stream=#stream{id=StreamID}, State=#http2_state{streams=Streams0}, IsFin) ->
	Updated = Stream#stream{remote=IsFin},
	Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Updated),
	State#http2_state{streams=Streams}.
%% Update the stream after sending data to the server.
%%
%% Nothing changes as long as we have more data to send.
local_fin(_, State, nofin) ->
	State;
%% The stream is complete, and can be forgotten, once both sides
%% have finished sending data.
local_fin(#stream{id=StreamID, remote=fin}, State, fin) ->
	delete_stream(StreamID, State);
%% Otherwise we record that we finished sending data.
local_fin(Stream=#stream{id=StreamID}, State=#http2_state{streams=Streams0}, IsFin) ->
	Updated = Stream#stream{local=IsFin},
	Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Updated),
	State#http2_state{streams=Streams}.