aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-06-02 17:26:37 +0200
committerLoïc Hoguin <[email protected]>2019-06-02 17:28:34 +0200
commitb4c7749176e0a55b5763f3e04bf9312adff7ea82 (patch)
treecbb923a692aa3c578501a0a27e550ec9a4062a71
parenta309f196d15d3045d2e70b2d7e23858f47adb7df (diff)
downloadgun-b4c7749176e0a55b5763f3e04bf9312adff7ea82.tar.gz
gun-b4c7749176e0a55b5763f3e04bf9312adff7ea82.tar.bz2
gun-b4c7749176e0a55b5763f3e04bf9312adff7ea82.zip
Add request_start, request_headers and request_end events
-rw-r--r--src/gun.erl72
-rw-r--r--src/gun_default_event_h.erl12
-rw-r--r--src/gun_event.erl32
-rw-r--r--src/gun_http.erl116
-rw-r--r--src/gun_http2.erl173
-rw-r--r--src/gun_ws.erl5
-rw-r--r--test/event_SUITE.erl173
7 files changed, 427 insertions, 156 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 0ad12cf..a15ca5b 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -730,21 +730,21 @@ init({Owner, Host, Port, Opts}) ->
tls -> {<<"https">>, gun_tls}
end,
OwnerRef = monitor(process, Owner),
- {EventHandler, EventHandlerState0} = maps:get(event_handler, Opts,
+ {EvHandler, EvHandlerState0} = maps:get(event_handler, Opts,
{gun_default_event_h, undefined}),
- EventHandlerState = EventHandler:init(#{
+ EvHandlerState = EvHandler:init(#{
owner => Owner,
transport => OriginTransport,
origin_scheme => OriginScheme,
origin_host => Host,
origin_port => Port,
opts => Opts
- }, EventHandlerState0),
+ }, EvHandlerState0),
State = #state{owner=Owner, owner_ref=OwnerRef,
host=Host, port=Port, origin_scheme=OriginScheme,
origin_host=Host, origin_port=Port, opts=Opts,
transport=Transport, messages=Transport:messages(),
- event_handler=EventHandler, event_handler_state=EventHandlerState},
+ event_handler=EvHandler, event_handler_state=EvHandlerState},
{ok, not_connected, State,
{next_event, internal, {retries, Retry}}}.
@@ -752,7 +752,7 @@ default_transport(443) -> tls;
default_transport(_) -> tcp.
not_connected(_, {retries, Retries}, State0=#state{host=Host, port=Port, opts=Opts,
- transport=Transport, event_handler=EventHandler, event_handler_state=EventHandlerState0}) ->
+ transport=Transport, event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
TransOpts0 = maps:get(transport_opts, Opts, []),
TransOpts1 = case Transport of
gun_tcp -> TransOpts0;
@@ -767,7 +767,7 @@ not_connected(_, {retries, Retries}, State0=#state{host=Host, port=Port, opts=Op
transport_opts => TransOpts,
timeout => ConnectTimeout
},
- EventHandlerState1 = EventHandler:connect_start(ConnectEvent, EventHandlerState0),
+ EvHandlerState1 = EvHandler:connect_start(ConnectEvent, EvHandlerState0),
case Transport:connect(Host, Port, TransOpts, ConnectTimeout) of
{ok, Socket} ->
Protocol = case Transport of
@@ -782,17 +782,17 @@ not_connected(_, {retries, Retries}, State0=#state{host=Host, port=Port, opts=Op
_ -> gun_http
end
end,
- EventHandlerState = EventHandler:connect_end(ConnectEvent#{
+ EvHandlerState = EvHandler:connect_end(ConnectEvent#{
socket => Socket,
protocol => Protocol:name()
- }, EventHandlerState1),
- {next_state, connected, State0#state{event_handler_state=EventHandlerState},
+ }, EvHandlerState1),
+ {next_state, connected, State0#state{event_handler_state=EvHandlerState},
{next_event, internal, {connected, Socket, Protocol}}};
{error, Reason} ->
- EventHandlerState = EventHandler:connect_end(ConnectEvent#{
+ EvHandlerState = EvHandler:connect_end(ConnectEvent#{
error => Reason
- }, EventHandlerState1),
- State = State0#state{event_handler_state=EventHandlerState},
+ }, EvHandlerState1),
+ State = State0#state{event_handler_state=EvHandlerState},
case Retries of
0 ->
{stop, {shutdown, Reason}, State};
@@ -830,8 +830,10 @@ connected(internal, {connected, Socket, Protocol},
protocol=Protocol, protocol_state=ProtoState}))};
%% Socket events.
connected(info, {OK, Socket, Data}, State=#state{socket=Socket, messages={OK, _, _},
- protocol=Protocol, protocol_state=ProtoState}) ->
- commands(Protocol:handle(Data, ProtoState), active(State));
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0),
+ commands(Commands, active(State#state{event_handler_state=EvHandlerState}));
connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) ->
disconnect(State, closed);
connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) ->
@@ -846,22 +848,28 @@ connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoS
%% Public HTTP interface.
connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers},
State=#state{origin_host=Host, origin_port=Port,
- protocol=Protocol, protocol_state=ProtoState}) ->
- ProtoState2 = Protocol:headers(ProtoState,
- StreamRef, ReplyTo, Method, Host, Port, Path, Headers),
- {keep_state, State#state{protocol_state=ProtoState2}};
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:headers(ProtoState,
+ StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
+ EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body},
State=#state{origin_host=Host, origin_port=Port,
- protocol=Protocol, protocol_state=ProtoState}) ->
- ProtoState2 = Protocol:request(ProtoState,
- StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body),
- {keep_state, State#state{protocol_state=ProtoState2}};
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:request(ProtoState,
+ StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
+ EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
%% @todo Do we want to reject ReplyTo if it's not the process
%% who initiated the connection? For both data and cancel.
connected(cast, {data, ReplyTo, StreamRef, IsFin, Data},
- State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
- ProtoState2 = Protocol:data(ProtoState, StreamRef, ReplyTo, IsFin, Data),
- {keep_state, State#state{protocol_state=ProtoState2}};
+ State=#state{protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:data(ProtoState,
+ StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers},
State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
%% The protocol option has been deprecated in favor of the protocols option.
@@ -1008,7 +1016,7 @@ commands([{switch_protocol, Protocol, _ProtoState0}|Tail],
disconnect(State=#state{owner=Owner, opts=Opts,
socket=Socket, transport=Transport,
protocol=Protocol, protocol_state=ProtoState,
- event_handler=EventHandler, event_handler_state=EventHandlerState0}, Reason) ->
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) ->
Protocol:close(Reason, ProtoState),
%% @todo Need a special state for orderly shutdown of a connection.
Transport:close(Socket),
@@ -1021,16 +1029,16 @@ disconnect(State=#state{owner=Owner, opts=Opts,
DisconnectEvent = #{
reason => Reason
},
- EventHandlerState = EventHandler:disconnect(DisconnectEvent, EventHandlerState0),
+ EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState0),
Retry = maps:get(retry, Opts, 5),
case Retry of
0 ->
- {stop, {shutdown, Reason}, State#state{event_handler_state=EventHandlerState}};
+ {stop, {shutdown, Reason}, State#state{event_handler_state=EvHandlerState}};
_ ->
{next_state, not_connected,
keepalive_cancel(State#state{socket=undefined,
protocol=undefined, protocol_state=undefined,
- event_handler_state=EventHandlerState}),
+ event_handler_state=EvHandlerState}),
{next_event, internal, {retries, Retry - 1}}}
end.
@@ -1079,10 +1087,10 @@ owner_down(shutdown) -> {stop, shutdown};
owner_down(Shutdown = {shutdown, _}) -> {stop, Shutdown};
owner_down(Reason) -> {stop, {shutdown, {owner_down, Reason}}}.
-terminate(Reason, StateName, #state{event_handler=EventHandler,
- event_handler_state=EventHandlerState}) ->
+terminate(Reason, StateName, #state{event_handler=EvHandler,
+ event_handler_state=EvHandlerState}) ->
TerminateEvent = #{
state => StateName,
reason => Reason
},
- EventHandler:terminate(TerminateEvent, EventHandlerState).
+ EvHandler:terminate(TerminateEvent, EvHandlerState).
diff --git a/src/gun_default_event_h.erl b/src/gun_default_event_h.erl
index 6d64ff7..a29183d 100644
--- a/src/gun_default_event_h.erl
+++ b/src/gun_default_event_h.erl
@@ -18,6 +18,9 @@
-export([init/2]).
-export([connect_start/2]).
-export([connect_end/2]).
+-export([request_start/2]).
+-export([request_headers/2]).
+-export([request_end/2]).
-export([disconnect/2]).
-export([terminate/2]).
@@ -30,6 +33,15 @@ connect_start(_EventData, State) ->
connect_end(_EventData, State) ->
State.
+request_start(_EventData, State) ->
+ State.
+
+request_headers(_EventData, State) ->
+ State.
+
+request_end(_EventData, State) ->
+ State.
+
disconnect(_EventData, State) ->
State.
diff --git a/src/gun_event.erl b/src/gun_event.erl
index 3d83bea..56f1a36 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -43,6 +43,31 @@
-callback connect_start(connect_event(), State) -> State.
-callback connect_end(connect_event(), State) -> State.
+%% request_start/request_headers.
+
+-type request_start_event() :: #{
+ stream_ref := reference(),
+ reply_to := pid(),
+ function := headers | request,
+ method := iodata(),
+ scheme => binary(),
+ authority := iodata(),
+ path := iodata(),
+ headers := [{binary(), iodata()}]
+}.
+
+-callback request_start(request_start_event(), State) -> State.
+-callback request_headers(request_start_event(), State) -> State.
+
+%% request_end.
+
+-type request_end_event() :: #{
+ stream_ref := reference(),
+ reply_to := pid()
+}.
+
+-callback request_end(request_end_event(), State) -> State.
+
%% disconnect.
-type disconnect_event() :: #{
@@ -67,17 +92,14 @@
%% @todo origin_changed
%% @todo transport_changed
%% @todo protocol_changed
-%% @todo stream_start
-%% @todo stream_end
-%% @todo request_start
-%% @todo request_headers
-%% @todo request_end
%% @todo response_start (call it once per inform + one for the response)
%% @todo response_inform
%% @todo response_headers
%% @todo response_end
%% @todo push_promise_start
%% @todo push_promise_end
+%% @todo cancel_start
+%% @todo cancel_end
%% @todo ws_upgrade_start
%% @todo ws_upgrade_end
%% @todo ws_frame_read_start
diff --git a/src/gun_http.erl b/src/gun_http.erl
index ca04b10..582106b 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -17,12 +17,12 @@
-export([check_options/1]).
-export([name/0]).
-export([init/4]).
--export([handle/2]).
+-export([handle/4]).
-export([close/2]).
-export([keepalive/1]).
--export([headers/8]).
--export([request/9]).
--export([data/5]).
+-export([headers/10]).
+-export([request/11]).
+-export([data/7]).
-export([connect/5]).
-export([cancel/3]).
-export([stream_info/2]).
@@ -93,6 +93,9 @@ init(Owner, Socket, Transport, Opts) ->
#http_state{owner=Owner, socket=Socket, transport=Transport, version=Version,
content_handlers=Handlers, transform_header_name=TransformHeaderName}.
+handle(Data, State, _EvHandler, EvHandlerState) ->
+ {handle(Data, State), EvHandlerState}.
+
%% Stop looping when we got no more data.
handle(<<>>, State) ->
{state, State};
@@ -345,42 +348,76 @@ keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) ->
keepalive(State) ->
State.
-headers(State=#http_state{socket=Socket, transport=Transport, version=Version,
- out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) ->
+headers(State=#http_state{socket=Socket, transport=Transport, version=Version, out=head},
+ StreamRef, ReplyTo, Method, Host, Port, Path, Headers,
+ EvHandler, EvHandlerState0) ->
+ Authority0 = host_header(Transport, Host, Port),
Headers2 = lists:keydelete(<<"transfer-encoding">>, 1, Headers),
- Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
- false -> [{<<"host">>, host_header(Transport, Host, Port)}|Headers2];
- true -> Headers2
- end,
%% We use Headers2 because this is the smallest list.
Conn = conn_from_headers(Version, Headers2),
Out = request_io_from_headers(Headers2),
+ {Authority, Headers3} = case lists:keyfind(<<"host">>, 1, Headers2) of
+ false -> {Authority0, [{<<"host">>, Authority0}|Headers2]};
+ {_, Authority1} -> {Authority1, Headers2}
+ end,
Headers4 = case Out of
body_chunked when Version =:= 'HTTP/1.0' -> Headers3;
body_chunked -> [{<<"transfer-encoding">>, <<"chunked">>}|Headers3];
_ -> Headers3
end,
Headers5 = transform_header_names(State, Headers4),
+ RequestEvent = #{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ function => ?FUNCTION_NAME,
+ method => Method,
+ authority => Authority,
+ path => Path,
+ headers => Headers5
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
Transport:send(Socket, cow_http:request(Method, Path, Version, Headers5)),
- new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method).
-
-request(State=#http_state{socket=Socket, transport=Transport, version=Version,
- out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) ->
+ EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method),
+ EvHandlerState}.
+
+request(State=#http_state{socket=Socket, transport=Transport, version=Version, out=head},
+ StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body,
+ EvHandler, EvHandlerState0) ->
+ Authority0 = host_header(Transport, Host, Port),
Headers2 = lists:keydelete(<<"content-length">>, 1,
lists:keydelete(<<"transfer-encoding">>, 1, Headers)),
- Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
- false -> [{<<"host">>, host_header(Transport, Host, Port)}|Headers2];
- true -> Headers2
- end,
- Headers4 = transform_header_names(State, Headers3),
%% We use Headers2 because this is the smallest list.
Conn = conn_from_headers(Version, Headers2),
+ {Authority, Headers3} = case lists:keyfind(<<"host">>, 1, Headers2) of
+ false -> {Authority0, [{<<"host">>, Authority0}|Headers2]};
+ {_, Authority1} -> {Authority1, Headers2}
+ end,
+ Headers4 = transform_header_names(State, Headers3),
+ Headers5 = [
+ {<<"content-length">>, integer_to_binary(iolist_size(Body))}
+ |Headers4],
+ RequestEvent = #{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ function => ?FUNCTION_NAME,
+ method => Method,
+ authority => Authority,
+ path => Path,
+ headers => Headers5
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
Transport:send(Socket, [
- cow_http:request(Method, Path, Version, [
- {<<"content-length">>, integer_to_binary(iolist_size(Body))}
- |Headers4]),
+ cow_http:request(Method, Path, Version, Headers5),
Body]),
- new_stream(State#http_state{connection=Conn}, StreamRef, ReplyTo, Method).
+ EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ RequestEndEvent = #{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
+ {new_stream(State#http_state{connection=Conn}, StreamRef, ReplyTo, Method),
+ EvHandlerState}.
host_header(Transport, Host0, Port) ->
Host = case Host0 of
@@ -399,14 +436,15 @@ transform_header_names(#http_state{transform_header_name = Fun}, Headers) ->
lists:keymap(Fun, 1, Headers).
%% We are expecting a new stream.
-data(State=#http_state{out=head}, StreamRef, ReplyTo, _, _) ->
- error_stream_closed(State, StreamRef, ReplyTo);
+data(State=#http_state{out=head}, StreamRef, ReplyTo, _, _, _, EvHandlerState) ->
+ {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState};
%% There are no active streams.
-data(State=#http_state{streams=[]}, StreamRef, ReplyTo, _, _) ->
- error_stream_not_found(State, StreamRef, ReplyTo);
+data(State=#http_state{streams=[]}, StreamRef, ReplyTo, _, _, _, EvHandlerState) ->
+ {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState};
%% We can only send data on the last created stream.
data(State=#http_state{socket=Socket, transport=Transport, version=Version,
- out=Out, streams=Streams}, StreamRef, ReplyTo, IsFin, Data) ->
+ out=Out, streams=Streams}, StreamRef, ReplyTo, IsFin, Data,
+ EvHandler, EvHandlerState0) ->
case lists:last(Streams) of
#stream{ref=StreamRef, is_alive=true} ->
DataLength = iolist_size(Data),
@@ -421,25 +459,35 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
cow_http_te:last_chunk()
])
end,
- State#http_state{out=head};
+ RequestEndEvent = #{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
+ {State#http_state{out=head}, EvHandlerState};
body_chunked when Version =:= 'HTTP/1.1' ->
Transport:send(Socket, cow_http_te:chunk(Data)),
- State;
+ {State, EvHandlerState0};
{body, Length} when DataLength =< Length ->
Transport:send(Socket, Data),
Length2 = Length - DataLength,
if
Length2 =:= 0, IsFin =:= fin ->
- State#http_state{out=head};
+ RequestEndEvent = #{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
+ {State#http_state{out=head}, EvHandlerState};
Length2 > 0, IsFin =:= nofin ->
- State#http_state{out={body, Length2}}
+ {State#http_state{out={body, Length2}}, EvHandlerState0}
end;
body_chunked -> %% HTTP/1.0
Transport:send(Socket, Data),
- State
+ {State, EvHandlerState0}
end;
_ ->
- error_stream_not_found(State, StreamRef, ReplyTo)
+ {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
end.
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _) when Streams =/= [] ->
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 1dd2a75..40afb16 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -17,12 +17,12 @@
-export([check_options/1]).
-export([name/0]).
-export([init/4]).
--export([handle/2]).
+-export([handle/4]).
-export([close/2]).
-export([keepalive/1]).
--export([headers/8]).
--export([request/9]).
--export([data/5]).
+-export([headers/10]).
+-export([request/11]).
+-export([data/7]).
-export([cancel/3]).
-export([stream_info/2]).
-export([down/1]).
@@ -85,59 +85,70 @@ init(Owner, Socket, Transport, Opts) ->
Transport:send(Socket, Preface),
State.
-handle(Data, State=#http2_state{buffer=Buffer}) ->
- parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}).
+handle(Data, State=#http2_state{buffer=Buffer}, EvHandler, EvHandlerState) ->
+ parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>},
+ EvHandler, EvHandlerState).
-parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}) ->
+parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandlerState0) ->
MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine),
case cow_http2:parse(Data, MaxFrameSize) of
{ok, Frame, Rest} ->
- case frame(State0, Frame) of
- close -> close;
- State -> parse(Rest, State)
+ case frame(State0, Frame, EvHandler, EvHandlerState0) of
+ Close = {close, _} -> Close;
+ {State, EvHandlerState} -> parse(Rest, State, EvHandler, EvHandlerState)
end;
{ignore, Rest} ->
case ignored_frame(State0) of
- close -> close;
- State -> parse(Rest, State)
+ close -> {close, EvHandlerState0};
+ State -> parse(Rest, State, EvHandler, EvHandlerState0)
end;
{stream_error, StreamID, Reason, Human, Rest} ->
- parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}));
+ parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}),
+ EvHandler, EvHandlerState0);
Error = {connection_error, _, _} ->
- terminate(State0, Error);
+ {terminate(State0, Error), EvHandlerState0};
more ->
- {state, State0#http2_state{buffer=Data}}
+ {{state, State0#http2_state{buffer=Data}}, EvHandlerState0}
end.
%% Frames received.
-frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame) ->
+frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandlerState) ->
case cow_http2_machine:frame(Frame, HTTP2Machine0) of
{ok, HTTP2Machine} ->
- maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame);
+ {maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame),
+ EvHandlerState};
{ok, {data, StreamID, IsFin, Data}, HTTP2Machine} ->
- data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data);
+ data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data,
+ EvHandler, EvHandlerState);
{ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} ->
headers_frame(State#http2_state{http2_machine=HTTP2Machine},
- StreamID, IsFin, Headers, PseudoHeaders, BodyLen);
+ StreamID, IsFin, Headers, PseudoHeaders, BodyLen,
+ EvHandler, EvHandlerState);
{ok, {trailers, StreamID, Trailers}, HTTP2Machine} ->
trailers_frame(State#http2_state{http2_machine=HTTP2Machine},
- StreamID, Trailers);
+ StreamID, Trailers, EvHandler, EvHandlerState);
{ok, {rst_stream, StreamID, Reason}, HTTP2Machine} ->
- rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason);
+ {rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason),
+ EvHandlerState};
{ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, HTTP2Machine} ->
- push_promise_frame(State#http2_state{http2_machine=HTTP2Machine},
- StreamID, PromisedStreamID, Headers, PseudoHeaders);
+ {push_promise_frame(State#http2_state{http2_machine=HTTP2Machine},
+ StreamID, PromisedStreamID, Headers, PseudoHeaders),
+ EvHandlerState};
{ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} ->
- terminate(State#http2_state{http2_machine=HTTP2Machine},
- {stop, Frame, 'Server is going away.'});
+ {terminate(State#http2_state{http2_machine=HTTP2Machine},
+ {stop, Frame, 'Server is going away.'}),
+ EvHandlerState};
{send, SendData, HTTP2Machine} ->
- send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData);
+ send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData,
+ EvHandler, EvHandlerState);
{error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} ->
- reset_stream(State#http2_state{http2_machine=HTTP2Machine},
- StreamID, {stream_error, Reason, Human});
+ {reset_stream(State#http2_state{http2_machine=HTTP2Machine},
+ StreamID, {stream_error, Reason, Human}),
+ EvHandlerState};
{error, Error={connection_error, _, _}, HTTP2Machine} ->
- terminate(State#http2_state{http2_machine=HTTP2Machine}, Error)
+ {terminate(State#http2_state{http2_machine=HTTP2Machine}, Error),
+ EvHandlerState}
end.
maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) ->
@@ -149,7 +160,8 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) ->
State.
data_frame(State=#http2_state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine0}, StreamID, IsFin, Data) ->
+ http2_machine=HTTP2Machine0}, StreamID, IsFin, Data,
+ _EvHandler, EvHandlerState) ->
Stream = #stream{handler_state=Handlers0} = get_stream_by_id(State, StreamID),
Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
Size = byte_size(Data),
@@ -169,16 +181,18 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport,
HTTP2Machine1
end
end,
- maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine},
- Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin).
+ {maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine},
+ Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin),
+ EvHandlerState}.
headers_frame(State=#http2_state{content_handlers=Handlers0},
- StreamID, IsFin, Headers, PseudoHeaders, _BodyLen) ->
+ StreamID, IsFin, Headers, PseudoHeaders, _BodyLen,
+ _EvHandler, EvHandlerState) ->
Stream = #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
case PseudoHeaders of
#{status := Status} when Status >= 100, Status =< 199 ->
ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers},
- State;
+ {State, EvHandlerState};
#{status := Status} ->
ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers},
Handlers = case IsFin of
@@ -187,15 +201,16 @@ headers_frame(State=#http2_state{content_handlers=Handlers0},
gun_content_handler:init(ReplyTo, StreamRef,
Status, Headers, Handlers0)
end,
- maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}),
- StreamID, remote, IsFin)
+ {maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}),
+ StreamID, remote, IsFin),
+ EvHandlerState}
end.
-trailers_frame(State, StreamID, Trailers) ->
+trailers_frame(State, StreamID, Trailers, _EvHandler, EvHandlerState) ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
%% @todo We probably want to pass this to gun_content_handler?
ReplyTo ! {gun_trailers, self(), StreamRef, Trailers},
- maybe_delete_stream(State, StreamID, remote, fin).
+ {maybe_delete_stream(State, StreamID, remote, fin), EvHandlerState}.
rst_stream_frame(State=#http2_state{streams=Streams0}, StreamID, Reason) ->
case lists:keytake(StreamID, #stream.id, Streams0) of
@@ -243,30 +258,56 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
headers(State=#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0, streams=Streams},
- StreamRef, ReplyTo, Method, Host, Port, Path, Headers0) ->
+ StreamRef, ReplyTo, Method, Host, Port, Path, Headers0,
+ EvHandler, EvHandlerState0) ->
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0),
+ RequestEvent = #{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ function => ?FUNCTION_NAME,
+ method => Method,
+ authority => maps:get(authority, PseudoHeaders),
+ path => Path,
+ headers => Headers
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
{ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
+ EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
- State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}.
+ {State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]},
+ EvHandlerState}.
request(State=#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0, streams=Streams},
- StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body) ->
+ StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body,
+ EvHandler, EvHandlerState0) ->
Headers1 = lists:keystore(<<"content-length">>, 1, Headers0,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers1),
+ RequestEvent = #{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ function => ?FUNCTION_NAME,
+ method => Method,
+ authority => maps:get(authority, PseudoHeaders),
+ path => Path,
+ headers => Headers
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
{ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(
StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
+ EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
maybe_send_data(State#http2_state{http2_machine=HTTP2Machine,
- streams=[Stream|Streams]}, StreamID, fin, Body).
+ streams=[Stream|Streams]}, StreamID, fin, Body,
+ EvHandler, EvHandlerState).
prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, Headers0) ->
Authority = case lists:keyfind(<<"host">>, 1, Headers0) of
@@ -293,45 +334,59 @@ prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, He
},
{ok, PseudoHeaders, Headers}.
-data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, Data) ->
+data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, Data,
+ EvHandler, EvHandlerState) ->
case get_stream_by_ref(State, StreamRef) of
#stream{id=StreamID} ->
case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of
{ok, fin, _} ->
- error_stream_closed(State, StreamRef, ReplyTo);
+ {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState};
{ok, _, fin} ->
- error_stream_closed(State, StreamRef, ReplyTo);
+ {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState};
{ok, _, _} ->
- maybe_send_data(State, StreamID, IsFin, Data)
+ maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState)
end;
false ->
- error_stream_not_found(State, StreamRef, ReplyTo)
+ {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState}
end.
-maybe_send_data(State=#http2_state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0) ->
+maybe_send_data(State=#http2_state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0,
+ EvHandler, EvHandlerState) ->
Data = case is_tuple(Data0) of
false -> {data, Data0};
true -> Data0
end,
case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of
{ok, HTTP2Machine} ->
- State#http2_state{http2_machine=HTTP2Machine};
+ {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState};
{send, SendData, HTTP2Machine} ->
- send_data(State#http2_state{http2_machine=HTTP2Machine}, SendData)
+ send_data(State#http2_state{http2_machine=HTTP2Machine}, SendData,
+ EvHandler, EvHandlerState)
end.
-send_data(State, []) ->
- State;
-send_data(State0, [{StreamID, IsFin, SendData}|Tail]) ->
- State = send_data(State0, StreamID, IsFin, SendData),
- send_data(State, Tail).
+send_data(State, [], _, EvHandlerState) ->
+ {State, EvHandlerState};
+send_data(State0, [{StreamID, IsFin, SendData}|Tail], EvHandler, EvHandlerState0) ->
+ {State, EvHandlerState} = send_data(State0, StreamID, IsFin, SendData, EvHandler, EvHandlerState0),
+ send_data(State, Tail, EvHandler, EvHandlerState).
-send_data(State0, StreamID, IsFin, [Data]) ->
+send_data(State0, StreamID, IsFin, [Data], EvHandler, EvHandlerState0) ->
State = send_data_frame(State0, StreamID, IsFin, Data),
- maybe_delete_stream(State, StreamID, local, IsFin);
-send_data(State0, StreamID, IsFin, [Data|Tail]) ->
+ EvHandlerState = case IsFin of
+ nofin ->
+ EvHandlerState0;
+ fin ->
+ #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
+ RequestEndEvent = #{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandler:request_end(RequestEndEvent, EvHandlerState0)
+ end,
+ {maybe_delete_stream(State, StreamID, local, IsFin), EvHandlerState};
+send_data(State0, StreamID, IsFin, [Data|Tail], EvHandler, EvHandlerState) ->
State = send_data_frame(State0, StreamID, nofin, Data),
- send_data(State, StreamID, IsFin, Tail).
+ send_data(State, StreamID, IsFin, Tail, EvHandler, EvHandlerState).
send_data_frame(State=#http2_state{socket=Socket, transport=Transport},
StreamID, IsFin, {data, Data}) ->
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 176ba3b..ff54ecd 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -17,7 +17,7 @@
-export([check_options/1]).
-export([name/0]).
-export([init/8]).
--export([handle/2]).
+-export([handle/4]).
-export([close/2]).
-export([send/2]).
-export([down/1]).
@@ -72,6 +72,9 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) ->
{switch_protocol, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport,
extensions=Extensions, handler=Handler, handler_state=HandlerState}}.
+handle(Data, State, _EvHandler, EvHandlerState) ->
+ {handle(Data, State), EvHandlerState}.
+
%% Do not handle anything if we received a close frame.
handle(_, State=#ws_state{in=close}) ->
{state, State};
diff --git a/test/event_SUITE.erl b/test/event_SUITE.erl
index 05bff8c..d52296e 100644
--- a/test/event_SUITE.erl
+++ b/test/event_SUITE.erl
@@ -23,7 +23,17 @@
-import(gun_test, [init_origin/1]).
all() ->
- ct_helper:all(?MODULE).
+ [
+ {group, http},
+ {group, http2}
+ ].
+
+groups() ->
+ Tests = ct_helper:all(?MODULE),
+ [
+ {http, [parallel], Tests},
+ {http2, [parallel], Tests}
+ ].
init_per_suite(Config) ->
{ok, _} = cowboy:start_clear(?MODULE, [], #{env => #{
@@ -37,10 +47,13 @@ end_per_suite(_) ->
%% init.
-init(_) ->
+init(Config) ->
doc("Confirm that the init event callback is called."),
Self = self(),
- Opts = #{event_handler => {?MODULE, Self}},
+ Opts = #{
+ event_handler => {?MODULE, self()},
+ protocols => [config(name, config(tc_group_properties, Config))]
+ },
{ok, Pid} = gun:open("localhost", 12345, Opts),
#{
owner := Self,
@@ -54,11 +67,9 @@ init(_) ->
%% connect_start/connect_end.
-connect_start(_) ->
+connect_start(Config) ->
doc("Confirm that the connect_start event callback is called."),
- Self = self(),
- Opts = #{event_handler => {?MODULE, Self}},
- {ok, Pid} = gun:open("localhost", 12345, Opts),
+ {ok, Pid, _} = do_gun_open(12345, Config),
#{
host := "localhost",
port := 12345,
@@ -68,11 +79,9 @@ connect_start(_) ->
} = do_receive_event(?FUNCTION_NAME),
gun:close(Pid).
-connect_end_error(_) ->
+connect_end_error(Config) ->
doc("Confirm that the connect_end event callback is called on connect failure."),
- Self = self(),
- Opts = #{event_handler => {?MODULE, Self}},
- {ok, Pid} = gun:open("localhost", 12345, Opts),
+ {ok, Pid, _} = do_gun_open(12345, Config),
#{
host := "localhost",
port := 12345,
@@ -85,10 +94,8 @@ connect_end_error(_) ->
connect_end_ok(Config) ->
doc("Confirm that the connect_end event callback is called on connect success."),
- Self = self(),
- Opts = #{event_handler => {?MODULE, Self}},
- OriginPort = config(origin_port, Config),
- {ok, Pid} = gun:open("localhost", OriginPort, Opts),
+ {ok, Pid, OriginPort} = do_gun_open(Config),
+ {ok, Protocol} = gun:await_up(Pid),
#{
host := "localhost",
port := OriginPort,
@@ -96,17 +103,111 @@ connect_end_ok(Config) ->
transport_opts := _,
timeout := _,
socket := _,
- protocol := http
+ protocol := Protocol
} = do_receive_event(connect_end),
gun:close(Pid).
-disconnect(_) ->
+request_start(Config) ->
+ doc("Confirm that the request_start event callback is called."),
+ do_request_event(Config, ?FUNCTION_NAME),
+ do_request_event_headers(Config, ?FUNCTION_NAME).
+
+request_headers(Config) ->
+ doc("Confirm that the request_headers event callback is called."),
+ do_request_event(Config, ?FUNCTION_NAME),
+ do_request_event_headers(Config, ?FUNCTION_NAME).
+
+do_request_event(Config, EventName) ->
+ {ok, Pid, OriginPort} = do_gun_open(Config),
+ {ok, _} = gun:await_up(Pid),
+ StreamRef = gun:get(Pid, "/"),
+ ReplyTo = self(),
+ Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]),
+ #{
+ stream_ref := StreamRef,
+ reply_to := ReplyTo,
+ function := request,
+ method := <<"GET">>,
+ authority := EventAuthority,
+ path := "/",
+ headers := [_|_]
+ } = do_receive_event(EventName),
+ Authority = iolist_to_binary(EventAuthority),
+ gun:close(Pid).
+
+do_request_event_headers(Config, EventName) ->
+ {ok, Pid, OriginPort} = do_gun_open(Config),
+ {ok, _} = gun:await_up(Pid),
+ StreamRef = gun:put(Pid, "/", [
+ {<<"content-type">>, <<"text/plain">>}
+ ]),
+ ReplyTo = self(),
+ Authority = iolist_to_binary([<<"localhost:">>, integer_to_list(OriginPort)]),
+ #{
+ stream_ref := StreamRef,
+ reply_to := ReplyTo,
+ function := headers,
+ method := <<"PUT">>,
+ authority := EventAuthority,
+ path := "/",
+ headers := [_|_]
+ } = do_receive_event(EventName),
+ Authority = iolist_to_binary(EventAuthority),
+ gun:close(Pid).
+
+request_end(Config) ->
+ doc("Confirm that the request_end event callback is called."),
+ do_request_end_event(Config, ?FUNCTION_NAME),
+ do_request_end_event_headers(Config, ?FUNCTION_NAME),
+ do_request_end_event_headers_content_length(Config, ?FUNCTION_NAME).
+
+do_request_end_event(Config, EventName) ->
+ {ok, Pid, _} = do_gun_open(Config),
+ {ok, _} = gun:await_up(Pid),
+ StreamRef = gun:get(Pid, "/"),
+ ReplyTo = self(),
+ #{
+ stream_ref := StreamRef,
+ reply_to := ReplyTo
+ } = do_receive_event(EventName),
+ gun:close(Pid).
+
+do_request_end_event_headers(Config, EventName) ->
+ {ok, Pid, _} = do_gun_open(Config),
+ {ok, _} = gun:await_up(Pid),
+ StreamRef = gun:put(Pid, "/", [
+ {<<"content-type">>, <<"text/plain">>}
+ ]),
+ gun:data(Pid, StreamRef, nofin, <<"Hello ">>),
+ gun:data(Pid, StreamRef, fin, <<"world!">>),
+ ReplyTo = self(),
+ #{
+ stream_ref := StreamRef,
+ reply_to := ReplyTo
+ } = do_receive_event(EventName),
+ gun:close(Pid).
+
+do_request_end_event_headers_content_length(Config, EventName) ->
+ {ok, Pid, _} = do_gun_open(Config),
+ {ok, _} = gun:await_up(Pid),
+ StreamRef = gun:put(Pid, "/", [
+ {<<"content-type">>, <<"text/plain">>},
+ {<<"content-length">>, <<"12">>}
+ ]),
+ gun:data(Pid, StreamRef, nofin, <<"Hello ">>),
+ gun:data(Pid, StreamRef, fin, <<"world!">>),
+ ReplyTo = self(),
+ #{
+ stream_ref := StreamRef,
+ reply_to := ReplyTo
+ } = do_receive_event(EventName),
+ gun:close(Pid).
+
+disconnect(Config) ->
doc("Confirm that the disconnect event callback is called on disconnect."),
- Self = self(),
- Opts = #{event_handler => {?MODULE, Self}},
{ok, OriginPid, OriginPort} = init_origin(tcp),
- {ok, Pid} = gun:open("localhost", OriginPort, Opts),
- {ok, http} = gun:await_up(Pid),
+ {ok, Pid, _} = do_gun_open(OriginPort, Config),
+ {ok, _} = gun:await_up(Pid),
%% We make the origin exit to trigger a disconnect.
unlink(OriginPid),
exit(OriginPid, shutdown),
@@ -115,11 +216,9 @@ disconnect(_) ->
} = do_receive_event(disconnect),
gun:close(Pid).
-terminate(_) ->
+terminate(Config) ->
doc("Confirm that the terminate event callback is called on terminate."),
- Self = self(),
- Opts = #{event_handler => {?MODULE, Self}},
- {ok, Pid} = gun:open("localhost", 12345, Opts),
+ {ok, Pid, _} = do_gun_open(12345, Config),
gun:close(Pid),
#{
state := not_connected,
@@ -129,6 +228,18 @@ terminate(_) ->
%% Internal.
+do_gun_open(Config) ->
+ OriginPort = config(origin_port, Config),
+ do_gun_open(OriginPort, Config).
+
+do_gun_open(OriginPort, Config) ->
+ Opts = #{
+ event_handler => {?MODULE, self()},
+ protocols => [config(name, config(tc_group_properties, Config))]
+ },
+ {ok, Pid} = gun:open("localhost", OriginPort, Opts),
+ {ok, Pid, OriginPort}.
+
do_receive_event(Event) ->
receive
{Event, EventData} ->
@@ -154,6 +265,18 @@ connect_end(EventData, Pid) ->
Pid ! {?FUNCTION_NAME, EventData},
Pid.
+request_start(EventData, Pid) ->
+ Pid ! {?FUNCTION_NAME, EventData},
+ Pid.
+
+request_headers(EventData, Pid) ->
+ Pid ! {?FUNCTION_NAME, EventData},
+ Pid.
+
+request_end(EventData, Pid) ->
+ Pid ! {?FUNCTION_NAME, EventData},
+ Pid.
+
disconnect(EventData, Pid) ->
Pid ! {?FUNCTION_NAME, EventData},
Pid.