From 0f8452cafaa27aeffb103019564c086eacfd34f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 16 Jan 2017 14:22:43 +0100 Subject: Add support for multiple stream handlers The stream handlers can be specified using the protocol option 'stream_handlers'. It defaults to [cowboy_stream_h]. The cowboy_stream_h module currently does not forward the calls to further stream handlers. It feels like an edge case; usually we'd want to put our own handlers between the protocol code and the request process. I am therefore going to focus on other things for now. The various types and specifications for stream handlers have been updated and the cowboy_stream module can now be safely used as a behavior. The interface might change a little more, though. This commit does not include tests or documentation. They will follow separately. --- src/cowboy_http.erl | 75 +++++++++++++++++++++++++---------------------------- 1 file changed, 35 insertions(+), 40 deletions(-) (limited to 'src/cowboy_http.erl') diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index fda768d..dcda3fb 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -14,7 +14,7 @@ -module(cowboy_http). --export([init/6]). +-export([init/5]). -export([system_continue/3]). -export([system_terminate/4]). @@ -67,17 +67,13 @@ }). -record(stream, { - %% Stream identifier. id = undefined :: cowboy_stream:streamid(), - - %% Stream handler state. - state = undefined :: any(), - + %% Stream handlers and their state. + state = undefined :: {module(), any()}, %% Client HTTP version for this stream. version = undefined :: cowboy:http_version(), - %% Commands queued. - queue = [] :: [] %% @todo better type + queue = [] :: cowboy_stream:commands() }). -type stream() :: #stream{}. @@ -88,7 +84,6 @@ socket :: inet:socket(), transport :: module(), opts = #{} :: map(), - handler :: module(), %% Remote address and port for the connection. peer = undefined :: {inet:ip_address(), inet:port_number()}, @@ -124,14 +119,14 @@ -include_lib("cowlib/include/cow_inline.hrl"). -include_lib("cowlib/include/cow_parse.hrl"). --spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok. -init(Parent, Ref, Socket, Transport, Opts, Handler) -> +-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok. +init(Parent, Ref, Socket, Transport, Opts) -> case Transport:peername(Socket) of {ok, Peer} -> LastStreamID = maps:get(max_keepalive, Opts, 100), before_loop(set_request_timeout(#state{ parent=Parent, ref=Ref, socket=Socket, - transport=Transport, opts=Opts, handler=Handler, + transport=Transport, opts=Opts, peer=Peer, last_streamid=LastStreamID}), <<>>); {error, Reason} -> %% Couldn't read the peer address; connection is gone. @@ -159,7 +154,7 @@ before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) -> loop(State, Buffer). loop(State=#state{parent=Parent, socket=Socket, transport=Transport, - handler=_Handler, timer=TimerRef, children=Children}, Buffer) -> + timer=TimerRef, children=Children}, Buffer) -> {OK, Closed, Error} = Transport:messages(), receive %% Socket messages. @@ -257,9 +252,8 @@ parse(Buffer, State=#state{in_state=#ps_body{}}) -> %% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now. after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version}, - State0=#state{handler=Handler, opts=Opts, streams=Streams0}, Buffer}) -> - %% @todo Opts at the end. Maybe pass the same Opts we got? - try Handler:init(StreamID, Req, Opts) of + State0=#state{opts=Opts, streams=Streams0}, Buffer}) -> + try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0], State = case maybe_req_close(State0, Headers, Version) of @@ -268,27 +262,27 @@ after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := end, parse(Buffer, commands(State, StreamID, Commands)) catch Class:Reason -> - error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) " - "with reason ~p:~p.", - [Handler, StreamID, Req, Opts, Class, Reason]), - %% @todo Bad value returned here. Crashes. - ok + error_logger:error_msg("Exception occurred in " + "cowboy_stream:init(~p, ~p, ~p) with reason ~p:~p.", + [StreamID, Req, Opts, Class, Reason]), + ok %% @todo send a proper response, etc. note that terminate must NOT be called %% @todo Status code. % stream_reset(State, StreamID, {internal_error, {Class, Reason}, % 'Exception occurred in StreamHandler:init/10 call.'}) %% @todo Check final arity. end; %% Streams are sequential so the body is always about the last stream created %% unless that stream has terminated. -after_parse({data, StreamID, IsFin, Data, State=#state{handler=Handler, +after_parse({data, StreamID, IsFin, Data, State=#state{ streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) -> - try Handler:data(StreamID, IsFin, Data, StreamState0) of + try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of {Commands, StreamState} -> Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream#stream{state=StreamState}), parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands)) catch Class:Reason -> - error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.", - [Handler, StreamID, IsFin, Data, StreamState0, Class, Reason]), + error_logger:error_msg("Exception occurred in " + "cowboy_stream:data(~p, ~p, ~p, ~p) with reason ~p:~p.", + [StreamID, IsFin, Data, StreamState0, Class, Reason]), %% @todo Bad value returned here. Crashes. ok %% @todo @@ -669,18 +663,18 @@ is_http2_upgrade(_, _) -> %% Prior knowledge upgrade, without an HTTP/1.1 request. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - opts=Opts, handler=Handler, peer=Peer}, Buffer) -> + opts=Opts, peer=Peer}, Buffer) -> case Transport:secure() of false -> _ = cancel_request_timeout(State), - cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer); + cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer); true -> error_terminate(400, State, {connection_error, protocol_error, 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'}) end. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - opts=Opts, handler=Handler, peer=Peer}, Buffer, HTTP2Settings, Req) -> + opts=Opts, peer=Peer}, Buffer, HTTP2Settings, Req) -> %% @todo %% However if the client sent a body, we need to read the body in full %% and if we can't do that, return a 413 response. Some options are in order. @@ -695,7 +689,7 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran %% @todo Possibly redirect the request if it was https. _ = cancel_request_timeout(State), - cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer, Settings, Req) + cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer, Settings, Req) catch _:_ -> error_terminate(400, State, {connection_error, protocol_error, 'The HTTP2-Settings header contains a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'}) @@ -748,17 +742,18 @@ down(State=#state{children=Children0}, Pid, Msg) -> State end. -info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) -> +info(State=#state{streams=Streams0}, StreamID, Msg) -> case lists:keyfind(StreamID, #stream.id, Streams0) of Stream = #stream{state=StreamState0} -> - try Handler:info(StreamID, Msg, StreamState0) of + try cowboy_stream:info(StreamID, Msg, StreamState0) of {Commands, StreamState} -> Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream#stream{state=StreamState}), commands(State#state{streams=Streams}, StreamID, Commands) catch Class:Reason -> - error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.", - [Handler, StreamID, Msg, StreamState0, Class, Reason]), + error_logger:error_msg("Exception occurred in " + "cowboy_stream:info(~p, ~p, ~p) with reason ~p:~p.", + [StreamID, Msg, StreamState0, Class, Reason]), ok %% @todo % stream_reset(State, StreamID, {internal_error, {Class, Reason}, @@ -926,7 +921,7 @@ stream_reset(State, StreamID, StreamError={internal_error, _, _}) -> % stream_terminate(State#state{out_state=done}, StreamID, StreamError). stream_terminate(State, StreamID, StreamError). -stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler, +stream_terminate(State=#state{socket=Socket, transport=Transport, out_streamid=OutStreamID, out_state=OutState, streams=Streams0, children=Children0}, StreamID, Reason) -> {value, #stream{state=StreamState, version=Version}, Streams} @@ -940,7 +935,7 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handle ok end, - stream_call_terminate(StreamID, Reason, Handler, StreamState), + stream_call_terminate(StreamID, Reason, StreamState), %% @todo initiate children shutdown % Children = stream_terminate_children(Children0, StreamID, []), Children = [case C of @@ -964,13 +959,13 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handle end. %% @todo Taken directly from _http2 -stream_call_terminate(StreamID, Reason, Handler, StreamState) -> +stream_call_terminate(StreamID, Reason, StreamState) -> try - Handler:terminate(StreamID, Reason, StreamState), - ok + cowboy_stream:terminate(StreamID, Reason, StreamState) catch Class:Reason -> - error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.", - [Handler, StreamID, Reason, StreamState, Class, Reason]) + error_logger:error_msg("Exception occurred in " + "cowboy_stream:terminate(~p, ~p, ~p) with reason ~p:~p.", + [StreamID, Reason, StreamState, Class, Reason]) end. %stream_terminate_children([], _, Acc) -> -- cgit v1.2.3