aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-07-13 17:55:20 +0200
committerLoïc Hoguin <[email protected]>2019-07-13 17:55:20 +0200
commitc7138443995ebd56f061b85e5ee0aebb5c04a00e (patch)
tree823defd10c1e09f30af3f954296cf9b73dfe4b8d /src
parent071599cbcd25cd2669e26d23a6e202e0275f191a (diff)
downloadgun-c7138443995ebd56f061b85e5ee0aebb5c04a00e.tar.gz
gun-c7138443995ebd56f061b85e5ee0aebb5c04a00e.tar.bz2
gun-c7138443995ebd56f061b85e5ee0aebb5c04a00e.zip
Add ws_upgrade/protocol_changed events
And ensure that Websocket triggers all the request/response events.
Diffstat (limited to 'src')
-rw-r--r--src/gun.erl46
-rw-r--r--src/gun_default_event_h.erl8
-rw-r--r--src/gun_event.erl37
-rw-r--r--src/gun_http.erl145
4 files changed, 141 insertions, 95 deletions
diff --git a/src/gun.erl b/src/gun.erl
index a15ca5b..803a5eb 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -905,25 +905,25 @@ connected(cast, {cancel, ReplyTo, StreamRef},
%% Public Websocket interface.
%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
%% An interface would also make sure that HTTP/1.0 can't upgrade.
-connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers},
- State=#state{owner=Owner, origin_host=Host, origin_port=Port, opts=Opts,
- protocol=Protocol, protocol_state=ProtoState})
- when Protocol =:= gun_http ->
+connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers}, State=#state{opts=Opts}) ->
WsOpts = maps:get(ws_opts, Opts, #{}),
- ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts),
- {keep_state, State#state{protocol_state=ProtoState2}};
+ connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts}, State);
connected(cast, {ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts},
State=#state{owner=Owner, origin_host=Host, origin_port=Port,
- protocol=Protocol, protocol_state=ProtoState})
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0})
when Protocol =:= gun_http ->
- ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts),
- {keep_state, State#state{protocol_state=ProtoState2}};
- %% @todo can fail if http/1.0
-%% @todo Probably don't error out here, have a protocol function/command.
-connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _}, _) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
- "Websocket is only supported over HTTP/1.1."}},
- keep_state_and_data;
+ EvHandlerState1 = EvHandler:ws_upgrade(#{
+ stream_ref => StreamRef,
+ reply_to => Owner, %% Only the owner can upgrade the connection at this time.
+ opts => WsOpts
+ }, EvHandlerState0),
+ %% @todo Can fail if HTTP/1.0.
+ {ProtoState2, EvHandlerState} = Protocol:ws_upgrade(ProtoState,
+ StreamRef, Host, Port, Path, Headers, WsOpts,
+ EvHandler, EvHandlerState1),
+ {keep_state, State#state{protocol_state=ProtoState2,
+ event_handler_state=EvHandlerState}};
connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) ->
ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"Websocket is only supported over HTTP/1.1."}},
@@ -1004,14 +1004,20 @@ commands([{switch_transport, Transport, Socket}|Tail], State) ->
commands(Tail, active(State#state{socket=Socket, transport=Transport,
messages=Transport:messages()}));
%% @todo The two loops should be reunified and this clause generalized.
-commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State) ->
- {keep_state, keepalive_cancel(State#state{protocol=Protocol, protocol_state=ProtoState})};
+commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State=#state{
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
+ {keep_state, keepalive_cancel(State#state{protocol=Protocol, protocol_state=ProtoState,
+ event_handler_state=EvHandlerState})};
%% @todo And this state should probably not be ignored.
-commands([{switch_protocol, Protocol, _ProtoState0}|Tail],
- State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport}) ->
+commands([{switch_protocol, Protocol, _ProtoState0}|Tail], State=#state{
+ owner=Owner, opts=Opts, socket=Socket, transport=Transport,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
ProtoOpts = maps:get(http2_opts, Opts, #{}),
ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
- commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState})).
+ EvHandlerState = EvHandler:protocol_changed(#{protocol => Protocol:name()}, EvHandlerState0),
+ commands(Tail, keepalive_timeout(State#state{protocol=Protocol, protocol_state=ProtoState,
+ event_handler_state=EvHandlerState})).
disconnect(State=#state{owner=Owner, opts=Opts,
socket=Socket, transport=Transport,
diff --git a/src/gun_default_event_h.erl b/src/gun_default_event_h.erl
index ff6f951..6ef2e11 100644
--- a/src/gun_default_event_h.erl
+++ b/src/gun_default_event_h.erl
@@ -26,6 +26,8 @@
-export([response_headers/2]).
-export([response_trailers/2]).
-export([response_end/2]).
+-export([ws_upgrade/2]).
+-export([protocol_changed/2]).
-export([disconnect/2]).
-export([terminate/2]).
@@ -62,6 +64,12 @@ response_trailers(_EventData, State) ->
response_end(_EventData, State) ->
State.
+ws_upgrade(_EventData, State) ->
+ State.
+
+protocol_changed(_EventData, State) ->
+ State.
+
disconnect(_EventData, State) ->
State.
diff --git a/src/gun_event.erl b/src/gun_event.erl
index 72abe24..9e83cf8 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -48,7 +48,7 @@
-type request_start_event() :: #{
stream_ref := reference(),
reply_to := pid(),
- function := headers | request,
+ function := headers | request | ws_upgrade,
method := iodata(),
scheme => binary(),
authority := iodata(),
@@ -108,6 +108,38 @@
-callback response_end(response_end_event(), State) -> State.
+%% ws_upgrade.
+%%
+%% This event is a signal that the following request and response
+%% result from a gun:ws_upgrade/2,3,4 call.
+%%
+%% There is no corresponding "end" event. Instead, the success is
+%% indicated by a protocol_changed event following the informational
+%% response.
+
+-type ws_upgrade_event() :: #{
+ stream_ref := reference(),
+ reply_to := pid(),
+ opts := gun:ws_opts()
+}.
+
+-callback ws_upgrade(ws_upgrade_event(), State) -> State.
+
+%% protocol_changed.
+%%
+%% This event can occur either following a successful ws_upgrade
+%% event or following a successful CONNECT request.
+%%
+%% @todo Currently there is only a connection-wide variant of this
+%% event. In the future there will be a stream-wide variant to
+%% support CONNECT and Websocket over HTTP/2.
+
+-type protocol_changed_event() :: #{
+ protocol := http2 | ws
+}.
+
+-callback protocol_changed(protocol_changed_event(), State) -> State.
+
%% disconnect.
-type disconnect_event() :: #{
@@ -131,13 +163,10 @@
%% @todo tls_handshake_end
%% @todo origin_changed
%% @todo transport_changed
-%% @todo protocol_changed
%% @todo push_promise_start
%% @todo push_promise_end
%% @todo cancel_start
%% @todo cancel_end
-%% @todo ws_upgrade_start
-%% @todo ws_upgrade_end
%% @todo ws_frame_read_start
%% @todo ws_frame_read_header
%% @todo ws_frame_read_end
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 4acc3c4..7598eff 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -27,7 +27,7 @@
-export([cancel/3]).
-export([stream_info/2]).
-export([down/1]).
--export([ws_upgrade/7]).
+-export([ws_upgrade/9]).
%% Functions shared with gun_http2.
-export([host_header/3]).
@@ -107,7 +107,7 @@ handle(Data, State=#http_state{in=head, buffer=Buffer,
EvHandlerState = case Buffer of
<<>> ->
EvHandler:response_start(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(StreamRef),
reply_to => ReplyTo
}, EvHandlerState0);
_ ->
@@ -255,9 +255,14 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport,
{Headers, Rest2} = cow_http:parse_headers(Rest),
case {Status, StreamRef} of
{101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} ->
- %% @todo Websocket's 101 response_inform.
+ EvHandlerState = EvHandler:response_inform(#{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ status => 101,
+ headers => Headers
+ }, EvHandlerState0),
{ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts),
- EvHandlerState0};
+ EvHandlerState};
%% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup.
{_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 ->
case IsAlive of
@@ -424,76 +429,75 @@ keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) ->
keepalive(State) ->
State.
-headers(State=#http_state{socket=Socket, transport=Transport, version=Version, out=head},
+headers(State=#http_state{out=head},
StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
EvHandler, EvHandlerState0) ->
- Authority0 = host_header(Transport, Host, Port),
- Headers2 = lists:keydelete(<<"transfer-encoding">>, 1, Headers),
- %% We use Headers2 because this is the smallest list.
- Conn = conn_from_headers(Version, Headers2),
- Out = request_io_from_headers(Headers2),
- {Authority, Headers3} = case lists:keyfind(<<"host">>, 1, Headers2) of
- false -> {Authority0, [{<<"host">>, Authority0}|Headers2]};
- {_, Authority1} -> {Authority1, Headers2}
- end,
- Headers4 = case Out of
- body_chunked when Version =:= 'HTTP/1.0' -> Headers3;
- body_chunked -> [{<<"transfer-encoding">>, <<"chunked">>}|Headers3];
- _ -> Headers3
- end,
- Headers5 = transform_header_names(State, Headers4),
- RequestEvent = #{
- stream_ref => StreamRef,
- reply_to => ReplyTo,
- function => ?FUNCTION_NAME,
- method => Method,
- authority => Authority,
- path => Path,
- headers => Headers5
- },
- EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
- Transport:send(Socket, cow_http:request(Method, Path, Version, Headers5)),
- EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ {Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo,
+ Method, Host, Port, Path, Headers, undefined,
+ EvHandler, EvHandlerState0, ?FUNCTION_NAME),
{new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method),
EvHandlerState}.
-request(State=#http_state{socket=Socket, transport=Transport, version=Version, out=head},
- StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
- EvHandler, EvHandlerState0) ->
- Authority0 = host_header(Transport, Host, Port),
- Headers2 = lists:keydelete(<<"content-length">>, 1,
- lists:keydelete(<<"transfer-encoding">>, 1, Headers)),
+request(State=#http_state{out=head}, StreamRef, ReplyTo,
+ Method, Host, Port, Path, Headers, Body, EvHandler, EvHandlerState0) ->
+ {Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo,
+ Method, Host, Port, Path, Headers, Body,
+ EvHandler, EvHandlerState0, ?FUNCTION_NAME),
+ {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method),
+ EvHandlerState}.
+
+send_request(State=#http_state{socket=Socket, transport=Transport, version=Version},
+ StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body,
+ EvHandler, EvHandlerState0, Function) ->
+ Headers1 = lists:keydelete(<<"transfer-encoding">>, 1, Headers0),
+ Headers2 = case Body of
+ undefined -> Headers1;
+ _ -> lists:keydelete(<<"content-length">>, 1, Headers1)
+ end,
%% We use Headers2 because this is the smallest list.
Conn = conn_from_headers(Version, Headers2),
+ Out = case Body of
+ undefined when Function =:= ws_upgrade -> head;
+ undefined -> request_io_from_headers(Headers2);
+ _ -> head
+ end,
+ Authority0 = host_header(Transport, Host, Port),
{Authority, Headers3} = case lists:keyfind(<<"host">>, 1, Headers2) of
false -> {Authority0, [{<<"host">>, Authority0}|Headers2]};
{_, Authority1} -> {Authority1, Headers2}
end,
Headers4 = transform_header_names(State, Headers3),
- Headers5 = [
- {<<"content-length">>, integer_to_binary(iolist_size(Body))}
- |Headers4],
+ Headers = case {Body, Out} of
+ {undefined, body_chunked} when Version =:= 'HTTP/1.0' -> Headers4;
+ {undefined, body_chunked} -> [{<<"transfer-encoding">>, <<"chunked">>}|Headers4];
+ {undefined, _} -> Headers4;
+ _ -> [{<<"content-length">>, integer_to_binary(iolist_size(Body))}|Headers4]
+ end,
RequestEvent = #{
stream_ref => StreamRef,
reply_to => ReplyTo,
- function => ?FUNCTION_NAME,
+ function => Function,
method => Method,
authority => Authority,
path => Path,
- headers => Headers5
+ headers => Headers
},
EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
Transport:send(Socket, [
- cow_http:request(Method, Path, Version, Headers5),
- Body]),
+ cow_http:request(Method, Path, Version, Headers),
+ [Body || Body =/= undefined]]),
EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
- RequestEndEvent = #{
- stream_ref => StreamRef,
- reply_to => ReplyTo
- },
- EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
- {new_stream(State#http_state{connection=Conn}, StreamRef, ReplyTo, Method),
- EvHandlerState}.
+ EvHandlerState = case Out of
+ head ->
+ RequestEndEvent = #{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandler:request_end(RequestEndEvent, EvHandlerState2);
+ _ ->
+ EvHandlerState2
+ end,
+ {Conn, Out, EvHandlerState}.
host_header(Transport, Host0, Port) ->
Host = case Host0 of
@@ -652,17 +656,16 @@ conn_from_headers(Version, Headers) ->
false ->
keepalive;
{_, ConnHd} ->
- ConnList = cow_http_hd:parse_connection(ConnHd),
- case lists:member(<<"keep-alive">>, ConnList) of
- true -> keepalive;
- false -> close
- end
+ conn_from_header(cow_http_hd:parse_connection(ConnHd))
end.
+conn_from_header([]) -> close;
+conn_from_header([<<"keep-alive">>|_]) -> keepalive;
+conn_from_header([<<"upgrade">>|_]) -> keepalive;
+conn_from_header([_|Tail]) -> conn_from_header(Tail).
+
request_io_from_headers(Headers) ->
case lists:keyfind(<<"content-length">>, 1, Headers) of
- {_, <<"0">>} ->
- head;
{_, Length} ->
{body, cow_http_hd:parse_content_length(Length)};
_ ->
@@ -716,10 +719,11 @@ end_stream(State=#http_state{streams=[_|Tail]}) ->
%% Websocket upgrade.
%% Ensure version is 1.1.
-ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) ->
- error; %% @todo
-ws_upgrade(State=#http_state{socket=Socket, transport=Transport, owner=Owner, out=head},
- StreamRef, Host, Port, Path, Headers0, WsOpts) ->
+ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _, _, _) ->
+ error; %% @todo Probably don't error out here, have a protocol function/command.
+ws_upgrade(State=#http_state{owner=ReplyTo, out=head},
+ StreamRef, Host, Port, Path, Headers0, WsOpts,
+ EvHandler, EvHandlerState0) ->
{Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of
true -> {[{<<"sec-websocket-extensions">>,
<<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>}
@@ -734,20 +738,19 @@ ws_upgrade(State=#http_state{socket=Socket, transport=Transport, owner=Owner, ou
[{<<"sec-websocket-protocol">>, Proto}|Headers1]
end,
Key = cow_ws:key(),
- Headers3 = [
+ Headers = [
{<<"connection">>, <<"upgrade">>},
{<<"upgrade">>, <<"websocket">>},
{<<"sec-websocket-version">>, <<"13">>},
{<<"sec-websocket-key">>, Key}
|Headers2
],
- Headers = case lists:keymember(<<"host">>, 1, Headers0) of
- true -> Headers3;
- false -> [{<<"host">>, host_header(Transport, Host, Port)}|Headers3]
- end,
- Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers)),
- new_stream(State#http_state{connection=keepalive, out=head},
- {websocket, StreamRef, Key, GunExtensions, WsOpts}, Owner, <<"GET">>).
+ {Conn, Out, EvHandlerState} = send_request(State, StreamRef, ReplyTo,
+ <<"GET">>, Host, Port, Path, Headers, undefined,
+ EvHandler, EvHandlerState0, ?FUNCTION_NAME),
+ {new_stream(State#http_state{connection=Conn, out=Out},
+ {websocket, StreamRef, Key, GunExtensions, WsOpts}, ReplyTo, <<"GET">>),
+ EvHandlerState}.
ws_handshake(Buffer, State, StreamRef, Headers, Key, GunExtensions, Opts) ->
%% @todo check upgrade, connection