From c7138443995ebd56f061b85e5ee0aebb5c04a00e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Sat, 13 Jul 2019 17:55:20 +0200 Subject: Add ws_upgrade/protocol_changed events And ensure that Websocket triggers all the request/response events. --- src/gun.erl | 46 ++++++++------ src/gun_default_event_h.erl | 8 +++ src/gun_event.erl | 37 +++++++++-- src/gun_http.erl | 145 ++++++++++++++++++++++---------------------- 4 files changed, 141 insertions(+), 95 deletions(-) (limited to 'src') diff --git a/src/gun.erl b/src/gun.erl index a15ca5b..803a5eb 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -905,25 +905,25 @@ connected(cast, {cancel, ReplyTo, StreamRef}, %% Public Websocket interface. %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. %% An interface would also make sure that HTTP/1.0 can't upgrade. -connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers}, - State=#state{owner=Owner, origin_host=Host, origin_port=Port, opts=Opts, - protocol=Protocol, protocol_state=ProtoState}) - when Protocol =:= gun_http -> +connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers}, State=#state{opts=Opts}) -> WsOpts = maps:get(ws_opts, Opts, #{}), - ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts), - {keep_state, State#state{protocol_state=ProtoState2}}; + connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts}, State); connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts}, State=#state{owner=Owner, origin_host=Host, origin_port=Port, - protocol=Protocol, protocol_state=ProtoState}) + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) when Protocol =:= gun_http -> - ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts), - {keep_state, State#state{protocol_state=ProtoState2}}; - %% @todo can fail if http/1.0 -%% @todo Probably don't error out here, have a protocol function/command. -connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _}, _) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, - "Websocket is only supported over HTTP/1.1."}}, - keep_state_and_data; + EvHandlerState1 = EvHandler:ws_upgrade(#{ + stream_ref => StreamRef, + reply_to => Owner, %% Only the owner can upgrade the connection at this time. + opts => WsOpts + }, EvHandlerState0), + %% @todo Can fail if HTTP/1.0. + {ProtoState2, EvHandlerState} = Protocol:ws_upgrade(ProtoState, + StreamRef, Host, Port, Path, Headers, WsOpts, + EvHandler, EvHandlerState1), + {keep_state, State#state{protocol_state=ProtoState2, + event_handler_state=EvHandlerState}}; connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "Websocket is only supported over HTTP/1.1."}}, @@ -1004,14 +1004,20 @@ commands([{switch_transport, Transport, Socket}|Tail], State) -> commands(Tail, active(State#state{socket=Socket, transport=Transport, messages=Transport:messages()})); %% @todo The two loops should be reunified and this clause generalized. -commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State) -> - {keep_state, keepalive_cancel(State#state{protocol=Protocol, protocol_state=ProtoState})}; +commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State=#state{ + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), + {keep_state, keepalive_cancel(State#state{protocol=Protocol, protocol_state=ProtoState, + event_handler_state=EvHandlerState})}; %% @todo And this state should probably not be ignored. -commands([{switch_protocol, Protocol, _ProtoState0}|Tail], - State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport}) -> +commands([{switch_protocol, Protocol, _ProtoState0}|Tail], State=#state{ + owner=Owner, opts=Opts, socket=Socket, transport=Transport, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> ProtoOpts = maps:get(http2_opts, Opts, #{}), ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts), - commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState})). + EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0), + commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState, + event_handler_state=EvHandlerState})). disconnect(State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport, diff --git a/src/gun_default_event_h.erl b/src/gun_default_event_h.erl index ff6f951..6ef2e11 100644 --- a/src/gun_default_event_h.erl +++ b/src/gun_default_event_h.erl @@ -26,6 +26,8 @@ -export([response_headers/2]). -export([response_trailers/2]). -export([response_end/2]). +-export([ws_upgrade/2]). +-export([protocol_changed/2]). -export([disconnect/2]). -export([terminate/2]). @@ -62,6 +64,12 @@ response_trailers(_EventData, State) -> response_end(_EventData, State) -> State. +ws_upgrade(_EventData, State) -> + State. + +protocol_changed(_EventData, State) -> + State. + disconnect(_EventData, State) -> State. diff --git a/src/gun_event.erl b/src/gun_event.erl index 72abe24..9e83cf8 100644 --- a/src/gun_event.erl +++ b/src/gun_event.erl @@ -48,7 +48,7 @@ -type request_start_event() :: #{ stream_ref := reference(), reply_to := pid(), - function := headers | request, + function := headers | request | ws_upgrade, method := iodata(), scheme => binary(), authority := iodata(), @@ -108,6 +108,38 @@ -callback response_end(response_end_event(), State) -> State. +%% ws_upgrade. +%% +%% This event is a signal that the following request and response +%% result from a gun:ws_upgrade/2,3,4 call. +%% +%% There is no corresponding "end" event. Instead, the success is +%% indicated by a protocol_changed event following the informational +%% response. + +-type ws_upgrade_event() :: #{ + stream_ref := reference(), + reply_to := pid(), + opts := gun:ws_opts() +}. + +-callback ws_upgrade(ws_upgrade_event(), State) -> State. + +%% protocol_changed. +%% +%% This event can occur either following a successful ws_upgrade +%% event or following a successful CONNECT request. +%% +%% @todo Currently there is only a connection-wide variant of this +%% event. In the future there will be a stream-wide variant to +%% support CONNECT and Websocket over HTTP/2. + +-type protocol_changed_event() :: #{ + protocol := http2 | ws +}. + +-callback protocol_changed(protocol_changed_event(), State) -> State. + %% disconnect. -type disconnect_event() :: #{ @@ -131,13 +163,10 @@ %% @todo tls_handshake_end %% @todo origin_changed %% @todo transport_changed -%% @todo protocol_changed %% @todo push_promise_start %% @todo push_promise_end %% @todo cancel_start %% @todo cancel_end -%% @todo ws_upgrade_start -%% @todo ws_upgrade_end %% @todo ws_frame_read_start %% @todo ws_frame_read_header %% @todo ws_frame_read_end diff --git a/src/gun_http.erl b/src/gun_http.erl index 4acc3c4..7598eff 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -27,7 +27,7 @@ -export([cancel/3]). -export([stream_info/2]). -export([down/1]). --export([ws_upgrade/7]). +-export([ws_upgrade/9]). %% Functions shared with gun_http2. -export([host_header/3]). @@ -107,7 +107,7 @@ handle(Data, State=#http_state{in=head, buffer=Buffer, EvHandlerState = case Buffer of <<>> -> EvHandler:response_start(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(StreamRef), reply_to => ReplyTo }, EvHandlerState0); _ -> @@ -255,9 +255,14 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport, {Headers, Rest2} = cow_http:parse_headers(Rest), case {Status, StreamRef} of {101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} -> - %% @todo Websocket's 101 response_inform. + EvHandlerState = EvHandler:response_inform(#{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + status => 101, + headers => Headers + }, EvHandlerState0), {ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts), - EvHandlerState0}; + EvHandlerState}; %% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup. {_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 -> case IsAlive of @@ -424,76 +429,75 @@ keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) -> keepalive(State) -> State. -headers(State=#http_state{socket=Socket, transport=Transport, version=Version, out=head}, +headers(State=#http_state{out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, EvHandler, EvHandlerState0) -> - Authority0 = host_header(Transport, Host, Port), - Headers2 = lists:keydelete(<<"transfer-encoding">>, 1, Headers), - %% We use Headers2 because this is the smallest list. - Conn = conn_from_headers(Version, Headers2), - Out = request_io_from_headers(Headers2), - {Authority, Headers3} = case lists:keyfind(<<"host">>, 1, Headers2) of - false -> {Authority0, [{<<"host">>, Authority0}|Headers2]}; - {_, Authority1} -> {Authority1, Headers2} - end, - Headers4 = case Out of - body_chunked when Version =:= 'HTTP/1.0' -> Headers3; - body_chunked -> [{<<"transfer-encoding">>, <<"chunked">>}|Headers3]; - _ -> Headers3 - end, - Headers5 = transform_header_names(State, Headers4), - RequestEvent = #{ - stream_ref => StreamRef, - reply_to => ReplyTo, - function => ?FUNCTION_NAME, - method => Method, - authority => Authority, - path => Path, - headers => Headers5 - }, - EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), - Transport:send(Socket, cow_http:request(Method, Path, Version, Headers5)), - EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), + {Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo, + Method, Host, Port, Path, Headers, undefined, + EvHandler, EvHandlerState0, ?FUNCTION_NAME), {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method), EvHandlerState}. -request(State=#http_state{socket=Socket, transport=Transport, version=Version, out=head}, - StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, - EvHandler, EvHandlerState0) -> - Authority0 = host_header(Transport, Host, Port), - Headers2 = lists:keydelete(<<"content-length">>, 1, - lists:keydelete(<<"transfer-encoding">>, 1, Headers)), +request(State=#http_state{out=head}, StreamRef, ReplyTo, + Method, Host, Port, Path, Headers, Body, EvHandler, EvHandlerState0) -> + {Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo, + Method, Host, Port, Path, Headers, Body, + EvHandler, EvHandlerState0, ?FUNCTION_NAME), + {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method), + EvHandlerState}. + +send_request(State=#http_state{socket=Socket, transport=Transport, version=Version}, + StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body, + EvHandler, EvHandlerState0, Function) -> + Headers1 = lists:keydelete(<<"transfer-encoding">>, 1, Headers0), + Headers2 = case Body of + undefined -> Headers1; + _ -> lists:keydelete(<<"content-length">>, 1, Headers1) + end, %% We use Headers2 because this is the smallest list. Conn = conn_from_headers(Version, Headers2), + Out = case Body of + undefined when Function =:= ws_upgrade -> head; + undefined -> request_io_from_headers(Headers2); + _ -> head + end, + Authority0 = host_header(Transport, Host, Port), {Authority, Headers3} = case lists:keyfind(<<"host">>, 1, Headers2) of false -> {Authority0, [{<<"host">>, Authority0}|Headers2]}; {_, Authority1} -> {Authority1, Headers2} end, Headers4 = transform_header_names(State, Headers3), - Headers5 = [ - {<<"content-length">>, integer_to_binary(iolist_size(Body))} - |Headers4], + Headers = case {Body, Out} of + {undefined, body_chunked} when Version =:= 'HTTP/1.0' -> Headers4; + {undefined, body_chunked} -> [{<<"transfer-encoding">>, <<"chunked">>}|Headers4]; + {undefined, _} -> Headers4; + _ -> [{<<"content-length">>, integer_to_binary(iolist_size(Body))}|Headers4] + end, RequestEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo, - function => ?FUNCTION_NAME, + function => Function, method => Method, authority => Authority, path => Path, - headers => Headers5 + headers => Headers }, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), Transport:send(Socket, [ - cow_http:request(Method, Path, Version, Headers5), - Body]), + cow_http:request(Method, Path, Version, Headers), + [Body || Body =/= undefined]]), EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), - RequestEndEvent = #{ - stream_ref => StreamRef, - reply_to => ReplyTo - }, - EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), - {new_stream(State#http_state{connection=Conn}, StreamRef, ReplyTo, Method), - EvHandlerState}. + EvHandlerState = case Out of + head -> + RequestEndEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, + EvHandler:request_end(RequestEndEvent, EvHandlerState2); + _ -> + EvHandlerState2 + end, + {Conn, Out, EvHandlerState}. host_header(Transport, Host0, Port) -> Host = case Host0 of @@ -652,17 +656,16 @@ conn_from_headers(Version, Headers) -> false -> keepalive; {_, ConnHd} -> - ConnList = cow_http_hd:parse_connection(ConnHd), - case lists:member(<<"keep-alive">>, ConnList) of - true -> keepalive; - false -> close - end + conn_from_header(cow_http_hd:parse_connection(ConnHd)) end. +conn_from_header([]) -> close; +conn_from_header([<<"keep-alive">>|_]) -> keepalive; +conn_from_header([<<"upgrade">>|_]) -> keepalive; +conn_from_header([_|Tail]) -> conn_from_header(Tail). + request_io_from_headers(Headers) -> case lists:keyfind(<<"content-length">>, 1, Headers) of - {_, <<"0">>} -> - head; {_, Length} -> {body, cow_http_hd:parse_content_length(Length)}; _ -> @@ -716,10 +719,11 @@ end_stream(State=#http_state{streams=[_|Tail]}) -> %% Websocket upgrade. %% Ensure version is 1.1. -ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) -> - error; %% @todo -ws_upgrade(State=#http_state{socket=Socket, transport=Transport, owner=Owner, out=head}, - StreamRef, Host, Port, Path, Headers0, WsOpts) -> +ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _, _, _) -> + error; %% @todo Probably don't error out here, have a protocol function/command. +ws_upgrade(State=#http_state{owner=ReplyTo, out=head}, + StreamRef, Host, Port, Path, Headers0, WsOpts, + EvHandler, EvHandlerState0) -> {Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of true -> {[{<<"sec-websocket-extensions">>, <<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>} @@ -734,20 +738,19 @@ ws_upgrade(State=#http_state{socket=Socket, transport=Transport, owner=Owner, ou [{<<"sec-websocket-protocol">>, Proto}|Headers1] end, Key = cow_ws:key(), - Headers3 = [ + Headers = [ {<<"connection">>, <<"upgrade">>}, {<<"upgrade">>, <<"websocket">>}, {<<"sec-websocket-version">>, <<"13">>}, {<<"sec-websocket-key">>, Key} |Headers2 ], - Headers = case lists:keymember(<<"host">>, 1, Headers0) of - true -> Headers3; - false -> [{<<"host">>, host_header(Transport, Host, Port)}|Headers3] - end, - Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers)), - new_stream(State#http_state{connection=keepalive, out=head}, - {websocket, StreamRef, Key, GunExtensions, WsOpts}, Owner, <<"GET">>). + {Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo, + <<"GET">>, Host, Port, Path, Headers, undefined, + EvHandler, EvHandlerState0, ?FUNCTION_NAME), + {new_stream(State#http_state{connection=Conn, out=Out}, + {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>), + EvHandlerState}. ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) -> %% @todo check upgrade, connection -- cgit v1.2.3