From 0f8452cafaa27aeffb103019564c086eacfd34f9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 16 Jan 2017 14:22:43 +0100 Subject: Add support for multiple stream handlers The stream handlers can be specified using the protocol option 'stream_handlers'. It defaults to [cowboy_stream_h]. The cowboy_stream_h module currently does not forward the calls to further stream handlers. It feels like an edge case; usually we'd want to put our own handlers between the protocol code and the request process. I am therefore going to focus on other things for now. The various types and specifications for stream handlers have been updated and the cowboy_stream module can now be safely used as a behavior. The interface might change a little more, though. This commit does not include tests or documentation. They will follow separately. --- src/cowboy.erl | 21 ++++++---- src/cowboy_clear.erl | 5 +-- src/cowboy_http.erl | 75 ++++++++++++++++------------------ src/cowboy_http2.erl | 102 ++++++++++++++++++++++------------------------ src/cowboy_stream.erl | 106 +++++++++++++++++++++++++++++++++++++++++------- src/cowboy_stream_h.erl | 23 ++++++----- src/cowboy_tls.erl | 5 +-- 7 files changed, 203 insertions(+), 134 deletions(-) (limited to 'src') diff --git a/src/cowboy.erl b/src/cowboy.erl index 7bb9f1f..56aef34 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -41,26 +41,29 @@ %% doesn't let us do that yet. -spec start_clear(ranch:ref(), non_neg_integer(), ranch_tcp:opts(), opts()) -> {ok, pid()} | {error, any()}. -start_clear(Ref, NbAcceptors, TransOpts0, ProtoOpts) +start_clear(Ref, NbAcceptors, TransOpts0, ProtoOpts0) when is_integer(NbAcceptors), NbAcceptors > 0 -> - TransOpts = [connection_type(ProtoOpts)|TransOpts0], + {TransOpts, ConnectionType} = ensure_connection_type(TransOpts0), + ProtoOpts = ProtoOpts0#{connection_type => ConnectionType}, ranch:start_listener(Ref, NbAcceptors, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts). -spec start_tls(ranch:ref(), non_neg_integer(), ranch_ssl:opts(), opts()) -> {ok, pid()} | {error, any()}. -start_tls(Ref, NbAcceptors, TransOpts0, ProtoOpts) +start_tls(Ref, NbAcceptors, TransOpts0, ProtoOpts0) when is_integer(NbAcceptors), NbAcceptors > 0 -> + {TransOpts1, ConnectionType} = ensure_connection_type(TransOpts0), TransOpts = [ - connection_type(ProtoOpts), {next_protocols_advertised, [<<"h2">>, <<"http/1.1">>]}, {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]} - |TransOpts0], + |TransOpts1], + ProtoOpts = ProtoOpts0#{connection_type => ConnectionType}, ranch:start_listener(Ref, NbAcceptors, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts). --spec connection_type(opts()) -> {connection_type, worker | supervisor}. -connection_type(ProtoOpts) -> - {_, Type} = maps:get(stream_handler, ProtoOpts, {cowboy_stream_h, supervisor}), - {connection_type, Type}. +ensure_connection_type(TransOpts) -> + case proplists:get_value(connection_type, TransOpts) of + undefined -> {[{connection_type, supervisor}|TransOpts], supervisor}; + ConnectionType -> {TransOpts, ConnectionType} + end. -spec stop_listener(ranch:ref()) -> ok | {error, not_found}. stop_listener(Ref) -> diff --git a/src/cowboy_clear.erl b/src/cowboy_clear.erl index cd8b2ca..7ff5d8b 100644 --- a/src/cowboy_clear.erl +++ b/src/cowboy_clear.erl @@ -40,9 +40,8 @@ init(Parent, Ref, Socket, Transport, Opts) -> init(Parent, Ref, Socket, Transport, Opts, cowboy_http). init(Parent, Ref, Socket, Transport, Opts, Protocol) -> - {Handler, Type} = maps:get(stream_handler, Opts, {cowboy_stream_h, supervisor}), - _ = case Type of + _ = case maps:get(connection_type, Opts, supervisor) of worker -> ok; supervisor -> process_flag(trap_exit, true) end, - Protocol:init(Parent, Ref, Socket, Transport, Opts, Handler). + Protocol:init(Parent, Ref, Socket, Transport, Opts). diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index fda768d..dcda3fb 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -14,7 +14,7 @@ -module(cowboy_http). --export([init/6]). +-export([init/5]). -export([system_continue/3]). -export([system_terminate/4]). @@ -67,17 +67,13 @@ }). -record(stream, { - %% Stream identifier. id = undefined :: cowboy_stream:streamid(), - - %% Stream handler state. - state = undefined :: any(), - + %% Stream handlers and their state. + state = undefined :: {module(), any()}, %% Client HTTP version for this stream. version = undefined :: cowboy:http_version(), - %% Commands queued. - queue = [] :: [] %% @todo better type + queue = [] :: cowboy_stream:commands() }). -type stream() :: #stream{}. @@ -88,7 +84,6 @@ socket :: inet:socket(), transport :: module(), opts = #{} :: map(), - handler :: module(), %% Remote address and port for the connection. peer = undefined :: {inet:ip_address(), inet:port_number()}, @@ -124,14 +119,14 @@ -include_lib("cowlib/include/cow_inline.hrl"). -include_lib("cowlib/include/cow_parse.hrl"). --spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok. -init(Parent, Ref, Socket, Transport, Opts, Handler) -> +-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok. +init(Parent, Ref, Socket, Transport, Opts) -> case Transport:peername(Socket) of {ok, Peer} -> LastStreamID = maps:get(max_keepalive, Opts, 100), before_loop(set_request_timeout(#state{ parent=Parent, ref=Ref, socket=Socket, - transport=Transport, opts=Opts, handler=Handler, + transport=Transport, opts=Opts, peer=Peer, last_streamid=LastStreamID}), <<>>); {error, Reason} -> %% Couldn't read the peer address; connection is gone. @@ -159,7 +154,7 @@ before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) -> loop(State, Buffer). loop(State=#state{parent=Parent, socket=Socket, transport=Transport, - handler=_Handler, timer=TimerRef, children=Children}, Buffer) -> + timer=TimerRef, children=Children}, Buffer) -> {OK, Closed, Error} = Transport:messages(), receive %% Socket messages. @@ -257,9 +252,8 @@ parse(Buffer, State=#state{in_state=#ps_body{}}) -> %% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now. after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version}, - State0=#state{handler=Handler, opts=Opts, streams=Streams0}, Buffer}) -> - %% @todo Opts at the end. Maybe pass the same Opts we got? - try Handler:init(StreamID, Req, Opts) of + State0=#state{opts=Opts, streams=Streams0}, Buffer}) -> + try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0], State = case maybe_req_close(State0, Headers, Version) of @@ -268,27 +262,27 @@ after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := end, parse(Buffer, commands(State, StreamID, Commands)) catch Class:Reason -> - error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) " - "with reason ~p:~p.", - [Handler, StreamID, Req, Opts, Class, Reason]), - %% @todo Bad value returned here. Crashes. - ok + error_logger:error_msg("Exception occurred in " + "cowboy_stream:init(~p, ~p, ~p) with reason ~p:~p.", + [StreamID, Req, Opts, Class, Reason]), + ok %% @todo send a proper response, etc. note that terminate must NOT be called %% @todo Status code. % stream_reset(State, StreamID, {internal_error, {Class, Reason}, % 'Exception occurred in StreamHandler:init/10 call.'}) %% @todo Check final arity. end; %% Streams are sequential so the body is always about the last stream created %% unless that stream has terminated. -after_parse({data, StreamID, IsFin, Data, State=#state{handler=Handler, +after_parse({data, StreamID, IsFin, Data, State=#state{ streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) -> - try Handler:data(StreamID, IsFin, Data, StreamState0) of + try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of {Commands, StreamState} -> Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream#stream{state=StreamState}), parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands)) catch Class:Reason -> - error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.", - [Handler, StreamID, IsFin, Data, StreamState0, Class, Reason]), + error_logger:error_msg("Exception occurred in " + "cowboy_stream:data(~p, ~p, ~p, ~p) with reason ~p:~p.", + [StreamID, IsFin, Data, StreamState0, Class, Reason]), %% @todo Bad value returned here. Crashes. ok %% @todo @@ -669,18 +663,18 @@ is_http2_upgrade(_, _) -> %% Prior knowledge upgrade, without an HTTP/1.1 request. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - opts=Opts, handler=Handler, peer=Peer}, Buffer) -> + opts=Opts, peer=Peer}, Buffer) -> case Transport:secure() of false -> _ = cancel_request_timeout(State), - cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer); + cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer); true -> error_terminate(400, State, {connection_error, protocol_error, 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'}) end. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - opts=Opts, handler=Handler, peer=Peer}, Buffer, HTTP2Settings, Req) -> + opts=Opts, peer=Peer}, Buffer, HTTP2Settings, Req) -> %% @todo %% However if the client sent a body, we need to read the body in full %% and if we can't do that, return a 413 response. Some options are in order. @@ -695,7 +689,7 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran %% @todo Possibly redirect the request if it was https. _ = cancel_request_timeout(State), - cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer, Settings, Req) + cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer, Settings, Req) catch _:_ -> error_terminate(400, State, {connection_error, protocol_error, 'The HTTP2-Settings header contains a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'}) @@ -748,17 +742,18 @@ down(State=#state{children=Children0}, Pid, Msg) -> State end. -info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) -> +info(State=#state{streams=Streams0}, StreamID, Msg) -> case lists:keyfind(StreamID, #stream.id, Streams0) of Stream = #stream{state=StreamState0} -> - try Handler:info(StreamID, Msg, StreamState0) of + try cowboy_stream:info(StreamID, Msg, StreamState0) of {Commands, StreamState} -> Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream#stream{state=StreamState}), commands(State#state{streams=Streams}, StreamID, Commands) catch Class:Reason -> - error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.", - [Handler, StreamID, Msg, StreamState0, Class, Reason]), + error_logger:error_msg("Exception occurred in " + "cowboy_stream:info(~p, ~p, ~p) with reason ~p:~p.", + [StreamID, Msg, StreamState0, Class, Reason]), ok %% @todo % stream_reset(State, StreamID, {internal_error, {Class, Reason}, @@ -926,7 +921,7 @@ stream_reset(State, StreamID, StreamError={internal_error, _, _}) -> % stream_terminate(State#state{out_state=done}, StreamID, StreamError). stream_terminate(State, StreamID, StreamError). -stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler, +stream_terminate(State=#state{socket=Socket, transport=Transport, out_streamid=OutStreamID, out_state=OutState, streams=Streams0, children=Children0}, StreamID, Reason) -> {value, #stream{state=StreamState, version=Version}, Streams} @@ -940,7 +935,7 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handle ok end, - stream_call_terminate(StreamID, Reason, Handler, StreamState), + stream_call_terminate(StreamID, Reason, StreamState), %% @todo initiate children shutdown % Children = stream_terminate_children(Children0, StreamID, []), Children = [case C of @@ -964,13 +959,13 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handle end. %% @todo Taken directly from _http2 -stream_call_terminate(StreamID, Reason, Handler, StreamState) -> +stream_call_terminate(StreamID, Reason, StreamState) -> try - Handler:terminate(StreamID, Reason, StreamState), - ok + cowboy_stream:terminate(StreamID, Reason, StreamState) catch Class:Reason -> - error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.", - [Handler, StreamID, Reason, StreamState, Class, Reason]) + error_logger:error_msg("Exception occurred in " + "cowboy_stream:terminate(~p, ~p, ~p) with reason ~p:~p.", + [StreamID, Reason, StreamState, Class, Reason]) end. %stream_terminate_children([], _, Acc) -> diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 0a719b8..ead3ff5 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -14,9 +14,9 @@ -module(cowboy_http2). --export([init/6]). --export([init/8]). --export([init/10]). +-export([init/5]). +-export([init/7]). +-export([init/9]). -export([system_continue/3]). -export([system_terminate/4]). @@ -24,7 +24,8 @@ -record(stream, { id = undefined :: cowboy_stream:streamid(), - state = undefined :: any(), + %% Stream handlers and their state. + state = undefined :: {module(), any()}, %% Whether we finished sending data. local = idle :: idle | cowboy_stream:fin(), %% Whether we finished receiving data. @@ -44,7 +45,6 @@ socket = undefined :: inet:socket(), transport :: module(), opts = #{} :: map(), - handler :: module(), %% Remote address and port for the connection. peer = undefined :: {inet:ip_address(), inet:port_number()}, @@ -89,21 +89,21 @@ encode_state = cow_hpack:init() :: cow_hpack:state() }). --spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok. -init(Parent, Ref, Socket, Transport, Opts, Handler) -> +-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok. +init(Parent, Ref, Socket, Transport, Opts) -> case Transport:peername(Socket) of {ok, Peer} -> - init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, <<>>); + init(Parent, Ref, Socket, Transport, Opts, Peer, <<>>); {error, Reason} -> %% Couldn't read the peer address; connection is gone. terminate(undefined, {socket_error, Reason, 'An error has occurred on the socket.'}) end. --spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module(), +-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), {inet:ip_address(), inet:port_number()}, binary()) -> ok. -init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer) -> +init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer) -> State = #state{parent=Parent, ref=Ref, socket=Socket, - transport=Transport, opts=Opts, handler=Handler, peer=Peer, + transport=Transport, opts=Opts, peer=Peer, parse_state={preface, sequence, preface_timeout(Opts)}}, preface(State), case Buffer of @@ -112,11 +112,11 @@ init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer) -> end. %% @todo Add an argument for the request body. --spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module(), +-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), {inet:ip_address(), inet:port_number()}, binary(), map() | undefined, cowboy_req:req()) -> ok. -init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer, _Settings, Req) -> +init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer, _Settings, Req) -> State0 = #state{parent=Parent, ref=Ref, socket=Socket, - transport=Transport, opts=Opts, handler=Handler, peer=Peer, + transport=Transport, opts=Opts, peer=Peer, parse_state={preface, sequence, preface_timeout(Opts)}}, preface(State0), %% @todo Apply settings. @@ -245,7 +245,7 @@ parse_settings_preface(State, _, _, _) -> %% and terminate the stream if this is the end of it. %% DATA frame. -frame(State=#state{handler=Handler, streams=Streams}, {data, StreamID, IsFin0, Data}) -> +frame(State=#state{streams=Streams}, {data, StreamID, IsFin0, Data}) -> case lists:keyfind(StreamID, #stream.id, Streams) of Stream = #stream{state=StreamState0, remote=nofin, body_length=Len0} -> Len = Len0 + byte_size(Data), @@ -253,14 +253,15 @@ frame(State=#state{handler=Handler, streams=Streams}, {data, StreamID, IsFin0, D fin -> {fin, Len}; nofin -> nofin end, - try Handler:data(StreamID, IsFin, Data, StreamState0) of + try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of {Commands, StreamState} -> commands(State, Stream#stream{state=StreamState, body_length=Len}, Commands) catch Class:Reason -> - error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.", - [Handler, StreamID, IsFin0, Data, StreamState0, Class, Reason]), + error_logger:error_msg("Exception occurred in " + "cowboy_stream:data(~p, ~p, ~p, ~p) with reason ~p:~p.", + [StreamID, IsFin0, Data, StreamState0, Class, Reason]), stream_reset(State, StreamID, {internal_error, {Class, Reason}, - 'Exception occurred in StreamHandler:data/4 call.'}) + 'Exception occurred in cowboy_stream:data/4.'}) end; _ -> stream_reset(State, StreamID, {stream_error, stream_closed, @@ -350,17 +351,18 @@ down(State=#state{children=Children0}, Pid, Msg) -> State end. -info(State=#state{handler=Handler, streams=Streams}, StreamID, Msg) -> +info(State=#state{streams=Streams}, StreamID, Msg) -> case lists:keyfind(StreamID, #stream.id, Streams) of Stream = #stream{state=StreamState0} -> - try Handler:info(StreamID, Msg, StreamState0) of + try cowboy_stream:info(StreamID, Msg, StreamState0) of {Commands, StreamState} -> commands(State, Stream#stream{state=StreamState}, Commands) catch Class:Reason -> - error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.", - [Handler, StreamID, Msg, StreamState0, Class, Reason]), + error_logger:error_msg("Exception occurred in " + "cowboy_stream:info(~p, ~p, ~p) with reason ~p:~p.", + [StreamID, Msg, StreamState0, Class, Reason]), stream_reset(State, StreamID, {internal_error, {Class, Reason}, - 'Exception occurred in StreamHandler:info/3 call.'}) + 'Exception occurred in cowboy_stream:info/3.'}) end; false -> error_logger:error_msg("Received message ~p for unknown stream ~p.", [Msg, StreamID]), @@ -482,14 +484,8 @@ commands(State, Stream=#stream{id=StreamID}, [Error = {internal_error, _, _}|_Ta %% @todo Do we even allow commands after? %% @todo Only reset when the stream still exists. stream_reset(after_commands(State, Stream), StreamID, Error); -%% Upgrade to a new protocol. -%% -%% @todo Implementation. -%% @todo Can only upgrade if: there are no other streams and there are no children left alive. -%% @todo For HTTP/1.1 we should reject upgrading if pipelining is used. -commands(State, Stream, [{upgrade, _Mod, _ModState}]) -> - commands(State, Stream, []); -commands(State, Stream, [{upgrade, _Mod, _ModState}|Tail]) -> +%% @todo HTTP/2 has no support for the Upgrade mechanism. +commands(State, Stream, [{switch_protocol, _Headers, _Mod, _ModState}|Tail]) -> %% @todo This is an error. Not sure what to do here yet. commands(State, Stream, Tail); commands(State, Stream=#stream{id=StreamID}, [stop|_Tail]) -> @@ -518,19 +514,19 @@ send_data(Socket, Transport, StreamID, IsFin, Data, Length) -> end. -spec terminate(#state{}, _) -> no_return(). -terminate(#state{socket=Socket, transport=Transport, handler=Handler, +terminate(#state{socket=Socket, transport=Transport, streams=Streams, children=Children}, Reason) -> %% @todo Send GOAWAY frame; need to keep track of last good stream id; how? - terminate_all_streams(Streams, Reason, Handler, Children), + terminate_all_streams(Streams, Reason, Children), Transport:close(Socket), exit({shutdown, Reason}). -terminate_all_streams([], _, _, []) -> +terminate_all_streams([], _, []) -> ok; -terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason, Handler, Children0) -> - stream_call_terminate(StreamID, Reason, Handler, StreamState), +terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason, Children0) -> + stream_call_terminate(StreamID, Reason, StreamState), Children = stream_terminate_children(Children0, StreamID, []), - terminate_all_streams(Tail, Reason, Handler, Children). + terminate_all_streams(Tail, Reason, Children). %% Stream functions. @@ -593,16 +589,16 @@ stream_init(State0=#state{ref=Ref, socket=Socket, transport=Transport, peer=Peer 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) end. -stream_handler_init(State=#state{handler=Handler, opts=Opts}, StreamID, IsFin, Req) -> - try Handler:init(StreamID, Req, Opts) of +stream_handler_init(State=#state{opts=Opts}, StreamID, IsFin, Req) -> + try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> commands(State, #stream{id=StreamID, state=StreamState, remote=IsFin}, Commands) catch Class:Reason -> - error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) " - "with reason ~p:~p.", - [Handler, StreamID, IsFin, Req, Class, Reason]), + error_logger:error_msg("Exception occurred in " + "cowboy_stream:init(~p, ~p, ~p) with reason ~p:~p.", + [StreamID, IsFin, Req, Class, Reason]), stream_reset(State, StreamID, {internal_error, {Class, Reason}, - 'Exception occurred in StreamHandler:init/7 call.'}) %% @todo Check final arity. + 'Exception occurred in cowboy_stream:init/3.'}) end. %% @todo We might need to keep track of which stream has been reset so we don't send lots of them. @@ -615,23 +611,23 @@ stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID, Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), stream_terminate(State, StreamID, StreamError). -stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler, +stream_terminate(State=#state{socket=Socket, transport=Transport, streams=Streams0, children=Children0, encode_state=EncodeState0}, StreamID, Reason) -> case lists:keytake(StreamID, #stream.id, Streams0) of {value, #stream{state=StreamState, local=idle}, Streams} when Reason =:= normal -> Headers = #{<<":status">> => <<"204">>}, {HeaderBlock, EncodeState} = headers_encode(Headers, EncodeState0), Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), - stream_call_terminate(StreamID, Reason, Handler, StreamState), + stream_call_terminate(StreamID, Reason, StreamState), Children = stream_terminate_children(Children0, StreamID, []), State#state{streams=Streams, children=Children, encode_state=EncodeState}; {value, #stream{state=StreamState, local=nofin}, Streams} when Reason =:= normal -> Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)), - stream_call_terminate(StreamID, Reason, Handler, StreamState), + stream_call_terminate(StreamID, Reason, StreamState), Children = stream_terminate_children(Children0, StreamID, []), State#state{streams=Streams, children=Children}; {value, #stream{state=StreamState}, Streams} -> - stream_call_terminate(StreamID, Reason, Handler, StreamState), + stream_call_terminate(StreamID, Reason, StreamState), Children = stream_terminate_children(Children0, StreamID, []), State#state{streams=Streams, children=Children}; false -> @@ -640,13 +636,13 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handle State end. -stream_call_terminate(StreamID, Reason, Handler, StreamState) -> +stream_call_terminate(StreamID, Reason, StreamState) -> try - Handler:terminate(StreamID, Reason, StreamState), - ok + cowboy_stream:terminate(StreamID, Reason, StreamState) catch Class:Reason -> - error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.", - [Handler, StreamID, Reason, StreamState, Class, Reason]) + error_logger:error_msg("Exception occurred in " + "cowboy_stream:terminate(~p, ~p, ~p) with reason ~p:~p.", + [StreamID, Reason, StreamState, Class, Reason]) end. stream_terminate_children([], _, Acc) -> diff --git a/src/cowboy_stream.erl b/src/cowboy_stream.erl index 04601af..19b3663 100644 --- a/src/cowboy_stream.erl +++ b/src/cowboy_stream.erl @@ -14,31 +14,46 @@ -module(cowboy_stream). +-type state() :: any(). +-type human_reason() :: atom(). + -type streamid() :: any(). +-export_type([streamid/0]). + -type fin() :: fin | nofin. --type headers() :: map(). %% @todo cowboy:http_headers() when they're maps +-export_type([fin/0]). --type status_code() :: 100..999. %% @todo cowboy:http_status() when not binary --type state() :: any(). +%% @todo Perhaps it makes more sense to have resp_body in this module? --type commands() :: [{response, fin(), status_code(), headers()} +-type commands() :: [{response, cowboy:http_status(), cowboy:http_headers(), cowboy_req:resp_body()} + | {headers, cowboy:http_status(), cowboy:http_headers()} | {data, fin(), iodata()} - | {promise, binary(), binary(), binary(), binary(), headers()} + | {push, binary(), binary(), binary(), inet:port_number(), + binary(), binary(), cowboy:http_headers()} | {flow, auto | integer()} - | {spawn, pid()} - | {upgrade, module(), state()}]. + | {spawn, pid(), timeout()} + | {error_response, cowboy:http_status(), cowboy:http_headers(), iodata()} + | {internal_error, any(), human_reason()} + | {switch_protocol, cowboy:http_headers(), module(), state()} + %% @todo I'm not convinced we need this 'stop' command. + %% It's used on crashes, but error_response should + %% terminate the request instead. It's also used on + %% normal exits of children. I'm not sure what to do + %% there yet. Investigate. + | stop]. +-export_type([commands/0]). --type human_reason() :: atom(). --type reason() :: [{internal_error, timeout | {error | exit | throw, any()}, human_reason()} +-type reason() :: normal + | {internal_error, timeout | {error | exit | throw, any()}, human_reason()} | {socket_error, closed | atom(), human_reason()} - | {stream_error, cow_http2:error_reason(), human_reason()} - | {connection_error, cow_http2:error_reason(), human_reason()} - | {stop, cow_http2:frame(), human_reason()}]. + | {stream_error, cow_http2:error(), human_reason()} + | {connection_error, cow_http2:error(), human_reason()} + | {stop, cow_http2:frame(), human_reason()}. +-export_type([reason/0]). --callback init(streamid(), fin(), binary(), binary(), binary(), binary(), - headers(), cowboy:opts()) -> {commands(), state()}. +-callback init(streamid(), cowboy_req:req(), cowboy:opts()) -> {commands(), state()}. -callback data(streamid(), fin(), binary(), State) -> {commands(), State} when State::state(). --callback info(streamid(), any(), state()) -> {commands(), State} when State::state(). +-callback info(streamid(), any(), State) -> {commands(), State} when State::state(). -callback terminate(streamid(), reason(), state()) -> any(). %% @todo To optimize the number of active timers we could have a command @@ -51,3 +66,64 @@ %% %% This same timer can be used to try and send PING frames to help detect %% that the connection is indeed unresponsive. + +-export([init/3]). +-export([data/4]). +-export([info/3]). +-export([terminate/3]). + +%% Note that this and other functions in this module do NOT catch +%% exceptions. We want the exception to go all the way down to the +%% protocol code. +%% +%% OK the failure scenario is not so clear. The problem is +%% that the failure at any point in init/3 will result in the +%% corresponding state being lost. I am unfortunately not +%% confident we can do anything about this. If the crashing +%% handler just created a process, we'll never know about it. +%% Therefore at this time I choose to leave all failure handling +%% to the protocol process. +%% +%% Note that a failure in init/3 will result in terminate/3 +%% NOT being called. This is because the state is not available. + +-spec init(streamid(), cowboy_req:req(), cowboy:opts()) + -> {commands(), {module(), state()} | undefined}. +init(StreamID, Req, Opts) -> + case maps:get(stream_handlers, Opts, [cowboy_stream_h]) of + [] -> + {[], undefined}; + [Handler|Tail] -> + %% We call the next handler and remove it from the list of + %% stream handlers. This means that handlers that run after + %% it have no knowledge it exists. Should user require this + %% knowledge they can just define a separate option that will + %% be left untouched. + {Commands, State} = Handler:init(StreamID, Req, Opts#{stream_handlers => Tail}), + {Commands, {Handler, State}} + end. + +-spec data(streamid(), fin(), binary(), {Handler, State} | undefined) + -> {commands(), {Handler, State} | undefined} + when Handler::module(), State::state(). +data(_, _, _, undefined) -> + {[], undefined}; +data(StreamID, IsFin, Data, {Handler, State0}) -> + {Commands, State} = Handler:data(StreamID, IsFin, Data, State0), + {Commands, {Handler, State}}. + +-spec info(streamid(), any(), {Handler, State} | undefined) + -> {commands(), {Handler, State} | undefined} + when Handler::module(), State::state(). +info(_, _, undefined) -> + {[], undefined}; +info(StreamID, Info, {Handler, State0}) -> + {Commands, State} = Handler:info(StreamID, Info, State0), + {Commands, {Handler, State}}. + +-spec terminate(streamid(), reason(), {module(), state()} | undefined) -> ok. +terminate(_, _, undefined) -> + ok; +terminate(StreamID, Reason, {Handler, State}) -> + _ = Handler:terminate(StreamID, Reason, State), + ok. diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index 3a9abe6..531a3fc 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -13,7 +13,7 @@ %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -module(cowboy_stream_h). -%% @todo -behaviour(cowboy_stream). +-behavior(cowboy_stream). %% @todo Maybe have a callback for the type of process this is, worker or supervisor. -export([init/3]). @@ -25,6 +25,8 @@ -export([execute/3]). -export([resume/5]). +%% @todo Need to call subsequent handlers. + -record(state, { ref = undefined :: ranch:ref(), pid = undefined :: pid(), @@ -39,8 +41,8 @@ %% the stream like supervisors do. So here just send a message to yourself first, %% and then decide what to do when receiving this message. -%% @todo proper specs --spec init(_,_,_) -> _. +-spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts()) + -> {[{spawn, pid(), timeout()}], #state{}}. init(_StreamID, Req=#{ref := Ref}, Opts) -> Env = maps:get(env, Opts, #{}), Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]), @@ -52,9 +54,8 @@ init(_StreamID, Req=#{ref := Ref}, Opts) -> %% If we accumulated enough data or IsFin=fin, send it. %% If not, buffer it. %% If not, buffer it. - -%% @todo proper specs --spec data(_,_,_,_) -> _. +-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State) + -> {cowboy_stream:commands(), State} when State::#state{}. data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) -> {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}}; data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length -> @@ -65,9 +66,10 @@ data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>}, {[], State#state{read_body_ref=undefined, read_body_timer_ref=undefined, read_body_buffer= <<>>}}. -%% @todo proper specs --spec info(_,_,_) -> _. +-spec info(cowboy_stream:streamid(), any(), State) + -> {cowboy_stream:commands(), State} when State::#state{}. info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) -> + %% @todo Do we even reach this clause? {[stop], State}; info(_StreamID, {'EXIT', Pid, {_Reason, [_, {cow_http_hd, _, _, _}|_]}}, State=#state{pid=Pid}) -> %% @todo Have an option to enable/disable this specific crash report? @@ -113,13 +115,12 @@ info(_StreamID, Push = {push, _, _, _, _, _, _, _}, State) -> info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) -> {[SwitchProtocol], State}; %% Stray message. -info(_StreamID, _Msg, State) -> +info(_StreamID, _Info, State) -> %% @todo Error report. %% @todo Cleanup if no reply was sent when stream ends. {[], State}. -%% @todo proper specs --spec terminate(_,_,_) -> _. +-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> ok. terminate(_StreamID, _Reason, _State) -> ok. diff --git a/src/cowboy_tls.erl b/src/cowboy_tls.erl index 7dd558a..24e6666 100644 --- a/src/cowboy_tls.erl +++ b/src/cowboy_tls.erl @@ -45,9 +45,8 @@ init(Parent, Ref, Socket, Transport, Opts) -> end. init(Parent, Ref, Socket, Transport, Opts, Protocol) -> - {Handler, Type} = maps:get(stream_handler, Opts, {cowboy_stream_h, supervisor}), - _ = case Type of + _ = case maps:get(connection_type, Opts, supervisor) of worker -> ok; supervisor -> process_flag(trap_exit, true) end, - Protocol:init(Parent, Ref, Socket, Transport, Opts, Handler). + Protocol:init(Parent, Ref, Socket, Transport, Opts). -- cgit v1.2.3