From 23fc8e2a89058d2f96ff64c0bc0dff637662995c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 27 May 2016 12:45:55 +0200 Subject: Initial HTTP/2 support A number of things are not implemented, but this is enough to connect to Twitter and get a response sent back. It also currently doesn't have tests. --- Makefile | 2 +- src/gun.erl | 38 ++++-- src/gun_http2.erl | 337 +++++++++++++++++++++++++++++++++++++++++++++++++ test/twitter_SUITE.erl | 17 ++- 4 files changed, 383 insertions(+), 11 deletions(-) create mode 100644 src/gun_http2.erl diff --git a/Makefile b/Makefile index 280a8b0..208e298 100644 --- a/Makefile +++ b/Makefile @@ -11,7 +11,7 @@ CI_OTP = OTP-18.0.2 # Dependencies. DEPS = cowlib ranch -dep_cowlib = git https://github.com/ninenines/cowlib 1.3.0 +dep_cowlib = git https://github.com/ninenines/cowlib master dep_ranch = git https://github.com/ninenines/ranch 1.1.0 TEST_DEPS = ct_helper diff --git a/src/gun.erl b/src/gun.erl index 2c9b586..82f658d 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -87,6 +87,9 @@ -type http_opts() :: map(). -export_type([http_opts/0]). +-type http2_opts() :: map(). +-export_type([http2_opts/0]). + -type spdy_opts() :: map(). -export_type([spdy_opts/0]). @@ -138,11 +141,18 @@ check_options([{http_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) -> Error -> Error end; +check_options([{http2_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) -> + case gun_http2:check_options(ProtoOpts) of + ok -> + check_options(Opts); + Error -> + Error + end; check_options([Opt = {protocols, L}|Opts]) when is_list(L) -> Len = length(L), case length(lists:usort(L)) of Len when Len > 0 -> - Check = lists:usort([(P =:= http) orelse (P =:= spdy) || P <- L]), + Check = lists:usort([(P =:= http) orelse (P =:= http2) orelse (P =:= spdy) || P <- L]), case Check of [true] -> check_options(Opts); @@ -184,6 +194,7 @@ consider_tracing(ServerPid, #{trace := true}) -> dbg:tracer(), dbg:tpl(gun, [{'_', [], [{return_trace}]}]), dbg:tpl(gun_http, [{'_', [], [{return_trace}]}]), + dbg:tpl(gun_http2, [{'_', [], [{return_trace}]}]), dbg:tpl(gun_spdy, [{'_', [], [{return_trace}]}]), dbg:tpl(gun_ws, [{'_', [], [{return_trace}]}]), dbg:p(ServerPid, all); @@ -344,14 +355,14 @@ await_body(ServerPid, StreamRef, Timeout, MRef, Acc) -> {error, timeout} end. --spec await_up(pid()) -> {ok, http | spdy} | {error, atom()}. +-spec await_up(pid()) -> {ok, http | http2 | spdy} | {error, atom()}. await_up(ServerPid) -> MRef = monitor(process, ServerPid), Res = await_up(ServerPid, 5000, MRef), demonitor(MRef, [flush]), Res. --spec await_up(pid(), reference() | timeout()) -> {ok, http | spdy} | {error, atom()}. +-spec await_up(pid(), reference() | timeout()) -> {ok, http | http2 | spdy} | {error, atom()}. await_up(ServerPid, MRef) when is_reference(MRef) -> await_up(ServerPid, 5000, MRef); await_up(ServerPid, Timeout) -> @@ -360,7 +371,7 @@ await_up(ServerPid, Timeout) -> demonitor(MRef, [flush]), Res. --spec await_up(pid(), timeout(), reference()) -> {ok, http | spdy} | {error, atom()}. +-spec await_up(pid(), timeout(), reference()) -> {ok, http | http2 | spdy} | {error, atom()}. await_up(ServerPid, Timeout, MRef) -> receive {gun_up, ServerPid, Protocol} -> @@ -424,6 +435,9 @@ cancel(ServerPid, StreamRef) -> _ = ServerPid ! {cancel, self(), StreamRef}, ok. +%% @todo Allow upgrading an HTTP/1.1 connection to HTTP/2. +%% http2_upgrade + %% Websocket. -spec ws_upgrade(pid(), iodata()) -> reference(). @@ -476,14 +490,17 @@ default_transport(_) -> tcp. connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=ranch_ssl}, Retries) -> Protocols = lists:flatten([case P of http -> <<"http/1.1">>; + http2 -> <<"h2">>; spdy -> [<<"spdy/3.1">>, <<"spdy/3">>] - end || P <- maps:get(protocols, Opts, [spdy, http])]), + end || P <- maps:get(protocols, Opts, [http2, spdy, http])]), TransportOpts = [binary, {active, false}, + {alpn_advertised_protocols, Protocols}, {client_preferred_next_protocols, {client, Protocols, <<"http/1.1">>}} |maps:get(transport_opts, Opts, [])], case Transport:connect(Host, Port, TransportOpts) of {ok, Socket} -> {Protocol, ProtoOptsKey} = case ssl:negotiated_protocol(Socket) of + {ok, <<"h2">>} -> {gun_http2, http2_opts}; {ok, <<"spdy/3", _/bits>>} -> {gun_spdy, spdy_opts}; _ -> {gun_http, http_opts} end, @@ -498,6 +515,7 @@ connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retr {ok, Socket} -> {Protocol, ProtoOptsKey} = case maps:get(protocols, Opts, [http]) of [http] -> {gun_http, http_opts}; + [http2] -> {gun_http2, http2_opts}; [spdy] -> {gun_spdy, spdy_opts} end, up(State, Socket, Protocol, ProtoOptsKey); @@ -546,8 +564,10 @@ retry_loop(State=#state{parent=Parent, opts=Opts}, Retries) -> end. before_loop(State=#state{opts=Opts, protocol=Protocol}) -> + %% @todo Might not be worth checking every time? ProtoOptsKey = case Protocol of gun_http -> http_opts; + gun_http2 -> http2_opts; gun_spdy -> spdy_opts end, ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}), @@ -567,7 +587,7 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port, opts=Opts, down(State, normal); {upgrade, Protocol2, ProtoState2} -> ws_loop(State#state{protocol=Protocol2, protocol_state=ProtoState2}); - ProtoState2 -> + ProtoState2 -> loop(State#state{protocol_state=ProtoState2}) end; {Closed, Socket} -> @@ -602,11 +622,13 @@ loop(State=#state{parent=Parent, owner=Owner, host=Host, port=Port, opts=Opts, {cancel, Owner, StreamRef} -> ProtoState2 = Protocol:cancel(ProtoState, StreamRef), loop(State#state{protocol_state=ProtoState2}); - {ws_upgrade, Owner, StreamRef, Path, Headers} when Protocol =/= gun_spdy -> + %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. + %% An interface would also make sure that HTTP/1.0 can't upgrade. + {ws_upgrade, Owner, StreamRef, Path, Headers} when Protocol =:= gun_http -> WsOpts = maps:get(ws_opts, Opts, #{}), ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts), loop(State#state{protocol_state=ProtoState2}); - {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts} when Protocol =/= gun_spdy -> + {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts} when Protocol =:= gun_http -> ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts), loop(State#state{protocol_state=ProtoState2}); %% @todo can fail if http/1.0 diff --git a/src/gun_http2.erl b/src/gun_http2.erl new file mode 100644 index 0000000..5c82756 --- /dev/null +++ b/src/gun_http2.erl @@ -0,0 +1,337 @@ +%% Copyright (c) 2016, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(gun_http2). + +-export([check_options/1]). +-export([name/0]). +-export([init/4]). +-export([handle/2]). +-export([close/1]). +-export([keepalive/1]). +-export([request/7]). +-export([request/8]). +-export([data/4]). +-export([cancel/2]). +-export([down/1]). + +-record(stream, { + id :: non_neg_integer(), + ref :: reference(), + %% Whether we finished sending data. + local = nofin :: cowboy_stream:fin(), + %% Whether we finished receiving data. + remote = nofin :: cowboy_stream:fin() +}). + +-record(http2_state, { + owner :: pid(), + socket :: inet:socket() | ssl:sslsocket(), + transport :: module(), + buffer = <<>> :: binary(), + + %% @todo local_settings, next_settings, remote_settings + + streams = [] :: [#stream{}], + stream_id = 1 :: non_neg_integer(), + + %% HPACK decoding and encoding state. + decode_state = cow_hpack:init() :: cow_hpack:state(), + encode_state = cow_hpack:init() :: cow_hpack:state() +}). + +check_options(Opts) -> + do_check_options(maps:to_list(Opts)). + +do_check_options([]) -> + ok; +do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> + do_check_options(Opts); +do_check_options([Opt|_]) -> + {error, {options, {http2, Opt}}}. + +name() -> http2. + +init(Owner, Socket, Transport, _Opts) -> + %% Send the HTTP/2 preface. + Transport:send(Socket, [ + << "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>, + cow_http2:settings(#{}) %% @todo Settings. + ]), + #http2_state{owner=Owner, socket=Socket, transport=Transport}. + +handle(Data, State=#http2_state{buffer=Buffer}) -> + parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}). + +parse(Data0, State=#http2_state{buffer=Buffer}) -> + %% @todo Parse states: Preface. Continuation. + Data = << Buffer/binary, Data0/binary >>, + case cow_http2:parse(Data) of + {ok, Frame, Rest} -> + parse(Rest, frame(Frame, State)); + {stream_error, StreamID, Reason, Human, Rest} -> + parse(Rest, stream_reset(State, StreamID, {stream_error, Reason, Human})); + Error = {connection_error, _, _} -> + terminate(State, Error); + more -> + State#http2_state{buffer=Data} + end. + +%% DATA frame. +frame({data, StreamID, IsFin, Data}, State=#http2_state{owner=Owner}) -> + case get_stream_by_id(StreamID, State) of + Stream = #stream{ref=StreamRef, remote=nofin} -> + Owner ! {gun_data, self(), StreamRef, IsFin, Data}, + remote_fin(Stream, State, IsFin); + _ -> + %% @todo protocol_error if not existing + stream_reset(State, StreamID, {stream_error, stream_closed, + 'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'}) + end; +%% Single HEADERS frame headers block. +frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, + State=#http2_state{owner=Owner, decode_state=DecodeState0}) -> + case get_stream_by_id(StreamID, State) of + Stream = #stream{ref=StreamRef, remote=nofin} -> + try cow_hpack:decode(HeaderBlock, DecodeState0) of + {Headers0, DecodeState} -> + case lists:keytake(<<":status">>, 1, Headers0) of + {value, {_, Status}, Headers} -> + Owner ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers}, + remote_fin(Stream, State#http2_state{decode_state=DecodeState}, IsFin); + false -> + stream_reset(State, StreamID, {stream_error, protocol_error, + 'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'}) + end + catch _:_ -> + terminate(State, {connection_error, compression_error, + 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) + end; + _ -> + stream_reset(State, StreamID, {stream_error, stream_closed, + 'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'}) + end; +%% @todo HEADERS frame starting a headers block. Enter continuation mode. +%frame(State, {headers, StreamID, IsFin, head_nofin, HeaderBlockFragment}) -> +% State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}}; +%% @todo Single HEADERS frame headers block with priority. +%frame(State, {headers, StreamID, IsFin, head_fin, +% _IsExclusive, _DepStreamID, _Weight, HeaderBlock}) -> +% %% @todo Handle priority. +% stream_init(State, StreamID, IsFin, HeaderBlock); +%% @todo HEADERS frame starting a headers block. Enter continuation mode. +%frame(State, {headers, StreamID, IsFin, head_nofin, +% _IsExclusive, _DepStreamID, _Weight, HeaderBlockFragment}) -> +% %% @todo Handle priority. +% State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}}; +%% @todo PRIORITY frame. +%frame(State, {priority, _StreamID, _IsExclusive, _DepStreamID, _Weight}) -> +% %% @todo Validate StreamID? +% %% @todo Handle priority. +% State; +%% @todo RST_STREAM frame. +%frame(State, {rst_stream, StreamID, Reason}) -> +% stream_reset(State, StreamID, {stream_error, Reason, 'Stream reset requested by client.'}); +%% SETTINGS frame. +frame({settings, _Settings}, State=#http2_state{socket=Socket, transport=Transport}) -> + %% @todo Apply SETTINGS. + Transport:send(Socket, cow_http2:settings_ack()), + State; +%% Ack for a previously sent SETTINGS frame. +frame(settings_ack, State) -> %% @todo =#http2_state{next_settings=_NextSettings}) -> + %% @todo Apply SETTINGS that require synchronization. + State; +%% PUSH_PROMISE frame. +%% @todo Continuation. +%frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock}, State) -> +% %% @todo +% State; +%% PING frame. +frame({ping, Opaque}, State=#http2_state{socket=Socket, transport=Transport}) -> + Transport:send(Socket, cow_http2:ping_ack(Opaque)), + State; +%% Ack for a previously sent PING frame. +%% +%% @todo Might want to check contents but probably a waste of time. +frame({ping_ack, _Opaque}, State) -> + State; +%% GOAWAY frame. +frame(Frame={goaway, _, _, _}, State) -> + terminate(State, {stop, Frame, 'Client is going away.'}); +%% Connection-wide WINDOW_UPDATE frame. +frame({window_update, _Increment}, State) -> + %% @todo control flow + State; +%% Stream-specific WINDOW_UPDATE frame. +frame({window_update, _StreamID, _Increment}, State) -> + %% @todo stream-specific control flow + State; +%% Unexpected CONTINUATION frame. +frame({continuation, _, _, _}, State) -> + terminate(State, {connection_error, protocol_error, + 'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}). + +parse_status(Status) -> + << Code:3/binary, _/bits >> = Status, + list_to_integer(binary_to_list(Code)). + +close(#http2_state{owner=Owner, streams=Streams}) -> + close_streams(Owner, Streams). + +close_streams(_, []) -> + ok; +close_streams(Owner, [#stream{ref=StreamRef}|Tail]) -> + Owner ! {gun_error, self(), StreamRef, {closed, + "The connection was lost."}}, + close_streams(Owner, Tail). + +keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> + Transport:send(Socket, cow_http2:ping(<< 0:64 >>)), + State. + +%% @todo Shouldn't always be HTTPS scheme. We need to properly keep track of it. +request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, + stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers) -> + {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Method, Host, Port, Path, Headers), + IsFin = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers)) + orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers)) of + true -> nofin; + false -> fin + end, + Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), + new_stream(StreamID, StreamRef, nofin, IsFin, + State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}). + +%% @todo Handle Body > 16MB. (split it out into many frames) +%% @todo Shouldn't always be HTTPS scheme. We need to properly keep track of it. +request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, + stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers0, Body) -> + Headers = lists:keystore(<<"content-length">>, 1, Headers0, + {<<"content-length">>, integer_to_binary(iolist_size(Body))}), + {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Method, Host, Port, Path, Headers), + Transport:send(Socket, [ + cow_http2:headers(StreamID, nofin, HeaderBlock), + cow_http2:data(StreamID, fin, Body) + ]), + new_stream(StreamID, StreamRef, nofin, fin, + State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}). + +prepare_headers(EncodeState, Method, Host, Port, Path, Headers0) -> + %% @todo We also must remove any header found in the connection header. + Headers1 = + lists:keydelete(<<"host">>, 1, + lists:keydelete(<<"connection">>, 1, + lists:keydelete(<<"keep-alive">>, 1, + lists:keydelete(<<"proxy-connection">>, 1, + lists:keydelete(<<"transfer-encoding">>, 1, + lists:keydelete(<<"upgrade">>, 1, Headers0)))))), + Headers = [ + {<<":method">>, Method}, + {<<":scheme">>, <<"https">>}, + {<<":authority">>, [Host, $:, integer_to_binary(Port)]}, + {<<":path">>, Path} + |Headers1], + cow_hpack:encode(Headers, EncodeState). + +data(State=#http2_state{socket=Socket, transport=Transport}, + StreamRef, IsFin, Data) -> + case get_stream_by_ref(StreamRef, State) of + #stream{local=fin} -> + error_stream_closed(State, StreamRef); + S = #stream{} -> + Transport:send(Socket, cow_spdy:data(S#stream.id, IsFin, Data)), + local_fin(S, State, IsFin); + false -> + error_stream_not_found(State, StreamRef) + end. + +cancel(State=#http2_state{socket=Socket, transport=Transport}, + StreamRef) -> + case get_stream_by_ref(StreamRef, State) of + #stream{id=StreamID} -> + Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), + delete_stream(StreamID, State); + false -> + error_stream_not_found(State, StreamRef) + end. + +%% @todo Add unprocessed streams when GOAWAY handling is done. +down(#http2_state{streams=Streams}) -> + KilledStreams = [Ref || #stream{ref=Ref} <- Streams], + {KilledStreams, []}. + +terminate(#http2_state{owner=Owner}, Reason) -> + Owner ! {gun_error, self(), Reason}, + %% @todo Send GOAWAY frame. + %% @todo LastGoodStreamID + close. + +stream_reset(State=#http2_state{owner=Owner, socket=Socket, transport=Transport, + streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) -> + Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), + case lists:keytake(StreamID, #stream.id, Streams0) of + {value, #stream{ref=StreamRef}, Streams} -> + Owner ! {gun_error, self(), StreamRef, StreamError}, + State#http2_state{streams=Streams}; + false -> + %% @todo Unknown stream. Not sure what to do here. Check again once all + %% terminate calls have been written. + State + end. + +error_stream_closed(State=#http2_state{owner=Owner}, StreamRef) -> + Owner ! {gun_error, self(), StreamRef, {badstate, + "The stream has already been closed."}}, + State. + +error_stream_not_found(State=#http2_state{owner=Owner}, StreamRef) -> + Owner ! {gun_error, self(), StreamRef, {badstate, + "The stream cannot be found."}}, + State. + +%% Streams. +%% @todo probably change order of args and have state first? + +new_stream(StreamID, StreamRef, Remote, Local, + State=#http2_state{streams=Streams}) -> + New = #stream{id=StreamID, ref=StreamRef, remote=Remote, local=Local}, + State#http2_state{streams=[New|Streams]}. + +get_stream_by_id(StreamID, #http2_state{streams=Streams}) -> + lists:keyfind(StreamID, #stream.id, Streams). + +get_stream_by_ref(StreamRef, #http2_state{streams=Streams}) -> + lists:keyfind(StreamRef, #stream.ref, Streams). + +delete_stream(StreamID, State=#http2_state{streams=Streams}) -> + Streams2 = lists:keydelete(StreamID, #stream.id, Streams), + State#http2_state{streams=Streams2}. + +remote_fin(_, State, nofin) -> + State; +remote_fin(S=#stream{local=fin}, State, fin) -> + delete_stream(S#stream.id, State); +remote_fin(S, State=#http2_state{streams=Streams}, IsFin) -> + Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams, + S#stream{remote=IsFin}), + State#http2_state{streams=Streams2}. + +local_fin(_, State, nofin) -> + State; +local_fin(S=#stream{remote=fin}, State, fin) -> + delete_stream(S#stream.id, State); +local_fin(S, State=#http2_state{streams=Streams}, IsFin) -> + Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams, + S#stream{local=IsFin}), + State#http2_state{streams=Streams2}. diff --git a/test/twitter_SUITE.erl b/test/twitter_SUITE.erl index 86f9a85..6cd53c4 100644 --- a/test/twitter_SUITE.erl +++ b/test/twitter_SUITE.erl @@ -16,11 +16,24 @@ -compile(export_all). all() -> - [spdy]. + [http, http2, spdy]. + +http(_) -> + {ok, Pid} = gun:open("twitter.com", 443, #{protocols => [http]}), + {ok, http} = gun:await_up(Pid), + common(Pid). + +http2(_) -> + {ok, Pid} = gun:open("twitter.com", 443, #{protocols => [http2]}), + {ok, http2} = gun:await_up(Pid), + common(Pid). spdy(_) -> - {ok, Pid} = gun:open("twitter.com", 443), + {ok, Pid} = gun:open("twitter.com", 443, #{protocols => [spdy]}), {ok, spdy} = gun:await_up(Pid), + common(Pid). + +common(Pid) -> Ref = gun:get(Pid, "/"), receive {gun_response, Pid, Ref, nofin, Status, Headers} -> -- cgit v1.2.3