aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_http.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2017-01-16 14:22:43 +0100
committerLoïc Hoguin <[email protected]>2017-01-16 14:36:33 +0100
commit0f8452cafaa27aeffb103019564c086eacfd34f9 (patch)
tree6f060f0c7e1eb54508cd128c9bf281869593e812 /src/cowboy_http.erl
parente5a8088e68f29206e162ee0f25f45a55ce05fe04 (diff)
downloadcowboy-0f8452cafaa27aeffb103019564c086eacfd34f9.tar.gz
cowboy-0f8452cafaa27aeffb103019564c086eacfd34f9.tar.bz2
cowboy-0f8452cafaa27aeffb103019564c086eacfd34f9.zip
Add support for multiple stream handlers
The stream handlers can be specified using the protocol option 'stream_handlers'. It defaults to [cowboy_stream_h]. The cowboy_stream_h module currently does not forward the calls to further stream handlers. It feels like an edge case; usually we'd want to put our own handlers between the protocol code and the request process. I am therefore going to focus on other things for now. The various types and specifications for stream handlers have been updated and the cowboy_stream module can now be safely used as a behavior. The interface might change a little more, though. This commit does not include tests or documentation. They will follow separately.
Diffstat (limited to 'src/cowboy_http.erl')
-rw-r--r--src/cowboy_http.erl75
1 files changed, 35 insertions, 40 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index fda768d..dcda3fb 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -14,7 +14,7 @@
-module(cowboy_http).
--export([init/6]).
+-export([init/5]).
-export([system_continue/3]).
-export([system_terminate/4]).
@@ -67,17 +67,13 @@
}).
-record(stream, {
- %% Stream identifier.
id = undefined :: cowboy_stream:streamid(),
-
- %% Stream handler state.
- state = undefined :: any(),
-
+ %% Stream handlers and their state.
+ state = undefined :: {module(), any()},
%% Client HTTP version for this stream.
version = undefined :: cowboy:http_version(),
-
%% Commands queued.
- queue = [] :: [] %% @todo better type
+ queue = [] :: cowboy_stream:commands()
}).
-type stream() :: #stream{}.
@@ -88,7 +84,6 @@
socket :: inet:socket(),
transport :: module(),
opts = #{} :: map(),
- handler :: module(),
%% Remote address and port for the connection.
peer = undefined :: {inet:ip_address(), inet:port_number()},
@@ -124,14 +119,14 @@
-include_lib("cowlib/include/cow_inline.hrl").
-include_lib("cowlib/include/cow_parse.hrl").
--spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok.
-init(Parent, Ref, Socket, Transport, Opts, Handler) ->
+-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok.
+init(Parent, Ref, Socket, Transport, Opts) ->
case Transport:peername(Socket) of
{ok, Peer} ->
LastStreamID = maps:get(max_keepalive, Opts, 100),
before_loop(set_request_timeout(#state{
parent=Parent, ref=Ref, socket=Socket,
- transport=Transport, opts=Opts, handler=Handler,
+ transport=Transport, opts=Opts,
peer=Peer, last_streamid=LastStreamID}), <<>>);
{error, Reason} ->
%% Couldn't read the peer address; connection is gone.
@@ -159,7 +154,7 @@ before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
loop(State, Buffer).
loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
- handler=_Handler, timer=TimerRef, children=Children}, Buffer) ->
+ timer=TimerRef, children=Children}, Buffer) ->
{OK, Closed, Error} = Transport:messages(),
receive
%% Socket messages.
@@ -257,9 +252,8 @@ parse(Buffer, State=#state{in_state=#ps_body{}}) ->
%% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now.
after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version},
- State0=#state{handler=Handler, opts=Opts, streams=Streams0}, Buffer}) ->
- %% @todo Opts at the end. Maybe pass the same Opts we got?
- try Handler:init(StreamID, Req, Opts) of
+ State0=#state{opts=Opts, streams=Streams0}, Buffer}) ->
+ try cowboy_stream:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0],
State = case maybe_req_close(State0, Headers, Version) of
@@ -268,27 +262,27 @@ after_parse({request, Req=#{streamid := StreamID, headers := Headers, version :=
end,
parse(Buffer, commands(State, StreamID, Commands))
catch Class:Reason ->
- error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) "
- "with reason ~p:~p.",
- [Handler, StreamID, Req, Opts, Class, Reason]),
- %% @todo Bad value returned here. Crashes.
- ok
+ error_logger:error_msg("Exception occurred in "
+ "cowboy_stream:init(~p, ~p, ~p) with reason ~p:~p.",
+ [StreamID, Req, Opts, Class, Reason]),
+ ok %% @todo send a proper response, etc. note that terminate must NOT be called
%% @todo Status code.
% stream_reset(State, StreamID, {internal_error, {Class, Reason},
% 'Exception occurred in StreamHandler:init/10 call.'}) %% @todo Check final arity.
end;
%% Streams are sequential so the body is always about the last stream created
%% unless that stream has terminated.
-after_parse({data, StreamID, IsFin, Data, State=#state{handler=Handler,
+after_parse({data, StreamID, IsFin, Data, State=#state{
streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) ->
- try Handler:data(StreamID, IsFin, Data, StreamState0) of
+ try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of
{Commands, StreamState} ->
Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
Stream#stream{state=StreamState}),
parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
catch Class:Reason ->
- error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
- [Handler, StreamID, IsFin, Data, StreamState0, Class, Reason]),
+ error_logger:error_msg("Exception occurred in "
+ "cowboy_stream:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
+ [StreamID, IsFin, Data, StreamState0, Class, Reason]),
%% @todo Bad value returned here. Crashes.
ok
%% @todo
@@ -669,18 +663,18 @@ is_http2_upgrade(_, _) ->
%% Prior knowledge upgrade, without an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
- opts=Opts, handler=Handler, peer=Peer}, Buffer) ->
+ opts=Opts, peer=Peer}, Buffer) ->
case Transport:secure() of
false ->
_ = cancel_request_timeout(State),
- cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer);
+ cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer);
true ->
error_terminate(400, State, {connection_error, protocol_error,
'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
end.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
- opts=Opts, handler=Handler, peer=Peer}, Buffer, HTTP2Settings, Req) ->
+ opts=Opts, peer=Peer}, Buffer, HTTP2Settings, Req) ->
%% @todo
%% However if the client sent a body, we need to read the body in full
%% and if we can't do that, return a 413 response. Some options are in order.
@@ -695,7 +689,7 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
%% @todo Possibly redirect the request if it was https.
_ = cancel_request_timeout(State),
- cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Handler, Peer, Buffer, Settings, Req)
+ cowboy_http2:init(Parent, Ref, Socket, Transport, Opts, Peer, Buffer, Settings, Req)
catch _:_ ->
error_terminate(400, State, {connection_error, protocol_error,
'The HTTP2-Settings header contains a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
@@ -748,17 +742,18 @@ down(State=#state{children=Children0}, Pid, Msg) ->
State
end.
-info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) ->
+info(State=#state{streams=Streams0}, StreamID, Msg) ->
case lists:keyfind(StreamID, #stream.id, Streams0) of
Stream = #stream{state=StreamState0} ->
- try Handler:info(StreamID, Msg, StreamState0) of
+ try cowboy_stream:info(StreamID, Msg, StreamState0) of
{Commands, StreamState} ->
Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
Stream#stream{state=StreamState}),
commands(State#state{streams=Streams}, StreamID, Commands)
catch Class:Reason ->
- error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.",
- [Handler, StreamID, Msg, StreamState0, Class, Reason]),
+ error_logger:error_msg("Exception occurred in "
+ "cowboy_stream:info(~p, ~p, ~p) with reason ~p:~p.",
+ [StreamID, Msg, StreamState0, Class, Reason]),
ok
%% @todo
% stream_reset(State, StreamID, {internal_error, {Class, Reason},
@@ -926,7 +921,7 @@ stream_reset(State, StreamID, StreamError={internal_error, _, _}) ->
% stream_terminate(State#state{out_state=done}, StreamID, StreamError).
stream_terminate(State, StreamID, StreamError).
-stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler,
+stream_terminate(State=#state{socket=Socket, transport=Transport,
out_streamid=OutStreamID, out_state=OutState,
streams=Streams0, children=Children0}, StreamID, Reason) ->
{value, #stream{state=StreamState, version=Version}, Streams}
@@ -940,7 +935,7 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handle
ok
end,
- stream_call_terminate(StreamID, Reason, Handler, StreamState),
+ stream_call_terminate(StreamID, Reason, StreamState),
%% @todo initiate children shutdown
% Children = stream_terminate_children(Children0, StreamID, []),
Children = [case C of
@@ -964,13 +959,13 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handle
end.
%% @todo Taken directly from _http2
-stream_call_terminate(StreamID, Reason, Handler, StreamState) ->
+stream_call_terminate(StreamID, Reason, StreamState) ->
try
- Handler:terminate(StreamID, Reason, StreamState),
- ok
+ cowboy_stream:terminate(StreamID, Reason, StreamState)
catch Class:Reason ->
- error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.",
- [Handler, StreamID, Reason, StreamState, Class, Reason])
+ error_logger:error_msg("Exception occurred in "
+ "cowboy_stream:terminate(~p, ~p, ~p) with reason ~p:~p.",
+ [StreamID, Reason, StreamState, Class, Reason])
end.
%stream_terminate_children([], _, Acc) ->