diff options
author | Loïc Hoguin <[email protected]> | 2019-06-02 17:26:37 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2019-06-02 17:28:34 +0200 |
commit | b4c7749176e0a55b5763f3e04bf9312adff7ea82 (patch) | |
tree | cbb923a692aa3c578501a0a27e550ec9a4062a71 /src | |
parent | a309f196d15d3045d2e70b2d7e23858f47adb7df (diff) | |
download | gun-b4c7749176e0a55b5763f3e04bf9312adff7ea82.tar.gz gun-b4c7749176e0a55b5763f3e04bf9312adff7ea82.tar.bz2 gun-b4c7749176e0a55b5763f3e04bf9312adff7ea82.zip |
Add request_start, request_headers and request_end events
Diffstat (limited to 'src')
-rw-r--r-- | src/gun.erl | 72 | ||||
-rw-r--r-- | src/gun_default_event_h.erl | 12 | ||||
-rw-r--r-- | src/gun_event.erl | 32 | ||||
-rw-r--r-- | src/gun_http.erl | 116 | ||||
-rw-r--r-- | src/gun_http2.erl | 173 | ||||
-rw-r--r-- | src/gun_ws.erl | 5 |
6 files changed, 279 insertions, 131 deletions
diff --git a/src/gun.erl b/src/gun.erl index 0ad12cf..a15ca5b 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -730,21 +730,21 @@ init({Owner, Host, Port, Opts}) -> tls -> {<<"https">>, gun_tls} end, OwnerRef = monitor(process, Owner), - {EventHandler, EventHandlerState0} = maps:get(event_handler, Opts, + {EvHandler, EvHandlerState0} = maps:get(event_handler, Opts, {gun_default_event_h, undefined}), - EventHandlerState = EventHandler:init(#{ + EvHandlerState = EvHandler:init(#{ owner => Owner, transport => OriginTransport, origin_scheme => OriginScheme, origin_host => Host, origin_port => Port, opts => Opts - }, EventHandlerState0), + }, EvHandlerState0), State = #state{owner=Owner, owner_ref=OwnerRef, host=Host, port=Port, origin_scheme=OriginScheme, origin_host=Host, origin_port=Port, opts=Opts, transport=Transport, messages=Transport:messages(), - event_handler=EventHandler, event_handler_state=EventHandlerState}, + event_handler=EvHandler, event_handler_state=EvHandlerState}, {ok, not_connected, State, {next_event, internal, {retries, Retry}}}. @@ -752,7 +752,7 @@ default_transport(443) -> tls; default_transport(_) -> tcp. not_connected(_, {retries, Retries}, State0=#state{host=Host, port=Port, opts=Opts, - transport=Transport, event_handler=EventHandler, event_handler_state=EventHandlerState0}) -> + transport=Transport, event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> TransOpts0 = maps:get(transport_opts, Opts, []), TransOpts1 = case Transport of gun_tcp -> TransOpts0; @@ -767,7 +767,7 @@ not_connected(_, {retries, Retries}, State0=#state{host=Host, port=Port, opts=Op transport_opts => TransOpts, timeout => ConnectTimeout }, - EventHandlerState1 = EventHandler:connect_start(ConnectEvent, EventHandlerState0), + EvHandlerState1 = EvHandler:connect_start(ConnectEvent, EvHandlerState0), case Transport:connect(Host, Port, TransOpts, ConnectTimeout) of {ok, Socket} -> Protocol = case Transport of @@ -782,17 +782,17 @@ not_connected(_, {retries, Retries}, State0=#state{host=Host, port=Port, opts=Op _ -> gun_http end end, - EventHandlerState = EventHandler:connect_end(ConnectEvent#{ + EvHandlerState = EvHandler:connect_end(ConnectEvent#{ socket => Socket, protocol => Protocol:name() - }, EventHandlerState1), - {next_state, connected, State0#state{event_handler_state=EventHandlerState}, + }, EvHandlerState1), + {next_state, connected, State0#state{event_handler_state=EvHandlerState}, {next_event, internal, {connected, Socket, Protocol}}}; {error, Reason} -> - EventHandlerState = EventHandler:connect_end(ConnectEvent#{ + EvHandlerState = EvHandler:connect_end(ConnectEvent#{ error => Reason - }, EventHandlerState1), - State = State0#state{event_handler_state=EventHandlerState}, + }, EvHandlerState1), + State = State0#state{event_handler_state=EvHandlerState}, case Retries of 0 -> {stop, {shutdown, Reason}, State}; @@ -830,8 +830,10 @@ connected(internal, {connected, Socket, Protocol}, protocol=Protocol, protocol_state=ProtoState}))}; %% Socket events. connected(info, {OK, Socket, Data}, State=#state{socket=Socket, messages={OK, _, _}, - protocol=Protocol, protocol_state=ProtoState}) -> - commands(Protocol:handle(Data, ProtoState), active(State)); + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {Commands, EvHandlerState} = Protocol:handle(Data, ProtoState, EvHandler, EvHandlerState0), + commands(Commands, active(State#state{event_handler_state=EvHandlerState})); connected(info, {Closed, Socket}, State=#state{socket=Socket, messages={_, Closed, _}}) -> disconnect(State, closed); connected(info, {Error, Socket, Reason}, State=#state{socket=Socket, messages={_, _, Error}}) -> @@ -846,22 +848,28 @@ connected(info, keepalive, State=#state{protocol=Protocol, protocol_state=ProtoS %% Public HTTP interface. connected(cast, {headers, ReplyTo, StreamRef, Method, Path, Headers}, State=#state{origin_host=Host, origin_port=Port, - protocol=Protocol, protocol_state=ProtoState}) -> - ProtoState2 = Protocol:headers(ProtoState, - StreamRef, ReplyTo, Method, Host, Port, Path, Headers), - {keep_state, State#state{protocol_state=ProtoState2}}; + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:headers(ProtoState, + StreamRef, ReplyTo, Method, Host, Port, Path, Headers, + EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; connected(cast, {request, ReplyTo, StreamRef, Method, Path, Headers, Body}, State=#state{origin_host=Host, origin_port=Port, - protocol=Protocol, protocol_state=ProtoState}) -> - ProtoState2 = Protocol:request(ProtoState, - StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body), - {keep_state, State#state{protocol_state=ProtoState2}}; + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:request(ProtoState, + StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, + EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; %% @todo Do we want to reject ReplyTo if it's not the process %% who initiated the connection? For both data and cancel. connected(cast, {data, ReplyTo, StreamRef, IsFin, Data}, - State=#state{protocol=Protocol, protocol_state=ProtoState}) -> - ProtoState2 = Protocol:data(ProtoState, StreamRef, ReplyTo, IsFin, Data), - {keep_state, State#state{protocol_state=ProtoState2}}; + State=#state{protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:data(ProtoState, + StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; connected(cast, {connect, ReplyTo, StreamRef, Destination0, Headers}, State=#state{protocol=Protocol, protocol_state=ProtoState}) -> %% The protocol option has been deprecated in favor of the protocols option. @@ -1008,7 +1016,7 @@ commands([{switch_protocol, Protocol, _ProtoState0}|Tail], disconnect(State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState, - event_handler=EventHandler, event_handler_state=EventHandlerState0}, Reason) -> + event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) -> Protocol:close(Reason, ProtoState), %% @todo Need a special state for orderly shutdown of a connection. Transport:close(Socket), @@ -1021,16 +1029,16 @@ disconnect(State=#state{owner=Owner, opts=Opts, DisconnectEvent = #{ reason => Reason }, - EventHandlerState = EventHandler:disconnect(DisconnectEvent, EventHandlerState0), + EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState0), Retry = maps:get(retry, Opts, 5), case Retry of 0 -> - {stop, {shutdown, Reason}, State#state{event_handler_state=EventHandlerState}}; + {stop, {shutdown, Reason}, State#state{event_handler_state=EvHandlerState}}; _ -> {next_state, not_connected, keepalive_cancel(State#state{socket=undefined, protocol=undefined, protocol_state=undefined, - event_handler_state=EventHandlerState}), + event_handler_state=EvHandlerState}), {next_event, internal, {retries, Retry - 1}}} end. @@ -1079,10 +1087,10 @@ owner_down(shutdown) -> {stop, shutdown}; owner_down(Shutdown = {shutdown, _}) -> {stop, Shutdown}; owner_down(Reason) -> {stop, {shutdown, {owner_down, Reason}}}. -terminate(Reason, StateName, #state{event_handler=EventHandler, - event_handler_state=EventHandlerState}) -> +terminate(Reason, StateName, #state{event_handler=EvHandler, + event_handler_state=EvHandlerState}) -> TerminateEvent = #{ state => StateName, reason => Reason }, - EventHandler:terminate(TerminateEvent, EventHandlerState). + EvHandler:terminate(TerminateEvent, EvHandlerState). diff --git a/src/gun_default_event_h.erl b/src/gun_default_event_h.erl index 6d64ff7..a29183d 100644 --- a/src/gun_default_event_h.erl +++ b/src/gun_default_event_h.erl @@ -18,6 +18,9 @@ -export([init/2]). -export([connect_start/2]). -export([connect_end/2]). +-export([request_start/2]). +-export([request_headers/2]). +-export([request_end/2]). -export([disconnect/2]). -export([terminate/2]). @@ -30,6 +33,15 @@ connect_start(_EventData, State) -> connect_end(_EventData, State) -> State. +request_start(_EventData, State) -> + State. + +request_headers(_EventData, State) -> + State. + +request_end(_EventData, State) -> + State. + disconnect(_EventData, State) -> State. diff --git a/src/gun_event.erl b/src/gun_event.erl index 3d83bea..56f1a36 100644 --- a/src/gun_event.erl +++ b/src/gun_event.erl @@ -43,6 +43,31 @@ -callback connect_start(connect_event(), State) -> State. -callback connect_end(connect_event(), State) -> State. +%% request_start/request_headers. + +-type request_start_event() :: #{ + stream_ref := reference(), + reply_to := pid(), + function := headers | request, + method := iodata(), + scheme => binary(), + authority := iodata(), + path := iodata(), + headers := [{binary(), iodata()}] +}. + +-callback request_start(request_start_event(), State) -> State. +-callback request_headers(request_start_event(), State) -> State. + +%% request_end. + +-type request_end_event() :: #{ + stream_ref := reference(), + reply_to := pid() +}. + +-callback request_end(request_end_event(), State) -> State. + %% disconnect. -type disconnect_event() :: #{ @@ -67,17 +92,14 @@ %% @todo origin_changed %% @todo transport_changed %% @todo protocol_changed -%% @todo stream_start -%% @todo stream_end -%% @todo request_start -%% @todo request_headers -%% @todo request_end %% @todo response_start (call it once per inform + one for the response) %% @todo response_inform %% @todo response_headers %% @todo response_end %% @todo push_promise_start %% @todo push_promise_end +%% @todo cancel_start +%% @todo cancel_end %% @todo ws_upgrade_start %% @todo ws_upgrade_end %% @todo ws_frame_read_start diff --git a/src/gun_http.erl b/src/gun_http.erl index ca04b10..582106b 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -17,12 +17,12 @@ -export([check_options/1]). -export([name/0]). -export([init/4]). --export([handle/2]). +-export([handle/4]). -export([close/2]). -export([keepalive/1]). --export([headers/8]). --export([request/9]). --export([data/5]). +-export([headers/10]). +-export([request/11]). +-export([data/7]). -export([connect/5]). -export([cancel/3]). -export([stream_info/2]). @@ -93,6 +93,9 @@ init(Owner, Socket, Transport, Opts) -> #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version, content_handlers=Handlers, transform_header_name=TransformHeaderName}. +handle(Data, State, _EvHandler, EvHandlerState) -> + {handle(Data, State), EvHandlerState}. + %% Stop looping when we got no more data. handle(<<>>, State) -> {state, State}; @@ -345,42 +348,76 @@ keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) -> keepalive(State) -> State. -headers(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) -> +headers(State=#http_state{socket=Socket, transport=Transport, version=Version, out=head}, + StreamRef, ReplyTo, Method, Host, Port, Path, Headers, + EvHandler, EvHandlerState0) -> + Authority0 = host_header(Transport, Host, Port), Headers2 = lists:keydelete(<<"transfer-encoding">>, 1, Headers), - Headers3 = case lists:keymember(<<"host">>, 1, Headers) of - false -> [{<<"host">>, host_header(Transport, Host, Port)}|Headers2]; - true -> Headers2 - end, %% We use Headers2 because this is the smallest list. Conn = conn_from_headers(Version, Headers2), Out = request_io_from_headers(Headers2), + {Authority, Headers3} = case lists:keyfind(<<"host">>, 1, Headers2) of + false -> {Authority0, [{<<"host">>, Authority0}|Headers2]}; + {_, Authority1} -> {Authority1, Headers2} + end, Headers4 = case Out of body_chunked when Version =:= 'HTTP/1.0' -> Headers3; body_chunked -> [{<<"transfer-encoding">>, <<"chunked">>}|Headers3]; _ -> Headers3 end, Headers5 = transform_header_names(State, Headers4), + RequestEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo, + function => ?FUNCTION_NAME, + method => Method, + authority => Authority, + path => Path, + headers => Headers5 + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), Transport:send(Socket, cow_http:request(Method, Path, Version, Headers5)), - new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method). - -request(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) -> + EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), + {new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method), + EvHandlerState}. + +request(State=#http_state{socket=Socket, transport=Transport, version=Version, out=head}, + StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body, + EvHandler, EvHandlerState0) -> + Authority0 = host_header(Transport, Host, Port), Headers2 = lists:keydelete(<<"content-length">>, 1, lists:keydelete(<<"transfer-encoding">>, 1, Headers)), - Headers3 = case lists:keymember(<<"host">>, 1, Headers) of - false -> [{<<"host">>, host_header(Transport, Host, Port)}|Headers2]; - true -> Headers2 - end, - Headers4 = transform_header_names(State, Headers3), %% We use Headers2 because this is the smallest list. Conn = conn_from_headers(Version, Headers2), + {Authority, Headers3} = case lists:keyfind(<<"host">>, 1, Headers2) of + false -> {Authority0, [{<<"host">>, Authority0}|Headers2]}; + {_, Authority1} -> {Authority1, Headers2} + end, + Headers4 = transform_header_names(State, Headers3), + Headers5 = [ + {<<"content-length">>, integer_to_binary(iolist_size(Body))} + |Headers4], + RequestEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo, + function => ?FUNCTION_NAME, + method => Method, + authority => Authority, + path => Path, + headers => Headers5 + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), Transport:send(Socket, [ - cow_http:request(Method, Path, Version, [ - {<<"content-length">>, integer_to_binary(iolist_size(Body))} - |Headers4]), + cow_http:request(Method, Path, Version, Headers5), Body]), - new_stream(State#http_state{connection=Conn}, StreamRef, ReplyTo, Method). + EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), + RequestEndEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), + {new_stream(State#http_state{connection=Conn}, StreamRef, ReplyTo, Method), + EvHandlerState}. host_header(Transport, Host0, Port) -> Host = case Host0 of @@ -399,14 +436,15 @@ transform_header_names(#http_state{transform_header_name = Fun}, Headers) -> lists:keymap(Fun, 1, Headers). %% We are expecting a new stream. -data(State=#http_state{out=head}, StreamRef, ReplyTo, _, _) -> - error_stream_closed(State, StreamRef, ReplyTo); +data(State=#http_state{out=head}, StreamRef, ReplyTo, _, _, _, EvHandlerState) -> + {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState}; %% There are no active streams. -data(State=#http_state{streams=[]}, StreamRef, ReplyTo, _, _) -> - error_stream_not_found(State, StreamRef, ReplyTo); +data(State=#http_state{streams=[]}, StreamRef, ReplyTo, _, _, _, EvHandlerState) -> + {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState}; %% We can only send data on the last created stream. data(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=Out, streams=Streams}, StreamRef, ReplyTo, IsFin, Data) -> + out=Out, streams=Streams}, StreamRef, ReplyTo, IsFin, Data, + EvHandler, EvHandlerState0) -> case lists:last(Streams) of #stream{ref=StreamRef, is_alive=true} -> DataLength = iolist_size(Data), @@ -421,25 +459,35 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, cow_http_te:last_chunk() ]) end, - State#http_state{out=head}; + RequestEndEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0), + {State#http_state{out=head}, EvHandlerState}; body_chunked when Version =:= 'HTTP/1.1' -> Transport:send(Socket, cow_http_te:chunk(Data)), - State; + {State, EvHandlerState0}; {body, Length} when DataLength =< Length -> Transport:send(Socket, Data), Length2 = Length - DataLength, if Length2 =:= 0, IsFin =:= fin -> - State#http_state{out=head}; + RequestEndEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0), + {State#http_state{out=head}, EvHandlerState}; Length2 > 0, IsFin =:= nofin -> - State#http_state{out={body, Length2}} + {State#http_state{out={body, Length2}}, EvHandlerState0} end; body_chunked -> %% HTTP/1.0 Transport:send(Socket, Data), - State + {State, EvHandlerState0} end; _ -> - error_stream_not_found(State, StreamRef, ReplyTo) + {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _) when Streams =/= [] -> diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 1dd2a75..40afb16 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -17,12 +17,12 @@ -export([check_options/1]). -export([name/0]). -export([init/4]). --export([handle/2]). +-export([handle/4]). -export([close/2]). -export([keepalive/1]). --export([headers/8]). --export([request/9]). --export([data/5]). +-export([headers/10]). +-export([request/11]). +-export([data/7]). -export([cancel/3]). -export([stream_info/2]). -export([down/1]). @@ -85,59 +85,70 @@ init(Owner, Socket, Transport, Opts) -> Transport:send(Socket, Preface), State. -handle(Data, State=#http2_state{buffer=Buffer}) -> - parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}). +handle(Data, State=#http2_state{buffer=Buffer}, EvHandler, EvHandlerState) -> + parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}, + EvHandler, EvHandlerState). -parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}) -> +parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandlerState0) -> MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine), case cow_http2:parse(Data, MaxFrameSize) of {ok, Frame, Rest} -> - case frame(State0, Frame) of - close -> close; - State -> parse(Rest, State) + case frame(State0, Frame, EvHandler, EvHandlerState0) of + Close = {close, _} -> Close; + {State, EvHandlerState} -> parse(Rest, State, EvHandler, EvHandlerState) end; {ignore, Rest} -> case ignored_frame(State0) of - close -> close; - State -> parse(Rest, State) + close -> {close, EvHandlerState0}; + State -> parse(Rest, State, EvHandler, EvHandlerState0) end; {stream_error, StreamID, Reason, Human, Rest} -> - parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human})); + parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}), + EvHandler, EvHandlerState0); Error = {connection_error, _, _} -> - terminate(State0, Error); + {terminate(State0, Error), EvHandlerState0}; more -> - {state, State0#http2_state{buffer=Data}} + {{state, State0#http2_state{buffer=Data}}, EvHandlerState0} end. %% Frames received. -frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame) -> +frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandlerState) -> case cow_http2_machine:frame(Frame, HTTP2Machine0) of {ok, HTTP2Machine} -> - maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame); + {maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), + EvHandlerState}; {ok, {data, StreamID, IsFin, Data}, HTTP2Machine} -> - data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data); + data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data, + EvHandler, EvHandlerState); {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} -> headers_frame(State#http2_state{http2_machine=HTTP2Machine}, - StreamID, IsFin, Headers, PseudoHeaders, BodyLen); + StreamID, IsFin, Headers, PseudoHeaders, BodyLen, + EvHandler, EvHandlerState); {ok, {trailers, StreamID, Trailers}, HTTP2Machine} -> trailers_frame(State#http2_state{http2_machine=HTTP2Machine}, - StreamID, Trailers); + StreamID, Trailers, EvHandler, EvHandlerState); {ok, {rst_stream, StreamID, Reason}, HTTP2Machine} -> - rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason); + {rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason), + EvHandlerState}; {ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, HTTP2Machine} -> - push_promise_frame(State#http2_state{http2_machine=HTTP2Machine}, - StreamID, PromisedStreamID, Headers, PseudoHeaders); + {push_promise_frame(State#http2_state{http2_machine=HTTP2Machine}, + StreamID, PromisedStreamID, Headers, PseudoHeaders), + EvHandlerState}; {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} -> - terminate(State#http2_state{http2_machine=HTTP2Machine}, - {stop, Frame, 'Server is going away.'}); + {terminate(State#http2_state{http2_machine=HTTP2Machine}, + {stop, Frame, 'Server is going away.'}), + EvHandlerState}; {send, SendData, HTTP2Machine} -> - send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData); + send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData, + EvHandler, EvHandlerState); {error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> - reset_stream(State#http2_state{http2_machine=HTTP2Machine}, - StreamID, {stream_error, Reason, Human}); + {reset_stream(State#http2_state{http2_machine=HTTP2Machine}, + StreamID, {stream_error, Reason, Human}), + EvHandlerState}; {error, Error={connection_error, _, _}, HTTP2Machine} -> - terminate(State#http2_state{http2_machine=HTTP2Machine}, Error) + {terminate(State#http2_state{http2_machine=HTTP2Machine}, Error), + EvHandlerState} end. maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> @@ -149,7 +160,8 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> State. data_frame(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, StreamID, IsFin, Data) -> + http2_machine=HTTP2Machine0}, StreamID, IsFin, Data, + _EvHandler, EvHandlerState) -> Stream = #stream{handler_state=Handlers0} = get_stream_by_id(State, StreamID), Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), Size = byte_size(Data), @@ -169,16 +181,18 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport, HTTP2Machine1 end end, - maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine}, - Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin). + {maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine}, + Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin), + EvHandlerState}. headers_frame(State=#http2_state{content_handlers=Handlers0}, - StreamID, IsFin, Headers, PseudoHeaders, _BodyLen) -> + StreamID, IsFin, Headers, PseudoHeaders, _BodyLen, + _EvHandler, EvHandlerState) -> Stream = #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), case PseudoHeaders of #{status := Status} when Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers}, - State; + {State, EvHandlerState}; #{status := Status} -> ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, Handlers = case IsFin of @@ -187,15 +201,16 @@ headers_frame(State=#http2_state{content_handlers=Handlers0}, gun_content_handler:init(ReplyTo, StreamRef, Status, Headers, Handlers0) end, - maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}), - StreamID, remote, IsFin) + {maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}), + StreamID, remote, IsFin), + EvHandlerState} end. -trailers_frame(State, StreamID, Trailers) -> +trailers_frame(State, StreamID, Trailers, _EvHandler, EvHandlerState) -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), %% @todo We probably want to pass this to gun_content_handler? ReplyTo ! {gun_trailers, self(), StreamRef, Trailers}, - maybe_delete_stream(State, StreamID, remote, fin). + {maybe_delete_stream(State, StreamID, remote, fin), EvHandlerState}. rst_stream_frame(State=#http2_state{streams=Streams0}, StreamID, Reason) -> case lists:keytake(StreamID, #stream.id, Streams0) of @@ -243,30 +258,56 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> headers(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, streams=Streams}, - StreamRef, ReplyTo, Method, Host, Port, Path, Headers0) -> + StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, + EvHandler, EvHandlerState0) -> {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0), + RequestEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo, + function => ?FUNCTION_NAME, + method => Method, + authority => maps:get(authority, PseudoHeaders), + path => Path, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), + EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, - State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}. + {State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}, + EvHandlerState}. request(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, streams=Streams}, - StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body) -> + StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body, + EvHandler, EvHandlerState0) -> Headers1 = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers1), + RequestEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo, + function => ?FUNCTION_NAME, + method => Method, + authority => maps:get(authority, PseudoHeaders), + path => Path, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), + EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, maybe_send_data(State#http2_state{http2_machine=HTTP2Machine, - streams=[Stream|Streams]}, StreamID, fin, Body). + streams=[Stream|Streams]}, StreamID, fin, Body, + EvHandler, EvHandlerState). prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, Headers0) -> Authority = case lists:keyfind(<<"host">>, 1, Headers0) of @@ -293,45 +334,59 @@ prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, He }, {ok, PseudoHeaders, Headers}. -data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, Data) -> +data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, Data, + EvHandler, EvHandlerState) -> case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of {ok, fin, _} -> - error_stream_closed(State, StreamRef, ReplyTo); + {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState}; {ok, _, fin} -> - error_stream_closed(State, StreamRef, ReplyTo); + {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState}; {ok, _, _} -> - maybe_send_data(State, StreamID, IsFin, Data) + maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState) end; false -> - error_stream_not_found(State, StreamRef, ReplyTo) + {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState} end. -maybe_send_data(State=#http2_state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0) -> +maybe_send_data(State=#http2_state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0, + EvHandler, EvHandlerState) -> Data = case is_tuple(Data0) of false -> {data, Data0}; true -> Data0 end, case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of {ok, HTTP2Machine} -> - State#http2_state{http2_machine=HTTP2Machine}; + {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState}; {send, SendData, HTTP2Machine} -> - send_data(State#http2_state{http2_machine=HTTP2Machine}, SendData) + send_data(State#http2_state{http2_machine=HTTP2Machine}, SendData, + EvHandler, EvHandlerState) end. -send_data(State, []) -> - State; -send_data(State0, [{StreamID, IsFin, SendData}|Tail]) -> - State = send_data(State0, StreamID, IsFin, SendData), - send_data(State, Tail). +send_data(State, [], _, EvHandlerState) -> + {State, EvHandlerState}; +send_data(State0, [{StreamID, IsFin, SendData}|Tail], EvHandler, EvHandlerState0) -> + {State, EvHandlerState} = send_data(State0, StreamID, IsFin, SendData, EvHandler, EvHandlerState0), + send_data(State, Tail, EvHandler, EvHandlerState). -send_data(State0, StreamID, IsFin, [Data]) -> +send_data(State0, StreamID, IsFin, [Data], EvHandler, EvHandlerState0) -> State = send_data_frame(State0, StreamID, IsFin, Data), - maybe_delete_stream(State, StreamID, local, IsFin); -send_data(State0, StreamID, IsFin, [Data|Tail]) -> + EvHandlerState = case IsFin of + nofin -> + EvHandlerState0; + fin -> + #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), + RequestEndEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, + EvHandler:request_end(RequestEndEvent, EvHandlerState0) + end, + {maybe_delete_stream(State, StreamID, local, IsFin), EvHandlerState}; +send_data(State0, StreamID, IsFin, [Data|Tail], EvHandler, EvHandlerState) -> State = send_data_frame(State0, StreamID, nofin, Data), - send_data(State, StreamID, IsFin, Tail). + send_data(State, StreamID, IsFin, Tail, EvHandler, EvHandlerState). send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, StreamID, IsFin, {data, Data}) -> diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 176ba3b..ff54ecd 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -17,7 +17,7 @@ -export([check_options/1]). -export([name/0]). -export([init/8]). --export([handle/2]). +-export([handle/4]). -export([close/2]). -export([send/2]). -export([down/1]). @@ -72,6 +72,9 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) -> {switch_protocol, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, extensions=Extensions, handler=Handler, handler_state=HandlerState}}. +handle(Data, State, _EvHandler, EvHandlerState) -> + {handle(Data, State), EvHandlerState}. + %% Do not handle anything if we received a close frame. handle(_, State=#ws_state{in=close}) -> {state, State}; |