diff options
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r-- | src/gun_http2.erl | 173 |
1 files changed, 114 insertions, 59 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 1dd2a75..40afb16 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -17,12 +17,12 @@ -export([check_options/1]). -export([name/0]). -export([init/4]). --export([handle/2]). +-export([handle/4]). -export([close/2]). -export([keepalive/1]). --export([headers/8]). --export([request/9]). --export([data/5]). +-export([headers/10]). +-export([request/11]). +-export([data/7]). -export([cancel/3]). -export([stream_info/2]). -export([down/1]). @@ -85,59 +85,70 @@ init(Owner, Socket, Transport, Opts) -> Transport:send(Socket, Preface), State. -handle(Data, State=#http2_state{buffer=Buffer}) -> - parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}). +handle(Data, State=#http2_state{buffer=Buffer}, EvHandler, EvHandlerState) -> + parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}, + EvHandler, EvHandlerState). -parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}) -> +parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandlerState0) -> MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine), case cow_http2:parse(Data, MaxFrameSize) of {ok, Frame, Rest} -> - case frame(State0, Frame) of - close -> close; - State -> parse(Rest, State) + case frame(State0, Frame, EvHandler, EvHandlerState0) of + Close = {close, _} -> Close; + {State, EvHandlerState} -> parse(Rest, State, EvHandler, EvHandlerState) end; {ignore, Rest} -> case ignored_frame(State0) of - close -> close; - State -> parse(Rest, State) + close -> {close, EvHandlerState0}; + State -> parse(Rest, State, EvHandler, EvHandlerState0) end; {stream_error, StreamID, Reason, Human, Rest} -> - parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human})); + parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}), + EvHandler, EvHandlerState0); Error = {connection_error, _, _} -> - terminate(State0, Error); + {terminate(State0, Error), EvHandlerState0}; more -> - {state, State0#http2_state{buffer=Data}} + {{state, State0#http2_state{buffer=Data}}, EvHandlerState0} end. %% Frames received. -frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame) -> +frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandlerState) -> case cow_http2_machine:frame(Frame, HTTP2Machine0) of {ok, HTTP2Machine} -> - maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame); + {maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), + EvHandlerState}; {ok, {data, StreamID, IsFin, Data}, HTTP2Machine} -> - data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data); + data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data, + EvHandler, EvHandlerState); {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} -> headers_frame(State#http2_state{http2_machine=HTTP2Machine}, - StreamID, IsFin, Headers, PseudoHeaders, BodyLen); + StreamID, IsFin, Headers, PseudoHeaders, BodyLen, + EvHandler, EvHandlerState); {ok, {trailers, StreamID, Trailers}, HTTP2Machine} -> trailers_frame(State#http2_state{http2_machine=HTTP2Machine}, - StreamID, Trailers); + StreamID, Trailers, EvHandler, EvHandlerState); {ok, {rst_stream, StreamID, Reason}, HTTP2Machine} -> - rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason); + {rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason), + EvHandlerState}; {ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, HTTP2Machine} -> - push_promise_frame(State#http2_state{http2_machine=HTTP2Machine}, - StreamID, PromisedStreamID, Headers, PseudoHeaders); + {push_promise_frame(State#http2_state{http2_machine=HTTP2Machine}, + StreamID, PromisedStreamID, Headers, PseudoHeaders), + EvHandlerState}; {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} -> - terminate(State#http2_state{http2_machine=HTTP2Machine}, - {stop, Frame, 'Server is going away.'}); + {terminate(State#http2_state{http2_machine=HTTP2Machine}, + {stop, Frame, 'Server is going away.'}), + EvHandlerState}; {send, SendData, HTTP2Machine} -> - send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData); + send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData, + EvHandler, EvHandlerState); {error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> - reset_stream(State#http2_state{http2_machine=HTTP2Machine}, - StreamID, {stream_error, Reason, Human}); + {reset_stream(State#http2_state{http2_machine=HTTP2Machine}, + StreamID, {stream_error, Reason, Human}), + EvHandlerState}; {error, Error={connection_error, _, _}, HTTP2Machine} -> - terminate(State#http2_state{http2_machine=HTTP2Machine}, Error) + {terminate(State#http2_state{http2_machine=HTTP2Machine}, Error), + EvHandlerState} end. maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> @@ -149,7 +160,8 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> State. data_frame(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, StreamID, IsFin, Data) -> + http2_machine=HTTP2Machine0}, StreamID, IsFin, Data, + _EvHandler, EvHandlerState) -> Stream = #stream{handler_state=Handlers0} = get_stream_by_id(State, StreamID), Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), Size = byte_size(Data), @@ -169,16 +181,18 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport, HTTP2Machine1 end end, - maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine}, - Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin). + {maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine}, + Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin), + EvHandlerState}. headers_frame(State=#http2_state{content_handlers=Handlers0}, - StreamID, IsFin, Headers, PseudoHeaders, _BodyLen) -> + StreamID, IsFin, Headers, PseudoHeaders, _BodyLen, + _EvHandler, EvHandlerState) -> Stream = #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), case PseudoHeaders of #{status := Status} when Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers}, - State; + {State, EvHandlerState}; #{status := Status} -> ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, Handlers = case IsFin of @@ -187,15 +201,16 @@ headers_frame(State=#http2_state{content_handlers=Handlers0}, gun_content_handler:init(ReplyTo, StreamRef, Status, Headers, Handlers0) end, - maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}), - StreamID, remote, IsFin) + {maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}), + StreamID, remote, IsFin), + EvHandlerState} end. -trailers_frame(State, StreamID, Trailers) -> +trailers_frame(State, StreamID, Trailers, _EvHandler, EvHandlerState) -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), %% @todo We probably want to pass this to gun_content_handler? ReplyTo ! {gun_trailers, self(), StreamRef, Trailers}, - maybe_delete_stream(State, StreamID, remote, fin). + {maybe_delete_stream(State, StreamID, remote, fin), EvHandlerState}. rst_stream_frame(State=#http2_state{streams=Streams0}, StreamID, Reason) -> case lists:keytake(StreamID, #stream.id, Streams0) of @@ -243,30 +258,56 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> headers(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, streams=Streams}, - StreamRef, ReplyTo, Method, Host, Port, Path, Headers0) -> + StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, + EvHandler, EvHandlerState0) -> {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0), + RequestEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo, + function => ?FUNCTION_NAME, + method => Method, + authority => maps:get(authority, PseudoHeaders), + path => Path, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), + EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, - State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}. + {State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}, + EvHandlerState}. request(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, streams=Streams}, - StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body) -> + StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body, + EvHandler, EvHandlerState0) -> Headers1 = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers1), + RequestEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo, + function => ?FUNCTION_NAME, + method => Method, + authority => maps:get(authority, PseudoHeaders), + path => Path, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), + EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo}, maybe_send_data(State#http2_state{http2_machine=HTTP2Machine, - streams=[Stream|Streams]}, StreamID, fin, Body). + streams=[Stream|Streams]}, StreamID, fin, Body, + EvHandler, EvHandlerState). prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, Headers0) -> Authority = case lists:keyfind(<<"host">>, 1, Headers0) of @@ -293,45 +334,59 @@ prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, He }, {ok, PseudoHeaders, Headers}. -data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, Data) -> +data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, Data, + EvHandler, EvHandlerState) -> case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of {ok, fin, _} -> - error_stream_closed(State, StreamRef, ReplyTo); + {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState}; {ok, _, fin} -> - error_stream_closed(State, StreamRef, ReplyTo); + {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState}; {ok, _, _} -> - maybe_send_data(State, StreamID, IsFin, Data) + maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState) end; false -> - error_stream_not_found(State, StreamRef, ReplyTo) + {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState} end. -maybe_send_data(State=#http2_state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0) -> +maybe_send_data(State=#http2_state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0, + EvHandler, EvHandlerState) -> Data = case is_tuple(Data0) of false -> {data, Data0}; true -> Data0 end, case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of {ok, HTTP2Machine} -> - State#http2_state{http2_machine=HTTP2Machine}; + {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState}; {send, SendData, HTTP2Machine} -> - send_data(State#http2_state{http2_machine=HTTP2Machine}, SendData) + send_data(State#http2_state{http2_machine=HTTP2Machine}, SendData, + EvHandler, EvHandlerState) end. -send_data(State, []) -> - State; -send_data(State0, [{StreamID, IsFin, SendData}|Tail]) -> - State = send_data(State0, StreamID, IsFin, SendData), - send_data(State, Tail). +send_data(State, [], _, EvHandlerState) -> + {State, EvHandlerState}; +send_data(State0, [{StreamID, IsFin, SendData}|Tail], EvHandler, EvHandlerState0) -> + {State, EvHandlerState} = send_data(State0, StreamID, IsFin, SendData, EvHandler, EvHandlerState0), + send_data(State, Tail, EvHandler, EvHandlerState). -send_data(State0, StreamID, IsFin, [Data]) -> +send_data(State0, StreamID, IsFin, [Data], EvHandler, EvHandlerState0) -> State = send_data_frame(State0, StreamID, IsFin, Data), - maybe_delete_stream(State, StreamID, local, IsFin); -send_data(State0, StreamID, IsFin, [Data|Tail]) -> + EvHandlerState = case IsFin of + nofin -> + EvHandlerState0; + fin -> + #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), + RequestEndEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, + EvHandler:request_end(RequestEndEvent, EvHandlerState0) + end, + {maybe_delete_stream(State, StreamID, local, IsFin), EvHandlerState}; +send_data(State0, StreamID, IsFin, [Data|Tail], EvHandler, EvHandlerState) -> State = send_data_frame(State0, StreamID, nofin, Data), - send_data(State, StreamID, IsFin, Tail). + send_data(State, StreamID, IsFin, Tail, EvHandler, EvHandlerState). send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, StreamID, IsFin, {data, Data}) -> |