aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2016-02-10 17:28:32 +0100
committerLoïc Hoguin <[email protected]>2016-03-05 20:20:42 +0100
commitb370442a6352c5acb13b88e135c32ca1720095bd (patch)
tree29485cb24f1b5208b5482358e2c659c9de39bdb3 /src
parentdbb636034f20736e16eb9d6c809217c9525b6cbd (diff)
downloadcowboy-b370442a6352c5acb13b88e135c32ca1720095bd.tar.gz
cowboy-b370442a6352c5acb13b88e135c32ca1720095bd.tar.bz2
cowboy-b370442a6352c5acb13b88e135c32ca1720095bd.zip
Initial commit with connection/streams
Breaking changes with previous commit. This is a very large change, and I am giving up on making a single commit that fixes everything. More commits will follow slowly adding back features, introducing new tests and fixing the documentation. This change contains most of the work toward unifying the interface for handling both HTTP/1.1 and HTTP/2. HTTP/1.1 connections are now no longer 1 process per connection; instead by default 1 process per request is also created. This has a number of pros and cons. Because it has cons, we also allow users to use a lower-level API that acts on "streams" (requests/responses) directly at the connection process-level. If performance is a concern, one can always write a stream handler. The performance in this case will be even greater than with Cowboy 1, although all the special handlers are unavailable. When switching to Websocket, after the handler returns from init/2, Cowboy stops the stream and the Websocket protocol takes over the connection process. Websocket then calls websocket_init/2 for any additional initialization such as timers, because the process is different in init/2 and websocket_*/* functions. This however would allow us to use websocket_init/2 for sending messages on connect, instead of sending ourselves a message and be subject to races. Note that websocket_init/2 is optional. This is all a big change and while most of the tests pass, some functionality currently doesn't. SPDY is broken and will be removed soon in favor of HTTP/2. Automatic compression is currently disabled. The cowboy_req interface probably still have a few functions that need to be updated. The docs and examples do not refer the current functionality anymore. Everything will be fixed over time. Feedback is more than welcome. Open a ticket!
Diffstat (limited to 'src')
-rw-r--r--src/cowboy.erl42
-rw-r--r--src/cowboy_clear.erl37
-rw-r--r--src/cowboy_handler.erl32
-rw-r--r--src/cowboy_http.erl973
-rw-r--r--src/cowboy_http2.erl53
-rw-r--r--src/cowboy_loop.erl87
-rw-r--r--src/cowboy_protocol.erl534
-rw-r--r--src/cowboy_req.erl690
-rw-r--r--src/cowboy_rest.erl16
-rw-r--r--src/cowboy_router.erl15
-rw-r--r--src/cowboy_static.erl9
-rw-r--r--src/cowboy_stream_h.erl128
-rw-r--r--src/cowboy_tls.erl1
-rw-r--r--src/cowboy_websocket.erl90
14 files changed, 1687 insertions, 1020 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl
index 1f0e2a9..67afd7b 100644
--- a/src/cowboy.erl
+++ b/src/cowboy.erl
@@ -14,9 +14,7 @@
-module(cowboy).
--export([start_http/4]).
--export([start_https/4]).
--export([start_spdy/4]).
+-export([start_clear/4]).
-export([start_tls/4]).
-export([stop_listener/1]).
-export([set_env/3]).
@@ -42,43 +40,27 @@
fun((http_status(), http_headers(), iodata(), Req) -> Req).
-export_type([onresponse_fun/0]).
--spec start_http(ranch:ref(), non_neg_integer(), ranch_tcp:opts(),
+-spec start_clear(ranch:ref(), non_neg_integer(), ranch_tcp:opts(),
cowboy_protocol:opts()) -> {ok, pid()} | {error, any()}.
-start_http(Ref, NbAcceptors, TransOpts, ProtoOpts)
+start_clear(Ref, NbAcceptors, TransOpts0, ProtoOpts)
when is_integer(NbAcceptors), NbAcceptors > 0 ->
- ranch:start_listener(Ref, NbAcceptors,
- ranch_tcp, TransOpts, cowboy_protocol, ProtoOpts).
-
--spec start_https(ranch:ref(), non_neg_integer(), ranch_ssl:opts(),
- cowboy_protocol:opts()) -> {ok, pid()} | {error, any()}.
-start_https(Ref, NbAcceptors, TransOpts, ProtoOpts)
- when is_integer(NbAcceptors), NbAcceptors > 0 ->
- ranch:start_listener(Ref, NbAcceptors,
- ranch_ssl, TransOpts, cowboy_protocol, ProtoOpts).
-
--spec start_spdy(ranch:ref(), non_neg_integer(), ranch_ssl:opts(),
- cowboy_spdy:opts()) -> {ok, pid()} | {error, any()}.
-start_spdy(Ref, NbAcceptors, TransOpts, ProtoOpts)
- when is_integer(NbAcceptors), NbAcceptors > 0 ->
- TransOpts2 = [
- {connection_type, supervisor},
- {next_protocols_advertised,
- [<<"spdy/3">>, <<"http/1.1">>, <<"http/1.0">>]}
- |TransOpts],
- ranch:start_listener(Ref, NbAcceptors,
- ranch_ssl, TransOpts2, cowboy_spdy, ProtoOpts).
+ TransOpts = TransOpts0,%[connection_type(ProtoOpts)|TransOpts0],
+ ranch:start_listener(Ref, NbAcceptors, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts).
-spec start_tls(ranch:ref(), non_neg_integer(), ranch_ssl:opts(), opts()) -> {ok, pid()} | {error, any()}.
start_tls(Ref, NbAcceptors, TransOpts0, ProtoOpts)
when is_integer(NbAcceptors), NbAcceptors > 0 ->
- {_, Type} = maps:get(stream_handler, ProtoOpts, {cowboy_stream_h, supervisor}),
TransOpts = [
- {connection_type, Type},
+ connection_type(ProtoOpts),
{next_protocols_advertised, [<<"h2">>, <<"spdy/3">>, <<"http/1.1">>]},
{alpn_preferred_protocols, [<<"h2">>, <<"spdy/3">>, <<"http/1.1">>]}
|TransOpts0],
- ranch:start_listener(Ref, NbAcceptors,
- ranch_ssl, TransOpts, cowboy_tls, ProtoOpts).
+ ranch:start_listener(Ref, NbAcceptors, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts).
+
+-spec connection_type(opts()) -> {connection_type, worker | supervisor}.
+connection_type(ProtoOpts) ->
+ {_, Type} = maps:get(stream_handler, ProtoOpts, {cowboy_stream_h, supervisor}),
+ {connection_type, Type}.
-spec stop_listener(ranch:ref()) -> ok | {error, not_found}.
stop_listener(Ref) ->
diff --git a/src/cowboy_clear.erl b/src/cowboy_clear.erl
new file mode 100644
index 0000000..cc6078e
--- /dev/null
+++ b/src/cowboy_clear.erl
@@ -0,0 +1,37 @@
+%% Copyright (c) 2016, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(cowboy_clear).
+-behavior(ranch_protocol).
+
+-export([start_link/4]).
+-export([init/5]).
+
+-spec start_link(ranch:ref(), inet:socket(), module(), cowboy:opts()) -> {ok, pid()}.
+start_link(Ref, Socket, Transport, Opts) ->
+ Pid = proc_lib:spawn_link(?MODULE, init, [self(), Ref, Socket, Transport, Opts]),
+ {ok, Pid}.
+
+-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok.
+init(Parent, Ref, Socket, Transport, Opts) ->
+ ok = ranch:accept_ack(Ref),
+ init(Parent, Ref, Socket, Transport, Opts, cowboy_http).
+
+init(Parent, Ref, Socket, Transport, Opts, Protocol) ->
+ {Handler, Type} = maps:get(stream_handler, Opts, {cowboy_stream_h, supervisor}),
+ _ = case Type of
+ worker -> ok;
+ supervisor -> process_flag(trap_exit, true)
+ end,
+ Protocol:init(Parent, Ref, Socket, Transport, Opts, Handler).
diff --git a/src/cowboy_handler.erl b/src/cowboy_handler.erl
index 165888e..7791fe2 100644
--- a/src/cowboy_handler.erl
+++ b/src/cowboy_handler.erl
@@ -35,10 +35,8 @@
-spec execute(Req, Env) -> {ok, Req, Env}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
-execute(Req, Env) ->
- {_, Handler} = lists:keyfind(handler, 1, Env),
- {_, HandlerOpts} = lists:keyfind(handler_opts, 1, Env),
- try Handler:init(Req, HandlerOpts) of
+execute(Req, Env=#{handler := Handler, handler_opts := HandlerOpts}) ->
+ case Handler:init(Req, HandlerOpts) of
{ok, Req2, State} ->
Result = terminate(normal, Req2, State, Handler),
{ok, Req2, [{result, Result}|Env]};
@@ -50,37 +48,13 @@ execute(Req, Env) ->
Mod:upgrade(Req2, Env, Handler, State, Timeout, run);
{Mod, Req2, State, Timeout, hibernate} ->
Mod:upgrade(Req2, Env, Handler, State, Timeout, hibernate)
- catch Class:Reason ->
- Stacktrace = erlang:get_stacktrace(),
- cowboy_req:maybe_reply(Stacktrace, Req),
- terminate({crash, Class, Reason}, Req, HandlerOpts, Handler),
- exit({cowboy_handler, [
- {class, Class},
- {reason, Reason},
- {mfa, {Handler, init, 2}},
- {stacktrace, Stacktrace},
- {req, cowboy_req:to_list(Req)},
- {opts, HandlerOpts}
- ]})
end.
-spec terminate(any(), Req, any(), module()) -> ok when Req::cowboy_req:req().
terminate(Reason, Req, State, Handler) ->
case erlang:function_exported(Handler, terminate, 3) of
true ->
- try
- Handler:terminate(Reason, cowboy_req:lock(Req), State)
- catch Class:Reason2 ->
- exit({cowboy_handler, [
- {class, Class},
- {reason, Reason2},
- {mfa, {Handler, terminate, 3}},
- {stacktrace, erlang:get_stacktrace()},
- {req, cowboy_req:to_list(Req)},
- {state, State},
- {terminate_reason, Reason}
- ]})
- end;
+ Handler:terminate(Reason, cowboy_req:lock(Req), State);
false ->
ok
end.
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
new file mode 100644
index 0000000..3ec3c17
--- /dev/null
+++ b/src/cowboy_http.erl
@@ -0,0 +1,973 @@
+%% Copyright (c) 2016, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(cowboy_http).
+
+-export([init/6]).
+
+-export([system_continue/3]).
+-export([system_terminate/4]).
+-export([system_code_change/4]).
+
+%% @todo map
+-type opts() :: [{compress, boolean()}
+ | {env, cowboy_middleware:env()}
+ | {max_empty_lines, non_neg_integer()}
+ | {max_header_name_length, non_neg_integer()}
+ | {max_header_value_length, non_neg_integer()}
+ | {max_headers, non_neg_integer()}
+ | {max_keepalive, non_neg_integer()}
+ | {max_request_line_length, non_neg_integer()}
+ | {middlewares, [module()]}
+ | {onresponse, cowboy:onresponse_fun()}
+ | {timeout, timeout()}].
+-export_type([opts/0]).
+
+-record(ps_request_line, {
+ empty_lines = 0 :: non_neg_integer()
+}).
+
+-record(ps_header, {
+ method = undefined :: binary(),
+ path = undefined :: binary(),
+ qs = undefined :: binary(),
+ version = undefined :: cowboy:http_version(),
+ headers = undefined :: map() | undefined, %% @todo better type than map()
+ name = undefined :: binary()
+}).
+
+%% @todo We need a state where we wait for the stream process to ask for the body.
+%% OR DO WE
+
+%% In HTTP/2 we start receiving data before the body asks for it, even if optionally
+%% (and by default), so we need to be able to do the same for HTTP/1.1 too. This means
+%% that when we receive data (up to a certain limit, we read from the socket and decode.
+%% When we reach a limit, we stop reading from the socket momentarily until the stream
+%% process asks for more or the stream ends.
+
+%% This means that we need to keep a buffer in the stream handler (until the stream
+%% process asks for it). And that we need the body state to indicate how much we have
+%% left to read (and stop/start reading from the socket depending on value).
+
+-record(ps_body, {
+ %% @todo flow
+ transfer_decode_fun :: fun(), %% @todo better type
+ transfer_decode_state :: any() %% @todo better type
+}).
+
+-record(stream, {
+ %% Stream identifier.
+ id = undefined :: cowboy_stream:streamid(),
+
+ %% Stream handler state.
+ state = undefined :: any(),
+
+ %% Client HTTP version for this stream.
+ version = undefined :: cowboy:http_version(),
+
+ %% Commands queued.
+ queue = [] :: [] %% @todo better type
+}).
+
+-type stream() :: #stream{}.
+
+-record(state, {
+ parent :: pid(),
+ ref :: ranch:ref(),
+ socket :: inet:socket(),
+ transport :: module(),
+ opts = #{} :: map(),
+ handler :: module(),
+
+ timer = undefined :: undefined | reference(),
+
+ %% Identifier for the stream currently being read (or waiting to be received).
+ in_streamid = 1 :: pos_integer(),
+
+ %% Parsing state for the current stream or stream-to-be.
+ in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
+
+ %% Identifier for the stream currently being written.
+ %% Note that out_streamid =< in_streamid.
+ out_streamid = 1 :: pos_integer(),
+
+ %% Whether we finished writing data for the current stream.
+ out_state = wait :: wait | headers | chunked,
+
+ %% The connection will be closed after this stream.
+ last_streamid = undefined :: pos_integer(),
+
+ %% Currently active HTTP/1.1 streams. Streams may be initiated either
+ %% by the client or by the server through PUSH_PROMISE frames.
+ streams = [] :: [stream()],
+
+ %% Children which are in the process of shutting down.
+ children = [] :: [{pid(), cowboy_stream:streamid(), timeout()}]
+
+ %% @todo Automatic compression. (compress option?)
+ %% @todo onresponse? Equivalent using streams.
+}).
+
+-include_lib("cowlib/include/cow_inline.hrl").
+-include_lib("cowlib/include/cow_parse.hrl").
+
+-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok.
+init(Parent, Ref, Socket, Transport, Opts, Handler) ->
+ LastStreamID = maps:get(max_keepalive, Opts, 100),
+ before_loop(set_request_timeout(#state{
+ parent=Parent, ref=Ref, socket=Socket,
+ transport=Transport, opts=Opts, handler=Handler,
+ last_streamid=LastStreamID}), <<>>).
+
+%% @todo Send a response depending on in_state and whether one was already sent.
+
+%% @todo
+%% Timeouts:
+%% - waiting for new request (if no stream is currently running)
+%% -> request_timeout: for whole request/headers, set at init/when we set ps_request_line{} state
+%% - waiting for body (if a stream requested the body to be read)
+%% -> read_body_timeout: amount of time we wait without receiving any data when reading the body
+%% - if we skip the body, skip only for a specific duration
+%% -> skip_body_timeout: also have a skip_body_length
+%% - none if we have a stream running and it didn't request the body to be read
+%% - global
+%% -> inactivity_timeout: max time to wait without anything happening before giving up
+
+before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
+ %% @todo disable this when we get to the body, until the stream asks for it?
+ %% Perhaps have a threshold for how much we're willing to read before waiting.
+ Transport:setopts(Socket, [{active, once}]),
+ loop(State, Buffer).
+
+loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
+ handler=_Handler, timer=TimerRef, children=Children}, Buffer) ->
+ {OK, Closed, Error} = Transport:messages(),
+ receive
+ %% Socket messages.
+ {OK, Socket, Data} ->
+ parse(<< Buffer/binary, Data/binary >>, State);
+ {Closed, Socket} ->
+ terminate(State, {socket_error, closed, 'The socket has been closed.'});
+ {Error, Socket, Reason} ->
+ terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
+ %% Timeouts.
+ {timeout, TimerRef, Reason} ->
+ timeout(State, Reason);
+ {timeout, _, _} ->
+ loop(State, Buffer);
+ %% System messages.
+ {'EXIT', Parent, Reason} ->
+ exit(Reason);
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
+ %% Messages pertaining to a stream.
+ {{Pid, StreamID}, Msg} when Pid =:= self() ->
+ loop(info(State, StreamID, Msg), Buffer);
+ %% Exit signal from children.
+ Msg = {'EXIT', Pid, _} ->
+ loop(down(State, Pid, Msg), Buffer);
+ %% Calls from supervisor module.
+ {'$gen_call', {From, Tag}, which_children} ->
+ Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _, _} <- Children],
+ From ! {Tag, Workers},
+ loop(State, Buffer);
+ {'$gen_call', {From, Tag}, count_children} ->
+ NbChildren = length(Children),
+ Counts = [{specs, 1}, {active, NbChildren},
+ {supervisors, 0}, {workers, NbChildren}],
+ From ! {Tag, Counts},
+ loop(State, Buffer);
+ {'$gen_call', {From, Tag}, _} ->
+ From ! {Tag, {error, ?MODULE}},
+ loop(State, Buffer);
+ %% Unknown messages.
+ Msg ->
+ error_logger:error_msg("Received stray message ~p.", [Msg]),
+ loop(State, Buffer)
+ %% @todo Configurable timeout. This should be a global inactivity timeout
+ %% that triggers when really nothing happens (ie something went really wrong).
+ after 300000 ->
+ terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
+ end.
+
+set_request_timeout(State0=#state{timer=TimerRef0, opts=Opts}) ->
+ State = cancel_request_timeout(State0),
+ Timeout = maps:get(request_timeout, Opts, 5000),
+ TimerRef = erlang:start_timer(Timeout, self(), request_timeout),
+ State#state{timer=TimerRef}.
+
+cancel_request_timeout(State=#state{timer=TimerRef, opts=Opts}) ->
+ ok = case TimerRef of
+ undefined -> ok;
+ _ -> erlang:cancel_timer(TimerRef, [{async, true}, {info, false}])
+ end,
+ State#state{timer=undefined}.
+
+%% @todo Honestly it would be much better if we didn't enable pipelining yet.
+timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
+ %% @todo If other streams are running, just set the connection to be closed
+ %% and stop trying to read from the socket?
+ terminate(State, {connection_error, timeout, 'No request-line received before timeout.'});
+timeout(State=#state{socket=Socket, transport=Transport, in_state=#ps_header{}}, request_timeout) ->
+ %% @todo If other streams are running, maybe wait for their reply before sending 408?
+ %% -> Definitely. Either way, stop reading from the socket and make that stream the last.
+ Transport:send(Socket, cow_http:response(408, 'HTTP/1.1', [])),
+ terminate(State, {connection_error, timeout, 'Request headers not received before timeout.'}).
+
+%% Request-line.
+parse(<<>>, State) ->
+ before_loop(State, <<>>);
+parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
+ after_parse(parse_request(Buffer, State, EmptyLines));
+parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
+ after_parse(parse_header(Buffer,
+ State#state{in_state=PS#ps_header{headers=undefined}},
+ Headers));
+parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
+ after_parse(parse_hd_before_value(Buffer,
+ State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
+ Headers, Name));
+parse(Buffer, State=#state{in_state=#ps_body{}}) ->
+ %% @todo We do not want to get the body automatically if the request doesn't ask for it.
+ %% We may want to get bodies that are below a threshold without waiting, and buffer them
+ %% until the request asks, though.
+
+ %% @todo Transfer-decoding must be done here.
+ after_parse(parse_body(Buffer, State)).
+%% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now.
+
+after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version},
+ State0=#state{handler=Handler, opts=Opts, streams=Streams0}, Buffer}) ->
+ %% @todo Opts at the end. Maybe pass the same Opts we got?
+ try Handler:init(StreamID, Req, Opts) of
+ {Commands, StreamState} ->
+ Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0],
+ State = case maybe_req_close(State0, Headers, Version) of
+ close -> State0#state{streams=Streams, last_streamid=StreamID};
+ keepalive -> State0#state{streams=Streams}
+ end,
+ parse(Buffer, commands(State, StreamID, Commands))
+ catch Class:Reason ->
+ error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) "
+ "with reason ~p:~p.",
+ [Handler, StreamID, Req, Opts, Class, Reason]),
+ ok
+ %% @todo Status code.
+% stream_reset(State, StreamID, {internal_error, {Class, Reason},
+% 'Exception occurred in StreamHandler:init/10 call.'}) %% @todo Check final arity.
+ end;
+%% Streams are sequential so the body is always about the last stream created
+%% unless that stream has terminated.
+after_parse({data, StreamID, IsFin, Data, State=#state{handler=Handler,
+ streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) ->
+ try Handler:data(StreamID, IsFin, Data, StreamState0) of
+ {Commands, StreamState} ->
+ Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
+ Stream#stream{state=StreamState}),
+ parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
+ catch Class:Reason ->
+ error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
+ [Handler, StreamID, IsFin, Data, StreamState0, Class, Reason]),
+ ok
+ %% @todo
+% stream_reset(State, StreamID, {internal_error, {Class, Reason},
+% 'Exception occurred in StreamHandler:data/4 call.'})
+ end;
+%% No corresponding stream, skip.
+after_parse({data, _, _, _, State, Buffer}) ->
+ before_loop(State, Buffer);
+after_parse({more, State, Buffer}) ->
+ before_loop(State, Buffer).
+
+%% Request-line.
+
+-spec parse_request(binary(), #state{}, non_neg_integer()) -> ok.
+%% Empty lines must be using \r\n.
+parse_request(<< $\n, _/bits >>, State, _) ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+parse_request(<< $\s, _/bits >>, State, _) ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+%% We limit the length of the Request-line to MaxLength to avoid endlessly
+%% reading from the socket and eventually crashing.
+parse_request(Buffer, State=#state{opts=Opts}, EmptyLines) ->
+ MaxLength = maps:get(max_request_line_length, Opts, 8000),
+ MaxEmptyLines = maps:get(max_empty_lines, Opts, 5),
+ case match_eol(Buffer, 0) of
+ nomatch when byte_size(Buffer) > MaxLength ->
+ error_terminate(414, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ nomatch ->
+ {more, State#state{in_state=#ps_request_line{empty_lines=EmptyLines}}, Buffer};
+ 1 when EmptyLines =:= MaxEmptyLines ->
+ error_terminate(400, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ 1 ->
+ << _:16, Rest/bits >> = Buffer,
+ parse_request(Rest, State, EmptyLines + 1);
+ _ ->
+ case Buffer of
+ %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
+ << "OPTIONS * ", Rest/bits >> ->
+ parse_version(Rest, State, <<"OPTIONS">>, <<"*">>, <<>>);
+% << "CONNECT ", Rest/bits >> ->
+% parse_authority( %% @todo
+ _ ->
+ parse_method(Buffer, State, <<>>,
+ maps:get(max_method_length, Opts, 32))
+ end
+ end.
+
+match_eol(<< $\n, _/bits >>, N) ->
+ N;
+match_eol(<< _, Rest/bits >>, N) ->
+ match_eol(Rest, N + 1);
+match_eol(_, _) ->
+ nomatch.
+
+parse_method(_, State, _, 0) ->
+ error_terminate(501, State, {connection_error, limit_reached,
+ 'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
+parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $\s -> parse_uri(Rest, State, SoFar);
+ _ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1);
+ _ -> error_terminate(400, State, {connection_error, protocol_error,
+ 'The method name must contain only valid token characters. (RFC7230 3.1.1)'})
+ end.
+
+parse_uri(<< H, T, T, P, "://", Rest/bits >>, State, Method)
+ when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
+ P =:= $p orelse P =:= $P ->
+ parse_uri_skip_host(Rest, State, Method);
+parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method)
+ when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
+ P =:= $p orelse P =:= $P; S =:= $s orelse S =:= $S ->
+ parse_uri_skip_host(Rest, State, Method);
+parse_uri(<< $/, Rest/bits >>, State, Method) ->
+ parse_uri_path(Rest, State, Method, << $/ >>);
+parse_uri(_, State, _) ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).
+
+parse_uri_skip_host(<< C, Rest/bits >>, State, Method) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $/ -> parse_uri_path(Rest, State, Method, <<"/">>);
+ $\s -> parse_version(Rest, State, Method, <<"/">>, <<>>);
+ $? -> parse_uri_query(Rest, State, Method, <<"/">>, <<>>);
+ $# -> skip_uri_fragment(Rest, State, Method, <<"/">>, <<>>);
+ _ -> parse_uri_skip_host(Rest, State, Method)
+ end.
+
+parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $\s -> parse_version(Rest, State, Method, SoFar, <<>>);
+ $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>);
+ $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>);
+ _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>)
+ end.
+
+parse_uri_query(<< C, Rest/bits >>, State, M, P, SoFar) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $\s -> parse_version(Rest, State, M, P, SoFar);
+ $# -> skip_uri_fragment(Rest, State, M, P, SoFar);
+ _ -> parse_uri_query(Rest, State, M, P, << SoFar/binary, C >>)
+ end.
+
+skip_uri_fragment(<< C, Rest/bits >>, State, M, P, Q) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $\s -> parse_version(Rest, State, M, P, Q);
+ _ -> skip_uri_fragment(Rest, State, M, P, Q)
+ end.
+
+%% @todo Calls to parse_header should update the state.
+parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, State, M, P, Q) ->
+ parse_headers(Rest, State, M, P, Q, 'HTTP/1.1');
+parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, State, M, P, Q) ->
+ parse_headers(Rest, State, M, P, Q, 'HTTP/1.0');
+parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
+parse_version(<< C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'The separator between request target and version must be a single SP.'});
+parse_version(_, State, _, _, _) ->
+ error_terminate(505, State, {connection_error, protocol_error,
+ ''}). %% @todo
+
+parse_headers(Rest, State, M, P, Q, V) ->
+ %% @todo Figure out the parse states.
+ parse_header(Rest, State#state{in_state=#ps_header{
+ method=M, path=P, qs=Q, version=V}}, #{}).
+
+%% Headers.
+
+%% We need two or more bytes in the buffer to continue.
+parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
+ {more, State#state{in_state=PS#ps_header{headers=Headers}}, Rest};
+parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) ->
+ request(Rest, S, Headers);
+parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
+ MaxLength = maps:get(max_header_name_length, Opts, 64),
+ MaxHeaders = maps:get(max_headers, Opts, 100),
+ case match_colon(Buffer, 0) of
+ nomatch when byte_size(Buffer) > MaxLength ->
+ error_terminate(400, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ nomatch when length(Headers) >= MaxHeaders ->
+ error_terminate(400, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ nomatch ->
+ {more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer};
+ _ ->
+ parse_hd_name(Buffer, State, Headers, <<>>)
+ end.
+
+match_colon(<< $:, _/bits >>, N) ->
+ N;
+match_colon(<< _, Rest/bits >>, N) ->
+ match_colon(Rest, N + 1);
+match_colon(_, _) ->
+ nomatch.
+
+parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) ->
+ parse_hd_before_value(Rest, State, H, SoFar);
+parse_hd_name(<< C, _/bits >>, State, _, <<>>) when ?IS_WS(C) ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) when ?IS_WS(C) ->
+ parse_hd_name_ws(Rest, State, H, SoFar);
+parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) ->
+ ?LOWER(parse_hd_name, Rest, State, H, SoFar).
+
+parse_hd_name_ws(<< C, Rest/bits >>, S, H, Name) ->
+ case C of
+ $\s -> parse_hd_name_ws(Rest, S, H, Name);
+ $\t -> parse_hd_name_ws(Rest, S, H, Name);
+ $: -> parse_hd_before_value(Rest, S, H, Name)
+ end.
+
+parse_hd_before_value(<< $\s, Rest/bits >>, S, H, N) ->
+ parse_hd_before_value(Rest, S, H, N);
+parse_hd_before_value(<< $\t, Rest/bits >>, S, H, N) ->
+ parse_hd_before_value(Rest, S, H, N);
+parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) ->
+ MaxLength = maps:get(max_header_value_length, Opts, 4096),
+ case match_eol(Buffer, 0) of
+ nomatch when byte_size(Buffer) > MaxLength ->
+ error_terminate(400, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ nomatch ->
+ {more, State#state{in_state=PS#ps_header{headers=H, name=N}}, Buffer};
+ _ ->
+ parse_hd_value(Buffer, State, H, N, <<>>)
+ end.
+
+parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers, Name, SoFar) ->
+ %% @todo What to do about duplicate header names.
+ parse_header(Rest, S, Headers#{Name => clean_value_ws_end(SoFar, byte_size(SoFar) - 1)});
+parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) ->
+ parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>).
+
+clean_value_ws_end(_, -1) ->
+ <<>>;
+clean_value_ws_end(Value, N) ->
+ case binary:at(Value, N) of
+ $\s -> clean_value_ws_end(Value, N - 1);
+ $\t -> clean_value_ws_end(Value, N - 1);
+ _ ->
+ S = N + 1,
+ << Value2:S/binary, _/bits >> = Value,
+ Value2
+ end.
+
+-ifdef(TEST).
+clean_value_ws_end_test_() ->
+ Tests = [
+ {<<>>, <<>>},
+ {<<" ">>, <<>>},
+ {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
+ "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>,
+ <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
+ "text/html;level=2;q=0.4, */*;q=0.5">>}
+ ],
+ [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].
+
+horse_clean_value_ws_end() ->
+ horse:repeat(200000,
+ clean_value_ws_end(
+ <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
+ "text/html;level=2;q=0.4, */*;q=0.5 ">>,
+ byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
+ "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
+ ).
+-endif.
+
+request(Buffer, State=#state{transport=Transport, in_streamid=StreamID,
+ in_state=#ps_header{version=Version}}, Headers) ->
+ case maps:get(<<"host">>, Headers, undefined) of
+ undefined when Version =:= 'HTTP/1.1' ->
+ %% @todo Might want to not close the connection on this and next one.
+ error_terminate(400, State, {stream_error, StreamID, protocol_error,
+ ''}); %% @todo
+ undefined ->
+ request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
+ RawHost ->
+ try parse_host(RawHost, false, <<>>) of
+ {Host, undefined} ->
+ request(Buffer, State, Headers, Host, default_port(Transport:secure()));
+ {Host, Port} ->
+ request(Buffer, State, Headers, Host, Port)
+ catch _:_ ->
+ error_terminate(400, State, {stream_error, StreamID, protocol_error,
+ ''}) %% @todo
+ end
+ end.
+
+-spec default_port(boolean()) -> 80 | 443.
+default_port(true) -> 443;
+default_port(_) -> 80.
+
+%% @todo Yeah probably just call the cowlib function.
+%% Same code as cow_http:parse_fullhost/1, but inline because we
+%% really want this to go fast.
+parse_host(<< $[, Rest/bits >>, false, <<>>) ->
+ parse_host(Rest, true, << $[ >>);
+parse_host(<<>>, false, Acc) ->
+ {Acc, undefined};
+parse_host(<< $:, Rest/bits >>, false, Acc) ->
+ {Acc, list_to_integer(binary_to_list(Rest))};
+parse_host(<< $], Rest/bits >>, true, Acc) ->
+ parse_host(Rest, false, << Acc/binary, $] >>);
+parse_host(<< C, Rest/bits >>, E, Acc) ->
+ ?LOWER(parse_host, Rest, E, Acc).
+
+%% End of request parsing.
+
+%% @todo We used to get the peername here, bad idea, should
+%% get it at the very start of the connection, or the first
+%% time requested if we go the route of handler sending a
+%% message to get it (we probably shouldn't).
+request(Buffer, State0=#state{ref=Ref, transport=Transport, in_streamid=StreamID,
+ in_state=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
+ Headers, Host, Port) ->
+ Scheme = case Transport:secure() of
+ true -> <<"https">>;
+ false -> <<"http">>
+ end,
+ {HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers of
+ #{<<"content-length">> := <<"0">>} ->
+ {false, 0, undefined, undefined};
+ #{<<"content-length">> := BinLength} ->
+ Length = try
+ cow_http_hd:parse_content_length(BinLength)
+ catch _:_ ->
+ error_terminate(400, State0, {stream_error, StreamID, protocol_error,
+ ''}) %% @todo
+ %% @todo Err should terminate here...
+ end,
+ {true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
+ %% @todo Better handling of transfer decoding.
+ #{<<"transfer-encoding">> := <<"chunked">>} ->
+ {true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
+ _ ->
+ {false, 0, undefined, undefined}
+ end,
+ Req = #{
+ ref => Ref,
+ pid => self(),
+ streamid => StreamID,
+
+ %% @todo peer
+ %% @todo sockname
+ %% @todo ssl client cert?
+
+ method => Method,
+ scheme => Scheme,
+ host => Host,
+ %% host_info (cowboy_router)
+ port => Port,
+ path => Path,
+ %% path_info (cowboy_router)
+ %% bindings (cowboy_router)
+ qs => Qs,
+ version => Version,
+ %% We are transparently taking care of transfer-encodings so
+ %% the user code has no need to know about it.
+ headers => maps:remove(<<"transfer-encoding">>, Headers),
+
+ has_body => HasBody,
+ body_length => BodyLength
+ %% @todo multipart? keep state separate
+
+ %% meta values (cowboy_websocket, cowboy_rest)
+ },
+ State = case HasBody of
+ true ->
+ cancel_request_timeout(State0#state{in_state=#ps_body{
+ %% @todo Don't need length anymore?
+ transfer_decode_fun = TDecodeFun,
+ transfer_decode_state = TDecodeState
+ }});
+ false ->
+ set_request_timeout(State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}})
+ end,
+ {request, Req, State, Buffer}.
+
+%% Request body parsing.
+
+parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
+ PS=#ps_body{transfer_decode_fun=TDecode, transfer_decode_state=TState0}}) ->
+ %% @todo Proper trailers.
+ case TDecode(Buffer, TState0) of
+ more ->
+ %% @todo Asks for 0 or more bytes.
+ {more, State, Buffer};
+ {more, Data, TState} ->
+ %% @todo Asks for 0 or more bytes.
+ {data, StreamID, nofin, Data, State#state{in_state=
+ PS#ps_body{transfer_decode_state=TState}}, <<>>};
+ {more, Data, _Length, TState} when is_integer(_Length) ->
+ %% @todo Asks for Length more bytes.
+ {data, StreamID, nofin, Data, State#state{in_state=
+ PS#ps_body{transfer_decode_state=TState}}, <<>>};
+ {more, Data, Rest, TState} ->
+ %% @todo Asks for 0 or more bytes.
+ {data, StreamID, nofin, Data, State#state{in_state=
+ PS#ps_body{transfer_decode_state=TState}}, Rest};
+ {done, TotalLength, Rest} ->
+ {data, StreamID, {fin, TotalLength}, <<>>, set_request_timeout(
+ State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest};
+ {done, Data, TotalLength, Rest} ->
+ {data, StreamID, {fin, TotalLength}, Data, set_request_timeout(
+ State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}
+ end.
+
+%% Message handling.
+
+down(State=#state{children=Children0}, Pid, Msg) ->
+ case lists:keytake(Pid, 1, Children0) of
+ {value, {_, undefined, _}, Children} ->
+ State#state{children=Children};
+ {value, {_, StreamID, _}, Children} ->
+ info(State#state{children=Children}, StreamID, Msg);
+ false ->
+ error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.", [Msg, Pid]),
+ State
+ end.
+
+info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) ->
+ case lists:keyfind(StreamID, #stream.id, Streams0) of
+ Stream = #stream{state=StreamState0} ->
+ try Handler:info(StreamID, Msg, StreamState0) of
+ {Commands, StreamState} ->
+ Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
+ Stream#stream{state=StreamState}),
+ commands(State#state{streams=Streams}, StreamID, Commands)
+ catch Class:Reason ->
+ error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.",
+ [Handler, StreamID, Msg, StreamState0, Class, Reason]),
+ ok
+%% @todo
+% stream_reset(State, StreamID, {internal_error, {Class, Reason},
+% 'Exception occurred in StreamHandler:info/3 call.'})
+ end;
+ false ->
+ error_logger:error_msg("Received message ~p for unknown stream ~p.", [Msg, StreamID]),
+ State
+ end.
+
+%% @todo commands/3
+%% @todo stream_reset
+
+
+
+
+%% Commands.
+
+commands(State, _, []) ->
+ State;
+%% Supervise a child process.
+commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
+ commands(State#state{children=[{Pid, StreamID, Shutdown}|Children]}, StreamID, Tail);
+%% Error handling.
+commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
+ commands(stream_reset(State, StreamID, Error), StreamID, Tail);
+%% Commands for a stream currently inactive.
+commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
+ when Current =/= StreamID ->
+
+ %% @todo We still want to handle some commands...
+
+ Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
+ Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
+ Stream#stream{queue=Queue ++ Commands}),
+ State#state{streams=Streams};
+%% Read the request body.
+commands(State, StreamID, [{flow, _Length}|Tail]) ->
+ %% @todo We only read from socket if buffer is empty, otherwise
+ %% we decode the buffer.
+
+ %% @todo Set the body reading length to min(Length, BodyLength)
+
+ commands(State, StreamID, Tail);
+%% @todo Probably a good idea to have an atomic response send (single send call for resp+body).
+%% Send a full response.
+%%
+%% @todo Kill the stream if it sent a response when one has already been sent.
+%% @todo Keep IsFin in the state.
+%% @todo Same two things above apply to DATA, possibly promise too.
+commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
+ [{response, StatusCode, Headers0, Body}|Tail]) ->
+ %% @todo I'm pretty sure the last stream in the list is the one we want
+ %% considering all others are queued.
+ #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
+ {State, Headers} = connection(State0, Headers0, StreamID, Version),
+ %% @todo Ensure content-length is set.
+ Response = cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers)),
+ case Body of
+ {sendfile, O, B, P} ->
+ Transport:send(Socket, Response),
+ commands(State#state{out_state=done}, StreamID, [{sendfile, fin, O, B, P}|Tail]);
+ _ ->
+ Transport:send(Socket, [Response, Body]),
+ %% @todo If max number of requests, close connection.
+ %% @todo If IsFin, maybe skip body of current request.
+ maybe_terminate(State#state{out_state=done}, StreamID, Tail, fin)
+ end;
+%% Send response headers and initiate chunked encoding.
+commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
+ [{headers, StatusCode, Headers0}|Tail]) ->
+ %% @todo Same as above.
+ #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
+ {State1, Headers1} = case Version of
+ 'HTTP/1.1' ->
+ {State0, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
+ %% Close the connection after streaming the data to HTTP/1.0 client.
+ %% @todo I'm guessing we need to differentiate responses with a content-length and others.
+ 'HTTP/1.0' ->
+ {State0#state{last_streamid=StreamID}, Headers0}
+ end,
+ {State, Headers} = connection(State1, Headers1, StreamID, Version),
+ Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers))),
+ commands(State#state{out_state=chunked}, StreamID, Tail);
+%% Send a response body chunk.
+%%
+%% @todo WINDOW_UPDATE stuff require us to buffer some data.
+commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
+ [{data, IsFin, Data}|Tail]) ->
+ %% @todo Same as above.
+ Headers1 = case lists:keyfind(StreamID, #stream.id, Streams) of
+ #stream{version='HTTP/1.1'} ->
+ Size = iolist_size(Data),
+ Transport:send(Socket, [integer_to_list(Size, 16), <<"\r\n">>, Data, <<"\r\n">>]);
+ #stream{version='HTTP/1.0'} ->
+ Transport:send(Socket, Data)
+ end,
+ maybe_terminate(State, StreamID, Tail, IsFin);
+%% Send a file.
+commands(State=#state{socket=Socket, transport=Transport}, StreamID,
+ [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
+ Transport:sendfile(Socket, Path, Offset, Bytes),
+ maybe_terminate(State, StreamID, Tail, IsFin);
+%% Protocol takeover.
+commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
+ opts=Opts, children=Children}, StreamID,
+ [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
+ %% @todo This should be the last stream running otherwise we need to wait before switching.
+ %% @todo If there's streams opened after this one, fail instead of 101.
+ State = cancel_request_timeout(State0),
+ %% @todo When we actually do the upgrade, we only have the one stream left, plus
+ %% possibly some processes terminating. We need a smart strategy for handling the
+ %% children shutdown. We can start with brutal_kill and discarding the EXIT messages
+ %% received before switching to Websocket. Something better would be to let the
+ %% stream processes finish but that implies the Websocket module to know about
+ %% them and filter the messages. For now, kill them all and discard all messages
+ %% in the mailbox.
+ _ = [exit(Pid, kill) || {Pid, _, _} <- Children],
+ flush(),
+ %% Everything good, upgrade!
+ _ = commands(State, StreamID, [{response, 101, Headers, <<>>}]),
+ %% @todo This is no good because commands return a state normally and here it doesn't
+ %% we need to let this module go entirely. Perhaps it should be handled directly in
+ %% cowboy_clear/cowboy_tls? Perhaps not. We do want that Buffer.
+ Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
+%% Stream shutdown.
+commands(State, StreamID, [stop|Tail]) ->
+ %% @todo Do we want to run the commands after a stop?
+% commands(stream_terminate(State, StreamID, stop), StreamID, Tail).
+ maybe_terminate(State, StreamID, Tail, fin).
+
+flush() ->
+ receive _ -> flush() after 0 -> ok end.
+
+maybe_terminate(State, StreamID, Tail, nofin) ->
+ commands(State, StreamID, Tail);
+%% @todo In these cases I'm not sure if we should continue processing commands.
+maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail, fin) ->
+ terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
+maybe_terminate(State, StreamID, _Tail, fin) ->
+ stream_terminate(State, StreamID, normal).
+
+stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID,
+ StreamError={internal_error, _, _}) ->
+ %% @todo headers
+ %% @todo Don't send this if there are no streams left.
+ Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
+ {<<"content-length">>, <<"0">>}
+ ])),
+ %% @todo update IsFin local
+ stream_terminate(State#state{out_state=done}, StreamID, StreamError).
+
+stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler,
+ out_streamid=OutStreamID, out_state=OutState,
+ streams=Streams0, children=Children0}, StreamID, Reason) ->
+ {value, #stream{state=StreamState, version=Version}, Streams}
+ = lists:keytake(StreamID, #stream.id, Streams0),
+ _ = case OutState of
+ wait ->
+ Transport:send(Socket, cow_http:response(204, 'HTTP/1.1', []));
+ chunked when Version =:= 'HTTP/1.1' ->
+ Transport:send(Socket, <<"0\r\n\r\n">>);
+ _ -> %% done or Version =:= 'HTTP/1.0'
+ ok
+ end,
+
+ stream_call_terminate(StreamID, Reason, Handler, StreamState),
+%% @todo initiate children shutdown
+% Children = stream_terminate_children(Children0, StreamID, []),
+ Children = [case C of
+ {Pid, StreamID, Shutdown} -> {Pid, undefined, Shutdown};
+ _ -> C
+ end || C <- Children0],
+
+ %% @todo Skip the body, if any, or drop the connection if too large.
+
+ %% @todo Only do this if Current =:= StreamID.
+ NextOutStreamID = OutStreamID + 1,
+ case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
+ false ->
+ %% @todo This is clearly wrong, if the stream is gone we need to check if
+ %% there used to be such a stream, and if there was to send an error.
+ State#state{out_streamid=NextOutStreamID, out_state=wait, streams=Streams, children=Children};
+ #stream{queue=Commands} ->
+ %% @todo Remove queue from the stream.
+ commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
+ streams=Streams, children=Children}, NextOutStreamID, Commands)
+ end.
+
+%% @todo Taken directly from _http2
+stream_call_terminate(StreamID, Reason, Handler, StreamState) ->
+ try
+ Handler:terminate(StreamID, Reason, StreamState),
+ ok
+ catch Class:Reason ->
+ error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.",
+ [Handler, StreamID, Reason, StreamState, Class, Reason])
+ end.
+
+%stream_terminate_children([], _, Acc) ->
+% Acc;
+%stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) ->
+% exit(Pid, kill),
+% stream_terminate_children(Tail, StreamID, Acc);
+%stream_terminate_children([Child|Tail], StreamID, Acc) ->
+% stream_terminate_children(Tail, StreamID, [Child|Acc]).
+
+
+%% @todo max_reqs also
+maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') ->
+ Conns = cow_http_hd:parse_connection(Conn),
+ case lists:member(<<"keep-alive">>, Conns) of
+ true -> keepalive;
+ false -> close
+ end;
+maybe_req_close(_, _, 'HTTP/1.0') ->
+ close;
+maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
+ case connection_hd_is_close(Conn) of
+ true -> close;
+ false -> keepalive
+ end;
+maybe_req_close(_State, _, _) ->
+ keepalive.
+
+connection(State=#state{last_streamid=StreamID}, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
+ case connection_hd_is_close(Conn) of
+ true -> {State, Headers};
+ %% @todo Here we need to remove keep-alive and add close, not just add close.
+ false -> {State, Headers#{<<"connection">> => [<<"close, ">>, Conn]}}
+ end;
+connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _) ->
+ {State, Headers#{<<"connection">> => <<"close">>}};
+connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
+ case connection_hd_is_close(Conn) of
+ true -> {State#state{last_streamid=StreamID}, Headers};
+ %% @todo Here we need to set keep-alive only if it wasn't set before.
+ false -> {State, Headers}
+ end;
+connection(State, Headers, _, 'HTTP/1.0') ->
+ {State, Headers#{<<"connection">> => <<"keep-alive">>}};
+connection(State, Headers, _, _) ->
+ {State, Headers}.
+
+connection_hd_is_close(Conn) ->
+ Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)),
+ lists:member(<<"close">>, Conns).
+
+error_terminate(StatusCode, State=#state{socket=Socket, transport=Transport}, Reason) ->
+ Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', [
+ {<<"content-length">>, <<"0">>}
+ ])),
+ terminate(State, Reason).
+
+terminate(_State, _Reason) ->
+ exit(normal). %% @todo
+
+
+
+
+
+
+
+
+
+
+
+
+
+%% System callbacks.
+
+-spec system_continue(_, _, #state{}) -> ok.
+system_continue(_, _, {State, Buffer}) ->
+ loop(State, Buffer).
+
+-spec system_terminate(any(), _, _, _) -> no_return().
+system_terminate(Reason, _, _, _) ->
+ exit(Reason).
+
+-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
+system_code_change(Misc, _, _, _) ->
+ {ok, Misc}.
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 6018461..42d8ba5 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -86,8 +86,7 @@ init(Parent, Ref, Socket, Transport, Opts, Handler) ->
before_loop(State, Buffer) ->
loop(State, Buffer).
-loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
- handler=Handler, children=Children}, Buffer) ->
+loop(State=#state{parent=Parent, socket=Socket, transport=Transport, children=Children}, Buffer) ->
Transport:setopts(Socket, [{active, once}]),
{OK, Closed, Error} = Transport:messages(),
receive
@@ -104,7 +103,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
%% Messages pertaining to a stream.
- {{Handler, StreamID}, Msg} ->
+ {{Pid, StreamID}, Msg} when Pid =:= self() ->
loop(info(State, StreamID, Msg), Buffer);
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
@@ -328,8 +327,8 @@ commands(State0=#state{socket=Socket, transport=Transport, server_streamid=Promi
commands(State, StreamID, [{flow, _Size}|Tail]) ->
commands(State, StreamID, Tail);
%% Supervise a child process.
-commands(State=#state{children=Children}, StreamID, [{spawn, Pid}|Tail]) ->
- commands(State#state{children=[{Pid, StreamID}|Children]}, StreamID, Tail);
+commands(State=#state{children=Children}, StreamID, [{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown
+ commands(State#state{children=[{Pid, StreamID}|Children]}, StreamID, Tail);
%% Upgrade to a new protocol.
%%
%% @todo Implementation.
@@ -357,7 +356,7 @@ terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason, Ha
%% Stream functions.
-stream_init(State0=#state{socket=Socket, transport=Transport, handler=Handler,
+stream_init(State0=#state{ref=Ref, socket=Socket, transport=Transport, handler=Handler, opts=Opts,
streams=Streams0, decode_state=DecodeState0}, StreamID, IsFin, HeaderBlock) ->
%% @todo Add clause for CONNECT requests (no scheme/path).
try headers_decode(HeaderBlock, DecodeState0) of
@@ -365,10 +364,46 @@ stream_init(State0=#state{socket=Socket, transport=Transport, handler=Handler,
<<":method">> := Method,
<<":scheme">> := Scheme,
<<":authority">> := Authority,
- <<":path">> := Path}, DecodeState} ->
+ <<":path">> := PathWithQs}, DecodeState} ->
State = State0#state{decode_state=DecodeState},
Headers = maps:without([<<":method">>, <<":scheme">>, <<":authority">>, <<":path">>], Headers0),
- try Handler:init(StreamID, IsFin, Method, Scheme, Authority, Path, Headers) of
+ %% @todo We need to parse the port out of :authority.
+ %% @todo We need to parse the query string out of :path.
+ %% @todo We need to give a way to get the socket infos.
+
+ Host = Authority, %% @todo
+ Port = todo, %% @todo
+ Path = PathWithQs, %% @todo
+ Qs = todo, %% @todo
+
+ Req = #{
+ ref => Ref,
+ pid => self(),
+ streamid => StreamID,
+
+ %% @todo peer
+ %% @todo sockname
+ %% @todo ssl client cert?
+
+ method => Method,
+ scheme => Scheme,
+ host => Host,
+ %% host_info (cowboy_router)
+ port => Port,
+ path => Path,
+ %% path_info (cowboy_router)
+ %% bindings (cowboy_router)
+ qs => Qs,
+ version => 'HTTP/2',
+ headers => Headers,
+
+ has_body => IsFin =:= nofin
+ %% @todo multipart? keep state separate
+
+ %% meta values (cowboy_websocket, cowboy_rest)
+ },
+
+ try Handler:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
Streams = [#stream{id=StreamID, state=StreamState}|Streams0],
commands(State#state{streams=Streams}, StreamID, Commands)
@@ -377,7 +412,7 @@ stream_init(State0=#state{socket=Socket, transport=Transport, handler=Handler,
"with reason ~p:~p.",
[Handler, StreamID, IsFin, Method, Scheme, Authority, Path, Headers, Class, Reason]),
stream_reset(State, StreamID, {internal_error, {Class, Reason},
- 'Exception occurred in StreamHandler:init/7 call.'})
+ 'Exception occurred in StreamHandler:init/7 call.'}) %% @todo Check final arity.
end;
{_, DecodeState} ->
Transport:send(Socket, cow_http2:rst_stream(StreamID, protocol_error)),
diff --git a/src/cowboy_loop.erl b/src/cowboy_loop.erl
index 695439a..e162417 100644
--- a/src/cowboy_loop.erl
+++ b/src/cowboy_loop.erl
@@ -60,15 +60,21 @@ upgrade(Req, Env, Handler, HandlerState, Timeout, run) ->
State2 = timeout(State),
after_call(Req, State2, Handler, HandlerState);
upgrade(Req, Env, Handler, HandlerState, Timeout, hibernate) ->
+
+% dbg:start(),
+% dbg:tracer(),
+% dbg:tpl(?MODULE, []),
+% dbg:tpl(long_polling_h, []),
+% dbg:tpl(loop_handler_body_h, []),
+% dbg:tpl(cowboy_req, []),
+% dbg:p(all, c),
+
State = #state{env=Env, max_buffer=get_max_buffer(Env), hibernate=true, timeout=Timeout},
State2 = timeout(State),
after_call(Req, State2, Handler, HandlerState).
-get_max_buffer(Env) ->
- case lists:keyfind(loop_max_buffer, 1, Env) of
- false -> 5000;
- {_, MaxBuffer} -> MaxBuffer
- end.
+get_max_buffer(#{loop_max_buffer := MaxBuffer}) -> MaxBuffer;
+get_max_buffer(_) -> 5000.
%% Update the state if the response was sent in the callback.
after_call(Req, State=#state{resp_sent=false}, Handler,
@@ -83,12 +89,21 @@ after_call(Req, State, Handler, HandlerState) ->
before_loop(Req, State, Handler, HandlerState).
before_loop(Req, State=#state{hibernate=true}, Handler, HandlerState) ->
- [Socket, Transport] = cowboy_req:get([socket, transport], Req),
- Transport:setopts(Socket, [{active, once}]),
+
+ %% @todo Yeah we can't get the socket anymore.
+ %% Everything changes since we are a separate process now.
+ %% Proper flow control at the connection level should be implemented
+ %% instead of what we have here.
+
+% [Socket, Transport] = cowboy_req:get([socket, transport], Req),
+% Transport:setopts(Socket, [{active, once}]),
{suspend, ?MODULE, loop, [Req, State#state{hibernate=false}, Handler, HandlerState]};
before_loop(Req, State, Handler, HandlerState) ->
- [Socket, Transport] = cowboy_req:get([socket, transport], Req),
- Transport:setopts(Socket, [{active, once}]),
+
+ %% Same here.
+
+% [Socket, Transport] = cowboy_req:get([socket, transport], Req),
+% Transport:setopts(Socket, [{active, once}]),
loop(Req, State, Handler, HandlerState).
%% Almost the same code can be found in cowboy_websocket.
@@ -109,26 +124,26 @@ timeout(State=#state{timeout=Timeout,
loop(Req, State=#state{buffer_size=NbBytes,
max_buffer=Threshold, timeout_ref=TRef,
resp_sent=RespSent}, Handler, HandlerState) ->
- [Socket, Transport] = cowboy_req:get([socket, transport], Req),
- {OK, Closed, Error} = Transport:messages(),
+% [Socket, Transport] = cowboy_req:get([socket, transport], Req),
+% {OK, Closed, Error} = Transport:messages(),
receive
- {OK, Socket, Data} ->
- NbBytes2 = NbBytes + byte_size(Data),
- if NbBytes2 > Threshold ->
- _ = if RespSent -> ok; true ->
- cowboy_req:reply(500, Req)
- end,
- cowboy_handler:terminate({error, overflow}, Req, HandlerState, Handler),
- exit(normal);
- true ->
- Req2 = cowboy_req:append_buffer(Data, Req),
- State2 = timeout(State#state{buffer_size=NbBytes2}),
- before_loop(Req2, State2, Handler, HandlerState)
- end;
- {Closed, Socket} ->
- terminate(Req, State, Handler, HandlerState, {error, closed});
- {Error, Socket, Reason} ->
- terminate(Req, State, Handler, HandlerState, {error, Reason});
+% {OK, Socket, Data} ->
+% NbBytes2 = NbBytes + byte_size(Data),
+% if NbBytes2 > Threshold ->
+% _ = if RespSent -> ok; true ->
+% cowboy_req:reply(500, Req)
+% end,
+% cowboy_handler:terminate({error, overflow}, Req, HandlerState, Handler),
+% exit(normal);
+% true ->
+% Req2 = cowboy_req:append_buffer(Data, Req),
+% State2 = timeout(State#state{buffer_size=NbBytes2}),
+% before_loop(Req2, State2, Handler, HandlerState)
+% end;
+% {Closed, Socket} ->
+% terminate(Req, State, Handler, HandlerState, {error, closed});
+% {Error, Socket, Reason} ->
+% terminate(Req, State, Handler, HandlerState, {error, Reason});
{timeout, TRef, ?MODULE} ->
after_loop(Req, State, Handler, HandlerState, timeout);
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
@@ -139,13 +154,13 @@ loop(Req, State=#state{buffer_size=NbBytes,
%% data received after that and put it into the buffer.
%% We do not check the size here, if data keeps coming
%% we'll error out on the next packet received.
- Transport:setopts(Socket, [{active, false}]),
- Req2 = receive {OK, Socket, Data} ->
- cowboy_req:append_buffer(Data, Req)
- after 0 ->
- Req
- end,
- call(Req2, State, Handler, HandlerState, Message)
+% Transport:setopts(Socket, [{active, false}]),
+% Req2 = receive {OK, Socket, Data} ->
+% cowboy_req:append_buffer(Data, Req)
+% after 0 ->
+% Req
+% end,
+ call(Req, State, Handler, HandlerState, Message)
end.
call(Req, State=#state{resp_sent=RespSent},
@@ -168,7 +183,7 @@ call(Req, State=#state{resp_sent=RespSent},
{reason, Reason},
{mfa, {Handler, info, 3}},
{stacktrace, Stacktrace},
- {req, cowboy_req:to_list(Req)},
+ {req, Req},
{state, HandlerState}
]})
end.
diff --git a/src/cowboy_protocol.erl b/src/cowboy_protocol.erl
deleted file mode 100644
index 90128c3..0000000
--- a/src/cowboy_protocol.erl
+++ /dev/null
@@ -1,534 +0,0 @@
-%% Copyright (c) 2011-2014, Loïc Hoguin <[email protected]>
-%% Copyright (c) 2011, Anthony Ramine <[email protected]>
-%%
-%% Permission to use, copy, modify, and/or distribute this software for any
-%% purpose with or without fee is hereby granted, provided that the above
-%% copyright notice and this permission notice appear in all copies.
-%%
-%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
-%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
-%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
-%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
-%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
-%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
--module(cowboy_protocol).
-
-%% API.
--export([start_link/4]).
-
-%% Internal.
--export([init/4]).
--export([parse_request/3]).
--export([resume/6]).
-
--type opts() :: [{compress, boolean()}
- | {env, cowboy_middleware:env()}
- | {max_empty_lines, non_neg_integer()}
- | {max_header_name_length, non_neg_integer()}
- | {max_header_value_length, non_neg_integer()}
- | {max_headers, non_neg_integer()}
- | {max_keepalive, non_neg_integer()}
- | {max_request_line_length, non_neg_integer()}
- | {middlewares, [module()]}
- | {onresponse, cowboy:onresponse_fun()}
- | {timeout, timeout()}].
--export_type([opts/0]).
-
--record(state, {
- socket :: inet:socket(),
- transport :: module(),
- middlewares :: [module()],
- compress :: boolean(),
- env :: cowboy_middleware:env(),
- onresponse = undefined :: undefined | cowboy:onresponse_fun(),
- max_empty_lines :: non_neg_integer(),
- req_keepalive = 1 :: non_neg_integer(),
- max_keepalive :: non_neg_integer(),
- max_request_line_length :: non_neg_integer(),
- max_header_name_length :: non_neg_integer(),
- max_header_value_length :: non_neg_integer(),
- max_headers :: non_neg_integer(),
- timeout :: timeout(),
- until :: non_neg_integer() | infinity
-}).
-
--include_lib("cowlib/include/cow_inline.hrl").
--include_lib("cowlib/include/cow_parse.hrl").
-
-%% API.
-
--spec start_link(ranch:ref(), inet:socket(), module(), opts()) -> {ok, pid()}.
-start_link(Ref, Socket, Transport, Opts) ->
- Pid = spawn_link(?MODULE, init, [Ref, Socket, Transport, Opts]),
- {ok, Pid}.
-
-%% Internal.
-
-%% Faster alternative to proplists:get_value/3.
-get_value(Key, Opts, Default) ->
- case lists:keyfind(Key, 1, Opts) of
- {_, Value} -> Value;
- _ -> Default
- end.
-
--spec init(ranch:ref(), inet:socket(), module(), opts()) -> ok.
-init(Ref, Socket, Transport, Opts) ->
- ok = ranch:accept_ack(Ref),
- Timeout = get_value(timeout, Opts, 5000),
- Until = until(Timeout),
- case recv(Socket, Transport, Until) of
- {ok, Data} ->
- OnFirstRequest = get_value(onfirstrequest, Opts, undefined),
- case OnFirstRequest of
- undefined -> ok;
- _ -> OnFirstRequest(Ref, Socket, Transport, Opts)
- end,
- Compress = get_value(compress, Opts, false),
- MaxEmptyLines = get_value(max_empty_lines, Opts, 5),
- MaxHeaderNameLength = get_value(max_header_name_length, Opts, 64),
- MaxHeaderValueLength = get_value(max_header_value_length, Opts, 4096),
- MaxHeaders = get_value(max_headers, Opts, 100),
- MaxKeepalive = get_value(max_keepalive, Opts, 100),
- MaxRequestLineLength = get_value(max_request_line_length, Opts, 4096),
- Middlewares = get_value(middlewares, Opts, [cowboy_router, cowboy_handler]),
- Env = [{listener, Ref}|get_value(env, Opts, [])],
- OnResponse = get_value(onresponse, Opts, undefined),
- parse_request(Data, #state{socket=Socket, transport=Transport,
- middlewares=Middlewares, compress=Compress, env=Env,
- max_empty_lines=MaxEmptyLines, max_keepalive=MaxKeepalive,
- max_request_line_length=MaxRequestLineLength,
- max_header_name_length=MaxHeaderNameLength,
- max_header_value_length=MaxHeaderValueLength, max_headers=MaxHeaders,
- onresponse=OnResponse, timeout=Timeout, until=Until}, 0);
- {error, _} ->
- terminate(#state{socket=Socket, transport=Transport}) %% @todo ridiculous
- end.
-
--spec until(timeout()) -> non_neg_integer() | infinity.
-until(infinity) ->
- infinity;
-until(Timeout) ->
- erlang:monotonic_time(milli_seconds) + Timeout.
-
-%% Request parsing.
-%%
-%% The next set of functions is the request parsing code. All of it
-%% runs using a single binary match context. This optimization ends
-%% right after the header parsing is finished and the code becomes
-%% more interesting past that point.
-
--spec recv(inet:socket(), module(), non_neg_integer() | infinity)
- -> {ok, binary()} | {error, closed | timeout | atom()}.
-recv(Socket, Transport, infinity) ->
- Transport:recv(Socket, 0, infinity);
-recv(Socket, Transport, Until) ->
- Timeout = Until - erlang:monotonic_time(milli_seconds),
- if Timeout < 0 ->
- {error, timeout};
- true ->
- Transport:recv(Socket, 0, Timeout)
- end.
-
--spec wait_request(binary(), #state{}, non_neg_integer()) -> ok.
-wait_request(Buffer, State=#state{socket=Socket, transport=Transport,
- until=Until}, ReqEmpty) ->
- case recv(Socket, Transport, Until) of
- {ok, Data} ->
- parse_request(<< Buffer/binary, Data/binary >>, State, ReqEmpty);
- {error, _} ->
- terminate(State)
- end.
-
--spec parse_request(binary(), #state{}, non_neg_integer()) -> ok.
-%% Empty lines must be using \r\n.
-parse_request(<< $\n, _/bits >>, State, _) ->
- error_terminate(400, State);
-parse_request(<< $\s, _/bits >>, State, _) ->
- error_terminate(400, State);
-%% We limit the length of the Request-line to MaxLength to avoid endlessly
-%% reading from the socket and eventually crashing.
-parse_request(Buffer, State=#state{max_request_line_length=MaxLength,
- max_empty_lines=MaxEmpty}, ReqEmpty) ->
- case match_eol(Buffer, 0) of
- nomatch when byte_size(Buffer) > MaxLength ->
- error_terminate(414, State);
- nomatch ->
- wait_request(Buffer, State, ReqEmpty);
- 1 when ReqEmpty =:= MaxEmpty ->
- error_terminate(400, State);
- 1 ->
- << _:16, Rest/bits >> = Buffer,
- parse_request(Rest, State, ReqEmpty + 1);
- _ ->
- parse_method(Buffer, State, <<>>)
- end.
-
-match_eol(<< $\n, _/bits >>, N) ->
- N;
-match_eol(<< _, Rest/bits >>, N) ->
- match_eol(Rest, N + 1);
-match_eol(_, _) ->
- nomatch.
-
-parse_method(<< C, Rest/bits >>, State, SoFar) ->
- case C of
- $\r -> error_terminate(400, State);
- $\s -> parse_uri(Rest, State, SoFar);
- _ -> parse_method(Rest, State, << SoFar/binary, C >>)
- end.
-
-parse_uri(<< $\r, _/bits >>, State, _) ->
- error_terminate(400, State);
-parse_uri(<< $\s, _/bits >>, State, _) ->
- error_terminate(400, State);
-parse_uri(<< "* ", Rest/bits >>, State, Method) ->
- parse_version(Rest, State, Method, <<"*">>, <<>>);
-parse_uri(<< "http://", Rest/bits >>, State, Method) ->
- parse_uri_skip_host(Rest, State, Method);
-parse_uri(<< "https://", Rest/bits >>, State, Method) ->
- parse_uri_skip_host(Rest, State, Method);
-parse_uri(<< "HTTP://", Rest/bits >>, State, Method) ->
- parse_uri_skip_host(Rest, State, Method);
-parse_uri(<< "HTTPS://", Rest/bits >>, State, Method) ->
- parse_uri_skip_host(Rest, State, Method);
-parse_uri(Buffer, State, Method) ->
- parse_uri_path(Buffer, State, Method, <<>>).
-
-parse_uri_skip_host(<< C, Rest/bits >>, State, Method) ->
- case C of
- $\r -> error_terminate(400, State);
- $/ -> parse_uri_path(Rest, State, Method, <<"/">>);
- $\s -> parse_version(Rest, State, Method, <<"/">>, <<>>);
- $? -> parse_uri_query(Rest, State, Method, <<"/">>, <<>>);
- $# -> skip_uri_fragment(Rest, State, Method, <<"/">>, <<>>);
- _ -> parse_uri_skip_host(Rest, State, Method)
- end.
-
-parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) ->
- case C of
- $\r -> error_terminate(400, State);
- $\s -> parse_version(Rest, State, Method, SoFar, <<>>);
- $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>);
- $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>);
- _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>)
- end.
-
-parse_uri_query(<< C, Rest/bits >>, S, M, P, SoFar) ->
- case C of
- $\r -> error_terminate(400, S);
- $\s -> parse_version(Rest, S, M, P, SoFar);
- $# -> skip_uri_fragment(Rest, S, M, P, SoFar);
- _ -> parse_uri_query(Rest, S, M, P, << SoFar/binary, C >>)
- end.
-
-skip_uri_fragment(<< C, Rest/bits >>, S, M, P, Q) ->
- case C of
- $\r -> error_terminate(400, S);
- $\s -> parse_version(Rest, S, M, P, Q);
- _ -> skip_uri_fragment(Rest, S, M, P, Q)
- end.
-
-parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, S, M, P, Q) ->
- parse_header(Rest, S, M, P, Q, 'HTTP/1.1', []);
-parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, S, M, P, Q) ->
- parse_header(Rest, S, M, P, Q, 'HTTP/1.0', []);
-parse_version(_, State, _, _, _) ->
- error_terminate(505, State).
-
-%% Stop receiving data if we have more than allowed number of headers.
-wait_header(_, State=#state{max_headers=MaxHeaders}, _, _, _, _, Headers)
- when length(Headers) >= MaxHeaders ->
- error_terminate(400, State);
-wait_header(Buffer, State=#state{socket=Socket, transport=Transport,
- until=Until}, M, P, Q, V, H) ->
- case recv(Socket, Transport, Until) of
- {ok, Data} ->
- parse_header(<< Buffer/binary, Data/binary >>,
- State, M, P, Q, V, H);
- {error, timeout} ->
- error_terminate(408, State);
- {error, _} ->
- terminate(State)
- end.
-
-parse_header(<< $\r, $\n, Rest/bits >>, S, M, P, Q, V, Headers) ->
- request(Rest, S, M, P, Q, V, lists:reverse(Headers));
-parse_header(Buffer, State=#state{max_header_name_length=MaxLength},
- M, P, Q, V, H) ->
- case match_colon(Buffer, 0) of
- nomatch when byte_size(Buffer) > MaxLength ->
- error_terminate(400, State);
- nomatch ->
- wait_header(Buffer, State, M, P, Q, V, H);
- _ ->
- parse_hd_name(Buffer, State, M, P, Q, V, H, <<>>)
- end.
-
-match_colon(<< $:, _/bits >>, N) ->
- N;
-match_colon(<< _, Rest/bits >>, N) ->
- match_colon(Rest, N + 1);
-match_colon(_, _) ->
- nomatch.
-
-parse_hd_name(<< $:, Rest/bits >>, S, M, P, Q, V, H, SoFar) ->
- parse_hd_before_value(Rest, S, M, P, Q, V, H, SoFar);
-parse_hd_name(<< C, Rest/bits >>, S, M, P, Q, V, H, SoFar) when ?IS_WS(C) ->
- parse_hd_name_ws(Rest, S, M, P, Q, V, H, SoFar);
-parse_hd_name(<< C, Rest/bits >>, S, M, P, Q, V, H, SoFar) ->
- ?LOWER(parse_hd_name, Rest, S, M, P, Q, V, H, SoFar).
-
-parse_hd_name_ws(<< C, Rest/bits >>, S, M, P, Q, V, H, Name) ->
- case C of
- $\s -> parse_hd_name_ws(Rest, S, M, P, Q, V, H, Name);
- $\t -> parse_hd_name_ws(Rest, S, M, P, Q, V, H, Name);
- $: -> parse_hd_before_value(Rest, S, M, P, Q, V, H, Name)
- end.
-
-wait_hd_before_value(Buffer, State=#state{
- socket=Socket, transport=Transport, until=Until},
- M, P, Q, V, H, N) ->
- case recv(Socket, Transport, Until) of
- {ok, Data} ->
- parse_hd_before_value(<< Buffer/binary, Data/binary >>,
- State, M, P, Q, V, H, N);
- {error, timeout} ->
- error_terminate(408, State);
- {error, _} ->
- terminate(State)
- end.
-
-parse_hd_before_value(<< $\s, Rest/bits >>, S, M, P, Q, V, H, N) ->
- parse_hd_before_value(Rest, S, M, P, Q, V, H, N);
-parse_hd_before_value(<< $\t, Rest/bits >>, S, M, P, Q, V, H, N) ->
- parse_hd_before_value(Rest, S, M, P, Q, V, H, N);
-parse_hd_before_value(Buffer, State=#state{
- max_header_value_length=MaxLength}, M, P, Q, V, H, N) ->
- case match_eol(Buffer, 0) of
- nomatch when byte_size(Buffer) > MaxLength ->
- error_terminate(400, State);
- nomatch ->
- wait_hd_before_value(Buffer, State, M, P, Q, V, H, N);
- _ ->
- parse_hd_value(Buffer, State, M, P, Q, V, H, N, <<>>)
- end.
-
-%% We completely ignore the first argument which is always
-%% the empty binary. We keep it there because we don't want
-%% to change the other arguments' position and trigger costy
-%% operations for no reasons.
-wait_hd_value(_, State=#state{
- socket=Socket, transport=Transport, until=Until},
- M, P, Q, V, H, N, SoFar) ->
- case recv(Socket, Transport, Until) of
- {ok, Data} ->
- parse_hd_value(Data, State, M, P, Q, V, H, N, SoFar);
- {error, timeout} ->
- error_terminate(408, State);
- {error, _} ->
- terminate(State)
- end.
-
-%% Pushing back as much as we could the retrieval of new data
-%% to check for multilines allows us to avoid a few tests in
-%% the critical path, but forces us to have a special function.
-wait_hd_value_nl(_, State=#state{
- socket=Socket, transport=Transport, until=Until},
- M, P, Q, V, Headers, Name, SoFar) ->
- case recv(Socket, Transport, Until) of
- {ok, << C, Data/bits >>} when C =:= $\s; C =:= $\t ->
- parse_hd_value(Data, State, M, P, Q, V, Headers, Name, SoFar);
- {ok, Data} ->
- parse_header(Data, State, M, P, Q, V, [{Name, SoFar}|Headers]);
- {error, timeout} ->
- error_terminate(408, State);
- {error, _} ->
- terminate(State)
- end.
-
-parse_hd_value(<< $\r, Rest/bits >>, S, M, P, Q, V, Headers, Name, SoFar) ->
- case Rest of
- << $\n >> ->
- wait_hd_value_nl(<<>>, S, M, P, Q, V, Headers, Name, SoFar);
- << $\n, C, Rest2/bits >> when C =:= $\s; C =:= $\t ->
- parse_hd_value(Rest2, S, M, P, Q, V, Headers, Name,
- << SoFar/binary, C >>);
- << $\n, Rest2/bits >> ->
- parse_header(Rest2, S, M, P, Q, V, [{Name, clean_value_ws_end(SoFar, byte_size(SoFar) - 1)}|Headers])
- end;
-parse_hd_value(<< C, Rest/bits >>, S, M, P, Q, V, H, N, SoFar) ->
- parse_hd_value(Rest, S, M, P, Q, V, H, N, << SoFar/binary, C >>);
-parse_hd_value(<<>>, State=#state{max_header_value_length=MaxLength},
- _, _, _, _, _, _, SoFar) when byte_size(SoFar) > MaxLength ->
- error_terminate(400, State);
-parse_hd_value(<<>>, S, M, P, Q, V, H, N, SoFar) ->
- wait_hd_value(<<>>, S, M, P, Q, V, H, N, SoFar).
-
-clean_value_ws_end(_, -1) ->
- <<>>;
-clean_value_ws_end(Value, N) ->
- case binary:at(Value, N) of
- $\s -> clean_value_ws_end(Value, N - 1);
- $\t -> clean_value_ws_end(Value, N - 1);
- _ ->
- S = N + 1,
- << Value2:S/binary, _/bits >> = Value,
- Value2
- end.
-
--ifdef(TEST).
-clean_value_ws_end_test_() ->
- Tests = [
- {<<>>, <<>>},
- {<<" ">>, <<>>},
- {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
- "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>,
- <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
- "text/html;level=2;q=0.4, */*;q=0.5">>}
- ],
- [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].
--endif.
-
--ifdef(PERF).
-horse_clean_value_ws_end() ->
- horse:repeat(200000,
- clean_value_ws_end(
- <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
- "text/html;level=2;q=0.4, */*;q=0.5 ">>,
- byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
- "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
- ).
--endif.
-
-request(B, State=#state{transport=Transport}, M, P, Q, Version, Headers) ->
- case lists:keyfind(<<"host">>, 1, Headers) of
- false when Version =:= 'HTTP/1.1' ->
- error_terminate(400, State);
- false ->
- request(B, State, M, P, Q, Version, Headers,
- <<>>, default_port(Transport:name()));
- {_, RawHost} ->
- try parse_host(RawHost, false, <<>>) of
- {Host, undefined} ->
- request(B, State, M, P, Q, Version, Headers,
- Host, default_port(Transport:name()));
- {Host, Port} ->
- request(B, State, M, P, Q, Version, Headers,
- Host, Port)
- catch _:_ ->
- error_terminate(400, State)
- end
- end.
-
--spec default_port(atom()) -> 80 | 443.
-default_port(ssl) -> 443;
-default_port(_) -> 80.
-
-%% Same code as cow_http:parse_fullhost/1, but inline because we
-%% really want this to go fast.
-parse_host(<< $[, Rest/bits >>, false, <<>>) ->
- parse_host(Rest, true, << $[ >>);
-parse_host(<<>>, false, Acc) ->
- {Acc, undefined};
-parse_host(<< $:, Rest/bits >>, false, Acc) ->
- {Acc, list_to_integer(binary_to_list(Rest))};
-parse_host(<< $], Rest/bits >>, true, Acc) ->
- parse_host(Rest, false, << Acc/binary, $] >>);
-parse_host(<< C, Rest/bits >>, E, Acc) ->
- ?LOWER(parse_host, Rest, E, Acc).
-
-%% End of request parsing.
-%%
-%% We create the Req object and start handling the request.
-
-request(Buffer, State=#state{socket=Socket, transport=Transport,
- req_keepalive=ReqKeepalive, max_keepalive=MaxKeepalive,
- compress=Compress, onresponse=OnResponse},
- Method, Path, Query, Version, Headers, Host, Port) ->
- case Transport:peername(Socket) of
- {ok, Peer} ->
- Req = cowboy_req:new(Socket, Transport, Peer, Method, Path,
- Query, Version, Headers, Host, Port, Buffer,
- ReqKeepalive < MaxKeepalive, Compress, OnResponse),
- execute(Req, State);
- {error, _} ->
- %% Couldn't read the peer address; connection is gone.
- terminate(State)
- end.
-
--spec execute(cowboy_req:req(), #state{}) -> ok.
-execute(Req, State=#state{middlewares=Middlewares, env=Env}) ->
- execute(Req, State, Env, Middlewares).
-
--spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()])
- -> ok.
-execute(Req, State, Env, []) ->
- next_request(Req, State, get_value(result, Env, ok));
-execute(Req, State, Env, [Middleware|Tail]) ->
- case Middleware:execute(Req, Env) of
- {ok, Req2, Env2} ->
- execute(Req2, State, Env2, Tail);
- {suspend, Module, Function, Args} ->
- erlang:hibernate(?MODULE, resume,
- [State, Env, Tail, Module, Function, Args]);
- {stop, Req2} ->
- next_request(Req2, State, ok)
- end.
-
--spec resume(#state{}, cowboy_middleware:env(), [module()],
- module(), module(), [any()]) -> ok.
-resume(State, Env, Tail, Module, Function, Args) ->
- case apply(Module, Function, Args) of
- {ok, Req2, Env2} ->
- execute(Req2, State, Env2, Tail);
- {suspend, Module2, Function2, Args2} ->
- erlang:hibernate(?MODULE, resume,
- [State, Env, Tail, Module2, Function2, Args2]);
- {stop, Req2} ->
- next_request(Req2, State, ok)
- end.
-
--spec next_request(cowboy_req:req(), #state{}, any()) -> ok.
-next_request(Req, State=#state{req_keepalive=Keepalive, timeout=Timeout},
- HandlerRes) ->
- cowboy_req:ensure_response(Req, 204),
- %% If we are going to close the connection,
- %% we do not want to attempt to skip the body.
- case cowboy_req:get(connection, Req) of
- close ->
- terminate(State);
- _ ->
- %% Skip the body if it is reasonably sized. Close otherwise.
- Buffer = case cowboy_req:body(Req) of
- {ok, _, Req2} -> cowboy_req:get(buffer, Req2);
- _ -> close
- end,
- %% Flush the resp_sent message before moving on.
- if HandlerRes =:= ok, Buffer =/= close ->
- receive {cowboy_req, resp_sent} -> ok after 0 -> ok end,
- ?MODULE:parse_request(Buffer,
- State#state{req_keepalive=Keepalive + 1,
- until=until(Timeout)}, 0);
- true ->
- terminate(State)
- end
- end.
-
--spec error_terminate(cowboy:http_status(), #state{}) -> ok.
-error_terminate(Status, State=#state{socket=Socket, transport=Transport,
- compress=Compress, onresponse=OnResponse}) ->
- error_terminate(Status, cowboy_req:new(Socket, Transport,
- undefined, <<"GET">>, <<>>, <<>>, 'HTTP/1.1', [], <<>>,
- undefined, <<>>, false, Compress, OnResponse), State).
-
--spec error_terminate(cowboy:http_status(), cowboy_req:req(), #state{}) -> ok.
-error_terminate(Status, Req, State) ->
- _ = cowboy_req:reply(Status, Req),
- terminate(State).
-
--spec terminate(#state{}) -> ok.
-terminate(#state{socket=Socket, transport=Transport}) ->
- Transport:close(Socket),
- ok.
diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl
index 8f0a04b..16b1fd1 100644
--- a/src/cowboy_req.erl
+++ b/src/cowboy_req.erl
@@ -47,6 +47,9 @@
%% Request body API.
-export([has_body/1]).
-export([body_length/1]).
+-export([read_body/1]).
+-export([read_body/2]).
+
-export([body/1]).
-export([body/2]).
-export([body_qs/1]).
@@ -70,10 +73,12 @@
-export([reply/2]).
-export([reply/3]).
-export([reply/4]).
+
+-export([send_body/3]).
+
-export([chunked_reply/2]).
-export([chunked_reply/3]).
-export([chunk/2]).
--export([upgrade_reply/3]).
-export([continue/1]).
-export([maybe_reply/2]).
-export([ensure_response/2]).
@@ -93,12 +98,12 @@
-type transfer_decode_fun() :: fun((binary(), any())
-> cow_http_te:decode_ret()).
--type body_opts() :: [{continue, boolean()}
+-type body_opts() :: [{continue, boolean()} %% doesn't apply
| {length, non_neg_integer()}
- | {read_length, non_neg_integer()}
- | {read_timeout, timeout()}
- | {transfer_decode, transfer_decode_fun(), any()}
- | {content_decode, content_decode_fun()}].
+ | {read_length, non_neg_integer()} %% to be added back later as optimization
+ | {read_timeout, timeout()} %% same
+ | {transfer_decode, transfer_decode_fun(), any()} %% doesn't apply
+ | {content_decode, content_decode_fun()}]. %% does apply
-export_type([body_opts/0]).
-type resp_body_fun() :: fun((any(), module()) -> ok).
@@ -182,43 +187,43 @@ new(Socket, Transport, Peer, Method, Path, Query,
end.
-spec method(req()) -> binary().
-method(Req) ->
- Req#http_req.method.
+method(#{method := Method}) ->
+ Method.
-spec version(req()) -> cowboy:http_version().
-version(Req) ->
- Req#http_req.version.
+version(#{version := Version}) ->
+ Version.
-spec peer(req()) -> {inet:ip_address(), inet:port_number()} | undefined.
peer(Req) ->
Req#http_req.peer.
-spec host(req()) -> binary().
-host(Req) ->
- Req#http_req.host.
+host(#{host := Host}) ->
+ Host.
-spec host_info(req()) -> cowboy_router:tokens() | undefined.
-host_info(Req) ->
- Req#http_req.host_info.
+host_info(#{host_info := HostInfo}) ->
+ HostInfo.
-spec port(req()) -> inet:port_number().
-port(Req) ->
- Req#http_req.port.
+port(#{port := Port}) ->
+ Port.
-spec path(req()) -> binary().
-path(Req) ->
- Req#http_req.path.
+path(#{path := Path}) ->
+ Path.
-spec path_info(req()) -> cowboy_router:tokens() | undefined.
-path_info(Req) ->
- Req#http_req.path_info.
+path_info(#{path_info := PathInfo}) ->
+ PathInfo.
-spec qs(req()) -> binary().
-qs(Req) ->
- Req#http_req.qs.
+qs(#{qs := Qs}) ->
+ Qs.
-spec parse_qs(req()) -> [{binary(), binary() | true}].
-parse_qs(#http_req{qs=Qs}) ->
+parse_qs(#{qs := Qs}) ->
cow_qs:parse_qs(Qs).
-spec match_qs(cowboy:fields(), req()) -> map().
@@ -227,30 +232,24 @@ match_qs(Fields, Req) ->
%% The URL includes the scheme, host and port only.
-spec host_url(req()) -> undefined | binary().
-host_url(#http_req{port=undefined}) ->
+host_url(#{port := undefined}) ->
undefined;
-host_url(#http_req{transport=Transport, host=Host, port=Port}) ->
- TransportName = Transport:name(),
- Secure = case TransportName of
- ssl -> <<"s">>;
- _ -> <<>>
- end,
- PortBin = case {TransportName, Port} of
- {ssl, 443} -> <<>>;
- {tcp, 80} -> <<>>;
+host_url(#{scheme := Scheme, host := Host, port := Port}) ->
+ PortBin = case {Scheme, Port} of
+ {<<"https">>, 443} -> <<>>;
+ {<<"http">>, 80} -> <<>>;
_ -> << ":", (integer_to_binary(Port))/binary >>
end,
- << "http", Secure/binary, "://", Host/binary, PortBin/binary >>.
+ << Scheme/binary, "://", Host/binary, PortBin/binary >>.
%% The URL includes the scheme, host, port, path and query string.
-spec url(req()) -> undefined | binary().
-url(Req=#http_req{}) ->
- HostURL = host_url(Req),
- url(Req, HostURL).
+url(Req) ->
+ url(Req, host_url(Req)).
url(_, undefined) ->
undefined;
-url(#http_req{path=Path, qs=QS}, HostURL) ->
+url(#{path := Path, qs := QS}, HostURL) ->
QS2 = case QS of
<<>> -> <<>>;
_ -> << "?", QS/binary >>
@@ -262,30 +261,31 @@ binding(Name, Req) ->
binding(Name, Req, undefined).
-spec binding(atom(), req(), Default) -> any() | Default when Default::any().
-binding(Name, Req, Default) when is_atom(Name) ->
- case lists:keyfind(Name, 1, Req#http_req.bindings) of
+binding(Name, #{bindings := Bindings}, Default) when is_atom(Name) ->
+ case lists:keyfind(Name, 1, Bindings) of
{_, Value} -> Value;
false -> Default
- end.
+ end;
+binding(Name, _, Default) when is_atom(Name) ->
+ Default.
-spec bindings(req()) -> [{atom(), any()}].
-bindings(Req) ->
- Req#http_req.bindings.
+bindings(#{bindings := Bindings}) ->
+ Bindings;
+bindings(_) ->
+ [].
-spec header(binary(), req()) -> binary() | undefined.
header(Name, Req) ->
header(Name, Req, undefined).
-spec header(binary(), req(), Default) -> binary() | Default when Default::any().
-header(Name, Req, Default) ->
- case lists:keyfind(Name, 1, Req#http_req.headers) of
- {Name, Value} -> Value;
- false -> Default
- end.
+header(Name, #{headers := Headers}, Default) ->
+ maps:get(Name, Headers, Default).
-spec headers(req()) -> cowboy:http_headers().
-headers(Req) ->
- Req#http_req.headers.
+headers(#{headers := Headers}) ->
+ Headers.
-spec parse_header(binary(), Req) -> any() when Req::req().
parse_header(Name = <<"content-length">>, Req) ->
@@ -354,31 +354,52 @@ set_meta(Name, Value, Req=#http_req{meta=Meta}) ->
%% Request Body API.
-spec has_body(req()) -> boolean().
-has_body(Req) ->
- case lists:keyfind(<<"content-length">>, 1, Req#http_req.headers) of
- {_, <<"0">>} ->
- false;
- {_, _} ->
- true;
- _ ->
- lists:keymember(<<"transfer-encoding">>, 1, Req#http_req.headers)
- end.
+has_body(#{has_body := HasBody}) ->
+ HasBody.
%% The length may not be known if Transfer-Encoding is not identity,
%% and the body hasn't been read at the time of the call.
-spec body_length(req()) -> undefined | non_neg_integer().
-body_length(Req) ->
- case parse_header(<<"transfer-encoding">>, Req) of
- [<<"identity">>] ->
- parse_header(<<"content-length">>, Req);
- _ ->
- undefined
- end.
+body_length(#{body_length := Length}) ->
+ Length.
-spec body(Req) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req().
body(Req) ->
body(Req, []).
+-spec read_body(Req) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req().
+read_body(Req) ->
+ read_body(Req, []).
+
+-spec read_body(Req, body_opts()) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req().
+read_body(Req=#{pid := Pid, streamid := StreamID}, Opts) ->
+ %% @todo Opts should be a map
+ Length = case lists:keyfind(length, 1, Opts) of
+ false -> 8000000;
+ {_, ChunkLen0} -> ChunkLen0
+ end,
+ ReadTimeout = case lists:keyfind(read_timeout, 1, Opts) of
+ false -> 15000;
+ {_, ReadTimeout0} -> ReadTimeout0
+ end,
+ Ref = make_ref(),
+ Pid ! {{Pid, StreamID}, {read_body, Ref, Length}},
+% io:format("READ_BODY ~p ~p ~p ~p~n", [Pid, StreamID, Ref, Length]),
+ receive
+ {request_body, Ref, nofin, Body} ->
+ {more, Body, Req};
+ {request_body, Ref, {fin, BodyLength}, Body} ->
+ {ok, Body, set_body_length(Req, BodyLength)}
+ after ReadTimeout ->
+ exit(read_body_timeout)
+ end.
+
+set_body_length(Req=#{headers := Headers}, BodyLength) ->
+ Req#{
+ headers => Headers#{<<"content-length">> => integer_to_binary(BodyLength)},
+ body_length => BodyLength
+ }.
+
-spec body(Req, body_opts()) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req().
body(Req=#http_req{body_state=waiting}, Opts) ->
%% Send a 100 continue if needed (enabled by default).
@@ -514,7 +535,7 @@ body_qs(Req) ->
-spec body_qs(Req, body_opts()) -> {ok, [{binary(), binary() | true}], Req}
| {badlength, Req} when Req::req().
body_qs(Req, Opts) ->
- case body(Req, Opts) of
+ case read_body(Req, Opts) of
{ok, Body, Req2} ->
{ok, cow_qs:parse_qs(Body), Req2};
{more, _, Req2} ->
@@ -535,13 +556,16 @@ part(Req) ->
-spec part(Req, body_opts())
-> {ok, cow_multipart:headers(), Req} | {done, Req}
when Req::req().
-part(Req=#http_req{multipart=undefined}, Opts) ->
- part(init_multipart(Req), Opts);
part(Req, Opts) ->
- {Data, Req2} = stream_multipart(Req, Opts),
- part(Data, Opts, Req2).
+ case maps:is_key(multipart, Req) of
+ true ->
+ {Data, Req2} = stream_multipart(Req, Opts),
+ part(Data, Opts, Req2);
+ false ->
+ part(init_multipart(Req), Opts)
+ end.
-part(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}) ->
+part(Buffer, Opts, Req=#{multipart := {Boundary, _}}) ->
case cow_multipart:parse_headers(Buffer, Boundary) of
more ->
{Data, Req2} = stream_multipart(Req, Opts),
@@ -550,10 +574,10 @@ part(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}) ->
{Data, Req2} = stream_multipart(Req, Opts),
part(<< Buffer2/binary, Data/binary >>, Opts, Req2);
{ok, Headers, Rest} ->
- {ok, Headers, Req#http_req{multipart={Boundary, Rest}}};
+ {ok, Headers, Req#{multipart => {Boundary, Rest}}};
%% Ignore epilogue.
{done, _} ->
- {done, Req#http_req{multipart=undefined}}
+ {done, Req#{multipart => done}}
end.
-spec part_body(Req)
@@ -565,19 +589,22 @@ part_body(Req) ->
-spec part_body(Req, body_opts())
-> {ok, binary(), Req} | {more, binary(), Req}
when Req::req().
-part_body(Req=#http_req{multipart=undefined}, Opts) ->
- part_body(init_multipart(Req), Opts);
part_body(Req, Opts) ->
- part_body(<<>>, Opts, Req, <<>>).
+ case maps:is_key(multipart, Req) of
+ true ->
+ part_body(<<>>, Opts, Req, <<>>);
+ false ->
+ part_body(init_multipart(Req), Opts)
+ end.
-part_body(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}, Acc) ->
+part_body(Buffer, Opts, Req=#{multipart := {Boundary, _}}, Acc) ->
ChunkLen = case lists:keyfind(length, 1, Opts) of
false -> 8000000;
{_, ChunkLen0} -> ChunkLen0
end,
case byte_size(Acc) > ChunkLen of
true ->
- {more, Acc, Req#http_req{multipart={Boundary, Buffer}}};
+ {more, Acc, Req#{multipart => {Boundary, Buffer}}};
false ->
{Data, Req2} = stream_multipart(Req, Opts),
case cow_multipart:parse_body(<< Buffer/binary, Data/binary >>, Boundary) of
@@ -591,21 +618,22 @@ part_body(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}, Acc) ->
{ok, << Acc/binary, Body/binary >>, Req2};
{done, Body, Rest} ->
{ok, << Acc/binary, Body/binary >>,
- Req2#http_req{multipart={Boundary, Rest}}}
+ Req2#{multipart => {Boundary, Rest}}}
end
end.
init_multipart(Req) ->
{<<"multipart">>, _, Params} = parse_header(<<"content-type">>, Req),
{_, Boundary} = lists:keyfind(<<"boundary">>, 1, Params),
- Req#http_req{multipart={Boundary, <<>>}}.
+ Req#{multipart => {Boundary, <<>>}}.
-stream_multipart(Req=#http_req{body_state=BodyState, multipart={_, <<>>}}, Opts) ->
- true = BodyState =/= done,
- {_, Data, Req2} = body(Req, Opts),
+stream_multipart(Req=#{multipart := done}, _) ->
+ {<<>>, Req};
+stream_multipart(Req=#{multipart := {_, <<>>}}, Opts) ->
+ {_, Data, Req2} = read_body(Req, Opts),
{Data, Req2};
-stream_multipart(Req=#http_req{multipart={Boundary, Buffer}}, _) ->
- {Buffer, Req#http_req{multipart={Boundary, <<>>}}}.
+stream_multipart(Req=#{multipart := {Boundary, Buffer}}, _) ->
+ {Buffer, Req#{multipart => {Boundary, <<>>}}}.
%% Response API.
@@ -618,16 +646,22 @@ stream_multipart(Req=#http_req{multipart={Boundary, Buffer}}, _) ->
-> Req when Req::req().
set_resp_cookie(Name, Value, Opts, Req) ->
Cookie = cow_cookie:setcookie(Name, Value, Opts),
+ %% @todo Nah, keep separate.
set_resp_header(<<"set-cookie">>, Cookie, Req).
-spec set_resp_header(binary(), iodata(), Req)
-> Req when Req::req().
-set_resp_header(Name, Value, Req=#http_req{resp_headers=RespHeaders}) ->
- Req#http_req{resp_headers=[{Name, Value}|RespHeaders]}.
+set_resp_header(Name, Value, Req=#{resp_headers := RespHeaders}) ->
+ Req#{resp_headers => RespHeaders#{Name => Value}};
+set_resp_header(Name,Value, Req) ->
+ Req#{resp_headers => #{Name => Value}}.
+%% @todo {sendfile, Offset, Bytes, Path} tuple
-spec set_resp_body(iodata(), Req) -> Req when Req::req().
set_resp_body(Body, Req) ->
- Req#http_req{resp_body=Body}.
+ Req#{resp_body => Body}.
+%set_resp_body(Body, Req) ->
+% Req#http_req{resp_body=Body}.
-spec set_resp_body_fun(resp_body_fun(), Req) -> Req when Req::req().
set_resp_body_fun(StreamFun, Req) when is_function(StreamFun) ->
@@ -647,189 +681,217 @@ set_resp_body_fun(chunked, StreamFun, Req)
Req#http_req{resp_body={chunked, StreamFun}}.
-spec has_resp_header(binary(), req()) -> boolean().
-has_resp_header(Name, #http_req{resp_headers=RespHeaders}) ->
- lists:keymember(Name, 1, RespHeaders).
+has_resp_header(Name, #{resp_headers := RespHeaders}) ->
+ maps:is_key(Name, RespHeaders);
+has_resp_header(_, _) ->
+ false.
-spec has_resp_body(req()) -> boolean().
-has_resp_body(#http_req{resp_body=RespBody}) when is_function(RespBody) ->
- true;
-has_resp_body(#http_req{resp_body={chunked, _}}) ->
- true;
-has_resp_body(#http_req{resp_body={Length, _}}) ->
- Length > 0;
-has_resp_body(#http_req{resp_body=RespBody}) ->
- iolist_size(RespBody) > 0.
+has_resp_body(#{resp_body := {sendfile, Len, _}}) ->
+ Len > 0;
+has_resp_body(#{resp_body := RespBody}) ->
+ iolist_size(RespBody) > 0;
+has_resp_body(_) ->
+ false.
+
+%has_resp_body(#http_req{resp_body=RespBody}) when is_function(RespBody) ->
+% true;
+%has_resp_body(#http_req{resp_body={chunked, _}}) ->
+% true;
+%has_resp_body(#http_req{resp_body={Length, _}}) ->
+% Length > 0;
+%has_resp_body(#http_req{resp_body=RespBody}) ->
+% iolist_size(RespBody) > 0.
-spec delete_resp_header(binary(), Req)
-> Req when Req::req().
-delete_resp_header(Name, Req=#http_req{resp_headers=RespHeaders}) ->
- RespHeaders2 = lists:keydelete(Name, 1, RespHeaders),
- Req#http_req{resp_headers=RespHeaders2}.
+delete_resp_header(Name, Req=#{resp_headers := RespHeaders}) ->
+ Req#{resp_headers => maps:remove(Name, RespHeaders)}.
-spec reply(cowboy:http_status(), Req) -> Req when Req::req().
-reply(Status, Req=#http_req{resp_body=Body}) ->
- reply(Status, [], Body, Req).
+reply(Status, Req) ->
+ reply(Status, #{}, Req).
-spec reply(cowboy:http_status(), cowboy:http_headers(), Req)
-> Req when Req::req().
-reply(Status, Headers, Req=#http_req{resp_body=Body}) ->
- reply(Status, Headers, Body, Req).
+reply(Status, Headers, Req=#{resp_body := Body}) ->
+ reply(Status, Headers, Body, Req);
+reply(Status, Headers, Req) ->
+ reply(Status, Headers, <<>>, Req).
-spec reply(cowboy:http_status(), cowboy:http_headers(),
iodata() | resp_body_fun() | {non_neg_integer(), resp_body_fun()}
| {chunked, resp_chunked_fun()}, Req)
-> Req when Req::req().
-reply(Status, Headers, Body, Req=#http_req{
- socket=Socket, transport=Transport,
- version=Version, connection=Connection,
- method=Method, resp_compress=Compress,
- resp_state=RespState, resp_headers=RespHeaders})
- when RespState =:= waiting; RespState =:= waiting_stream ->
- HTTP11Headers = if
- Transport =/= cowboy_spdy, Version =:= 'HTTP/1.0', Connection =:= keepalive ->
- [{<<"connection">>, atom_to_connection(Connection)}];
- Transport =/= cowboy_spdy, Version =:= 'HTTP/1.1', Connection =:= close ->
- [{<<"connection">>, atom_to_connection(Connection)}];
- true ->
- []
- end,
- Req3 = case Body of
- BodyFun when is_function(BodyFun) ->
- %% We stream the response body until we close the connection.
- RespConn = close,
- {RespType, Req2} = if
- Transport =:= cowboy_spdy ->
- response(Status, Headers, RespHeaders, [
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- ], stream, Req);
- true ->
- response(Status, Headers, RespHeaders, [
- {<<"connection">>, <<"close">>},
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>},
- {<<"transfer-encoding">>, <<"identity">>}
- ], <<>>, Req)
- end,
- if RespType =/= hook, Method =/= <<"HEAD">> ->
- BodyFun(Socket, Transport);
- true -> ok
- end,
- Req2#http_req{connection=RespConn};
- {chunked, BodyFun} ->
- %% We stream the response body in chunks.
- {RespType, Req2} = chunked_response(Status, Headers, Req),
- if RespType =/= hook, Method =/= <<"HEAD">> ->
- ChunkFun = fun(IoData) -> chunk(IoData, Req2) end,
- BodyFun(ChunkFun),
- %% Send the last chunk if chunked encoding was used.
- if
- Version =:= 'HTTP/1.0'; RespState =:= waiting_stream ->
- Req2;
- true ->
- last_chunk(Req2)
- end;
- true -> Req2
- end;
- {ContentLength, BodyFun} ->
- %% We stream the response body for ContentLength bytes.
- RespConn = response_connection(Headers, Connection),
- {RespType, Req2} = response(Status, Headers, RespHeaders, [
- {<<"content-length">>, integer_to_list(ContentLength)},
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- |HTTP11Headers], stream, Req),
- if RespType =/= hook, Method =/= <<"HEAD">> ->
- BodyFun(Socket, Transport);
- true -> ok
- end,
- Req2#http_req{connection=RespConn};
- _ when Compress ->
- RespConn = response_connection(Headers, Connection),
- Req2 = reply_may_compress(Status, Headers, Body, Req,
- RespHeaders, HTTP11Headers, Method),
- Req2#http_req{connection=RespConn};
- _ ->
- RespConn = response_connection(Headers, Connection),
- Req2 = reply_no_compress(Status, Headers, Body, Req,
- RespHeaders, HTTP11Headers, Method, iolist_size(Body)),
- Req2#http_req{connection=RespConn}
- end,
- Req3#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}.
-
-reply_may_compress(Status, Headers, Body, Req,
- RespHeaders, HTTP11Headers, Method) ->
- BodySize = iolist_size(Body),
- try parse_header(<<"accept-encoding">>, Req) of
- Encodings ->
- CanGzip = (BodySize > 300)
- andalso (false =:= lists:keyfind(<<"content-encoding">>,
- 1, Headers))
- andalso (false =:= lists:keyfind(<<"content-encoding">>,
- 1, RespHeaders))
- andalso (false =:= lists:keyfind(<<"transfer-encoding">>,
- 1, Headers))
- andalso (false =:= lists:keyfind(<<"transfer-encoding">>,
- 1, RespHeaders))
- andalso (Encodings =/= undefined)
- andalso (false =/= lists:keyfind(<<"gzip">>, 1, Encodings)),
- case CanGzip of
- true ->
- GzBody = zlib:gzip(Body),
- {_, Req2} = response(Status, Headers, RespHeaders, [
- {<<"content-length">>, integer_to_list(byte_size(GzBody))},
- {<<"content-encoding">>, <<"gzip">>},
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- |HTTP11Headers],
- case Method of <<"HEAD">> -> <<>>; _ -> GzBody end,
- Req),
- Req2;
- false ->
- reply_no_compress(Status, Headers, Body, Req,
- RespHeaders, HTTP11Headers, Method, BodySize)
- end
- catch _:_ ->
- reply_no_compress(Status, Headers, Body, Req,
- RespHeaders, HTTP11Headers, Method, BodySize)
- end.
+reply(Status, Headers, Stream = {stream, undefined, _}, Req) ->
+ do_stream_reply(Status, Headers, Stream, Req);
+reply(Status, Headers, Stream = {stream, Len, _}, Req) ->
+ do_stream_reply(Status, Headers#{
+ <<"content-length">> => integer_to_binary(Len)
+ }, Stream, Req);
+reply(Status, Headers, SendFile = {sendfile, _, Len, _}, Req) ->
+ do_reply(Status, Headers#{
+ <<"content-length">> => integer_to_binary(Len)
+ }, SendFile, Req);
+reply(Status, Headers, Body, Req) ->
+ do_reply(Status, Headers#{
+ <<"content-length">> => integer_to_binary(iolist_size(Body))
+ }, Body, Req).
+
+do_stream_reply(Status, Headers, {stream, _, Fun}, Req=#{pid := Pid, streamid := StreamID}) ->
+ Pid ! {{Pid, StreamID}, {headers, Status, response_headers(Headers, Req)}},
+ Fun(),
+ ok.
+
+do_reply(Status, Headers, Body, Req=#{pid := Pid, streamid := StreamID}) ->
+ Pid ! {{Pid, StreamID}, {response, Status, response_headers(Headers, Req), Body}},
+ ok.
+
+send_body(Data, IsFin, #{pid := Pid, streamid := StreamID}) ->
+ Pid ! {{Pid, StreamID}, {data, IsFin, Data}},
+ ok.
-reply_no_compress(Status, Headers, Body, Req,
- RespHeaders, HTTP11Headers, Method, BodySize) ->
- {_, Req2} = response(Status, Headers, RespHeaders, [
- {<<"content-length">>, integer_to_list(BodySize)},
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- |HTTP11Headers],
- case Method of <<"HEAD">> -> <<>>; _ -> Body end,
- Req),
- Req2.
+response_headers(Headers, Req) ->
+ RespHeaders = maps:get(resp_headers, Req, #{}),
+ maps:merge(#{
+ <<"date">> => cowboy_clock:rfc1123(),
+ <<"server">> => <<"Cowboy">>
+ }, maps:merge(RespHeaders, Headers)).
+
+%reply(Status, Headers, Body, Req=#http_req{
+% socket=Socket, transport=Transport,
+% version=Version, connection=Connection,
+% method=Method, resp_compress=Compress,
+% resp_state=RespState, resp_headers=RespHeaders})
+% when RespState =:= waiting; RespState =:= waiting_stream ->
+% Req3 = case Body of
+% BodyFun when is_function(BodyFun) ->
+% %% We stream the response body until we close the connection.
+% RespConn = close,
+% {RespType, Req2} = if
+% Transport =:= cowboy_spdy ->
+% response(Status, Headers, RespHeaders, [
+% {<<"date">>, cowboy_clock:rfc1123()},
+% {<<"server">>, <<"Cowboy">>}
+% ], stream, Req);
+% true ->
+% response(Status, Headers, RespHeaders, [
+% {<<"connection">>, <<"close">>},
+% {<<"date">>, cowboy_clock:rfc1123()},
+% {<<"server">>, <<"Cowboy">>},
+% {<<"transfer-encoding">>, <<"identity">>}
+% ], <<>>, Req)
+% end,
+% if RespType =/= hook, Method =/= <<"HEAD">> ->
+% BodyFun(Socket, Transport);
+% true -> ok
+% end,
+% Req2#http_req{connection=RespConn};
+% {chunked, BodyFun} ->
+% %% We stream the response body in chunks.
+% {RespType, Req2} = chunked_response(Status, Headers, Req),
+% if RespType =/= hook, Method =/= <<"HEAD">> ->
+% ChunkFun = fun(IoData) -> chunk(IoData, Req2) end,
+% BodyFun(ChunkFun),
+% %% Send the last chunk if chunked encoding was used.
+% if
+% Version =:= 'HTTP/1.0'; RespState =:= waiting_stream ->
+% Req2;
+% true ->
+% last_chunk(Req2)
+% end;
+% true -> Req2
+% end;
+% {ContentLength, BodyFun} ->
+% %% We stream the response body for ContentLength bytes.
+% RespConn = response_connection(Headers, Connection),
+% {RespType, Req2} = response(Status, Headers, RespHeaders, [
+% {<<"content-length">>, integer_to_list(ContentLength)},
+% {<<"date">>, cowboy_clock:rfc1123()},
+% {<<"server">>, <<"Cowboy">>}
+% |HTTP11Headers], stream, Req),
+% if RespType =/= hook, Method =/= <<"HEAD">> ->
+% BodyFun(Socket, Transport);
+% true -> ok
+% end,
+% Req2#http_req{connection=RespConn};
+% _ when Compress ->
+% RespConn = response_connection(Headers, Connection),
+% Req2 = reply_may_compress(Status, Headers, Body, Req,
+% RespHeaders, HTTP11Headers, Method),
+% Req2#http_req{connection=RespConn};
+% _ ->
+% RespConn = response_connection(Headers, Connection),
+% Req2 = reply_no_compress(Status, Headers, Body, Req,
+% RespHeaders, HTTP11Headers, Method, iolist_size(Body)),
+% Req2#http_req{connection=RespConn}
+% end,
+% Req3#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}.
+
+%reply_may_compress(Status, Headers, Body, Req,
+% RespHeaders, HTTP11Headers, Method) ->
+% BodySize = iolist_size(Body),
+% try parse_header(<<"accept-encoding">>, Req) of
+% Encodings ->
+% CanGzip = (BodySize > 300)
+% andalso (false =:= lists:keyfind(<<"content-encoding">>,
+% 1, Headers))
+% andalso (false =:= lists:keyfind(<<"content-encoding">>,
+% 1, RespHeaders))
+% andalso (false =:= lists:keyfind(<<"transfer-encoding">>,
+% 1, Headers))
+% andalso (false =:= lists:keyfind(<<"transfer-encoding">>,
+% 1, RespHeaders))
+% andalso (Encodings =/= undefined)
+% andalso (false =/= lists:keyfind(<<"gzip">>, 1, Encodings)),
+% case CanGzip of
+% true ->
+% GzBody = zlib:gzip(Body),
+% {_, Req2} = response(Status, Headers, RespHeaders, [
+% {<<"content-length">>, integer_to_list(byte_size(GzBody))},
+% {<<"content-encoding">>, <<"gzip">>},
+% |HTTP11Headers],
+% case Method of <<"HEAD">> -> <<>>; _ -> GzBody end,
+% Req),
+% Req2;
+% false ->
+% reply_no_compress(Status, Headers, Body, Req,
+% RespHeaders, HTTP11Headers, Method, BodySize)
+% end
+% catch _:_ ->
+% reply_no_compress(Status, Headers, Body, Req,
+% RespHeaders, HTTP11Headers, Method, BodySize)
+% end.
+%
+%reply_no_compress(Status, Headers, Body, Req,
+% RespHeaders, HTTP11Headers, Method, BodySize) ->
+% {_, Req2} = response(Status, Headers, RespHeaders, [
+% {<<"content-length">>, integer_to_list(BodySize)},
+% |HTTP11Headers],
+% case Method of <<"HEAD">> -> <<>>; _ -> Body end,
+% Req),
+% Req2.
-spec chunked_reply(cowboy:http_status(), Req) -> Req when Req::req().
chunked_reply(Status, Req) ->
- chunked_reply(Status, [], Req).
+ chunked_reply(Status, #{}, Req).
-spec chunked_reply(cowboy:http_status(), cowboy:http_headers(), Req)
-> Req when Req::req().
-chunked_reply(Status, Headers, Req) ->
- {_, Req2} = chunked_response(Status, Headers, Req),
- Req2.
+chunked_reply(Status, Headers, Req=#{pid := Pid, streamid := StreamID}) ->
+ Pid ! {{Pid, StreamID}, {headers, Status, response_headers(Headers, Req)}},
+ Req. %% @todo return ok
+% ok.
-spec chunk(iodata(), req()) -> ok.
-chunk(_Data, #http_req{method= <<"HEAD">>}) ->
+chunk(_Data, #{method := <<"HEAD">>}) ->
ok;
-chunk(Data, #http_req{socket=Socket, transport=cowboy_spdy,
- resp_state=chunks}) ->
- cowboy_spdy:stream_data(Socket, Data);
-chunk(Data, #http_req{socket=Socket, transport=Transport,
- resp_state=stream}) ->
- ok = Transport:send(Socket, Data);
-chunk(Data, #http_req{socket=Socket, transport=Transport,
- resp_state=chunks}) ->
+chunk(Data, #{pid := Pid, streamid := StreamID}) ->
case iolist_size(Data) of
0 -> ok;
- Size -> Transport:send(Socket, [integer_to_list(Size, 16),
- <<"\r\n">>, Data, <<"\r\n">>])
+ _ ->
+ Pid ! {{Pid, StreamID}, {data, nofin, Data}},
+ ok
end.
%% If ever made public, need to send nothing if HEAD.
@@ -841,16 +903,6 @@ last_chunk(Req=#http_req{socket=Socket, transport=Transport}) ->
_ = Transport:send(Socket, <<"0\r\n\r\n">>),
Req#http_req{resp_state=done}.
--spec upgrade_reply(cowboy:http_status(), cowboy:http_headers(), Req)
- -> Req when Req::req().
-upgrade_reply(Status, Headers, Req=#http_req{transport=Transport,
- resp_state=waiting, resp_headers=RespHeaders})
- when Transport =/= cowboy_spdy ->
- {_, Req2} = response(Status, Headers, RespHeaders, [
- {<<"connection">>, <<"Upgrade">>}
- ], <<>>, Req),
- Req2#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}.
-
-spec continue(req()) -> ok.
continue(#http_req{socket=Socket, transport=Transport,
version=Version}) ->
@@ -973,49 +1025,49 @@ to_list(Req) ->
%% Internal.
--spec chunked_response(cowboy:http_status(), cowboy:http_headers(), Req) ->
- {normal | hook, Req} when Req::req().
-chunked_response(Status, Headers, Req=#http_req{
- transport=cowboy_spdy, resp_state=waiting,
- resp_headers=RespHeaders}) ->
- {RespType, Req2} = response(Status, Headers, RespHeaders, [
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- ], stream, Req),
- {RespType, Req2#http_req{resp_state=chunks,
- resp_headers=[], resp_body= <<>>}};
-chunked_response(Status, Headers, Req=#http_req{
- version=Version, connection=Connection,
- resp_state=RespState, resp_headers=RespHeaders})
- when RespState =:= waiting; RespState =:= waiting_stream ->
- RespConn = response_connection(Headers, Connection),
- HTTP11Headers = if
- Version =:= 'HTTP/1.0', Connection =:= keepalive ->
- [{<<"connection">>, atom_to_connection(Connection)}];
- Version =:= 'HTTP/1.0' -> [];
- true ->
- MaybeTE = if
- RespState =:= waiting_stream -> [];
- true -> [{<<"transfer-encoding">>, <<"chunked">>}]
- end,
- if
- Connection =:= close ->
- [{<<"connection">>, atom_to_connection(Connection)}|MaybeTE];
- true ->
- MaybeTE
- end
- end,
- RespState2 = if
- Version =:= 'HTTP/1.1', RespState =:= 'waiting' -> chunks;
- true -> stream
- end,
- {RespType, Req2} = response(Status, Headers, RespHeaders, [
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- |HTTP11Headers], <<>>, Req),
- {RespType, Req2#http_req{connection=RespConn, resp_state=RespState2,
- resp_headers=[], resp_body= <<>>}}.
-
+%-spec chunked_response(cowboy:http_status(), cowboy:http_headers(), Req) ->
+% {normal | hook, Req} when Req::req().
+%chunked_response(Status, Headers, Req=#http_req{
+% transport=cowboy_spdy, resp_state=waiting,
+% resp_headers=RespHeaders}) ->
+% {RespType, Req2} = response(Status, Headers, RespHeaders, [
+% {<<"date">>, cowboy_clock:rfc1123()},
+% {<<"server">>, <<"Cowboy">>}
+% ], stream, Req),
+% {RespType, Req2#http_req{resp_state=chunks,
+% resp_headers=[], resp_body= <<>>}};
+%chunked_response(Status, Headers, Req=#http_req{
+% version=Version, connection=Connection,
+% resp_state=RespState, resp_headers=RespHeaders})
+% when RespState =:= waiting; RespState =:= waiting_stream ->
+% RespConn = response_connection(Headers, Connection),
+% HTTP11Headers = if
+% Version =:= 'HTTP/1.0', Connection =:= keepalive ->
+% [{<<"connection">>, atom_to_connection(Connection)}];
+% Version =:= 'HTTP/1.0' -> [];
+% true ->
+% MaybeTE = if
+% RespState =:= waiting_stream -> [];
+% true -> [{<<"transfer-encoding">>, <<"chunked">>}]
+% end,
+% if
+% Connection =:= close ->
+% [{<<"connection">>, atom_to_connection(Connection)}|MaybeTE];
+% true ->
+% MaybeTE
+% end
+% end,
+% RespState2 = if
+% Version =:= 'HTTP/1.1', RespState =:= 'waiting' -> chunks;
+% true -> stream
+% end,
+% {RespType, Req2} = response(Status, Headers, RespHeaders, [
+% {<<"date">>, cowboy_clock:rfc1123()},
+% {<<"server">>, <<"Cowboy">>}
+% |HTTP11Headers], <<>>, Req),
+% {RespType, Req2#http_req{connection=RespConn, resp_state=RespState2,
+% resp_headers=[], resp_body= <<>>}}.
+%
-spec response(cowboy:http_status(), cowboy:http_headers(),
cowboy:http_headers(), cowboy:http_headers(), stream | iodata(), Req)
-> {normal | hook, Req} when Req::req().
@@ -1063,20 +1115,20 @@ response(Status, Headers, RespHeaders, DefaultHeaders, Body, Req=#http_req{
hook
end,
{ReplyType, Req2}.
-
--spec response_connection(cowboy:http_headers(), keepalive | close)
- -> keepalive | close.
-response_connection([], Connection) ->
- Connection;
-response_connection([{Name, Value}|Tail], Connection) ->
- case Name of
- <<"connection">> ->
- Tokens = cow_http_hd:parse_connection(Value),
- connection_to_atom(Tokens);
- _ ->
- response_connection(Tail, Connection)
- end.
-
+%
+%-spec response_connection(cowboy:http_headers(), keepalive | close)
+% -> keepalive | close.
+%response_connection([], Connection) ->
+% Connection;
+%response_connection([{Name, Value}|Tail], Connection) ->
+% case Name of
+% <<"connection">> ->
+% Tokens = cow_http_hd:parse_connection(Value),
+% connection_to_atom(Tokens);
+% _ ->
+% response_connection(Tail, Connection)
+% end.
+%
-spec response_merge_headers(cowboy:http_headers(), cowboy:http_headers(),
cowboy:http_headers()) -> cowboy:http_headers().
response_merge_headers(Headers, RespHeaders, DefaultHeaders) ->
@@ -1105,13 +1157,13 @@ merge_headers(Headers, [{Name, Value}|Tail]) ->
false -> [{Name, Value}|Headers]
end,
merge_headers(Headers2, Tail).
-
--spec atom_to_connection(keepalive) -> <<_:80>>;
- (close) -> <<_:40>>.
-atom_to_connection(keepalive) ->
- <<"keep-alive">>;
-atom_to_connection(close) ->
- <<"close">>.
+%
+%-spec atom_to_connection(keepalive) -> <<_:80>>;
+% (close) -> <<_:40>>.
+%atom_to_connection(keepalive) ->
+% <<"keep-alive">>;
+%atom_to_connection(close) ->
+% <<"close">>.
%% We don't match on "keep-alive" since it is the default value.
-spec connection_to_atom([binary()]) -> keepalive | close.
diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl
index 75e8e63..914b273 100644
--- a/src/cowboy_rest.erl
+++ b/src/cowboy_rest.erl
@@ -373,7 +373,7 @@ content_types_provided(Req, State) ->
try cowboy_req:parse_header(<<"accept">>, Req) of
undefined ->
languages_provided(
- cowboy_req:set_meta(media_type, {<<"text">>, <<"html">>, []}, Req),
+ Req#{media_type => {<<"text">>, <<"html">>, []}},
State2#state{content_type_a={{<<"text">>, <<"html">>, []}, to_html}});
Accept ->
choose_media_type(Req, State2, prioritize_accept(Accept))
@@ -392,7 +392,7 @@ content_types_provided(Req, State) ->
undefined ->
{PMT, _Fun} = HeadCTP = hd(CTP2),
languages_provided(
- cowboy_req:set_meta(media_type, PMT, Req2),
+ Req2#{media_type => PMT},
State2#state{content_type_a=HeadCTP});
Accept ->
choose_media_type(Req2, State2, prioritize_accept(Accept))
@@ -460,14 +460,14 @@ match_media_type_params(Req, State, _Accept,
[Provided = {{TP, STP, '*'}, _Fun}|_Tail],
{{_TA, _STA, Params_A}, _QA, _APA}) ->
PMT = {TP, STP, Params_A},
- languages_provided(cowboy_req:set_meta(media_type, PMT, Req),
+ languages_provided(Req#{media_type => PMT},
State#state{content_type_a=Provided});
match_media_type_params(Req, State, Accept,
[Provided = {PMT = {_TP, _STP, Params_P}, _Fun}|Tail],
MediaType = {{_TA, _STA, Params_A}, _QA, _APA}) ->
case lists:sort(Params_P) =:= lists:sort(Params_A) of
true ->
- languages_provided(cowboy_req:set_meta(media_type, PMT, Req),
+ languages_provided(Req#{media_type => PMT},
State#state{content_type_a=Provided});
false ->
match_media_type(Req, State, Accept, Tail, MediaType)
@@ -534,7 +534,7 @@ match_language(Req, State, Accept, [Provided|Tail],
set_language(Req, State=#state{language_a=Language}) ->
Req2 = cowboy_req:set_resp_header(<<"content-language">>, Language, Req),
- charsets_provided(cowboy_req:set_meta(language, Language, Req2), State).
+ charsets_provided(Req2#{language => Language}, State).
%% charsets_provided should return a list of binary values indicating
%% which charsets are accepted by the resource.
@@ -599,7 +599,7 @@ set_content_type(Req, State=#state{
Charset -> [ContentType, <<"; charset=">>, Charset]
end,
Req2 = cowboy_req:set_resp_header(<<"content-type">>, ContentType2, Req),
- encodings_provided(cowboy_req:set_meta(charset, Charset, Req2), State).
+ encodings_provided(Req2#{charset => Charset}, State).
set_content_type_build_params('*', []) ->
<<>>;
@@ -1012,7 +1012,7 @@ set_resp_body(Req, State=#state{content_type_a={_, Callback}}) ->
cowboy_req:set_resp_body_fun(Len, StreamFun, Req2);
{chunked, StreamFun} ->
cowboy_req:set_resp_body_fun(chunked, StreamFun, Req2);
- _Contents ->
+ _ ->
cowboy_req:set_resp_body(Body, Req2)
end,
multiple_choices(Req3, State2)
@@ -1152,7 +1152,7 @@ error_terminate(Req, #state{handler=Handler, handler_state=HandlerState},
{reason, Reason},
{mfa, {Handler, Callback, 2}},
{stacktrace, Stacktrace},
- {req, cowboy_req:to_list(Req)},
+ {req, Req},
{state, HandlerState}
]}).
diff --git a/src/cowboy_router.erl b/src/cowboy_router.erl
index b806da5..f07b307 100644
--- a/src/cowboy_router.erl
+++ b/src/cowboy_router.erl
@@ -159,14 +159,17 @@ compile_brackets_split(<< C, Rest/bits >>, Acc, N) ->
-spec execute(Req, Env)
-> {ok, Req, Env} | {stop, Req}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
-execute(Req, Env) ->
- {_, Dispatch} = lists:keyfind(dispatch, 1, Env),
- Host = cowboy_req:host(Req),
- Path = cowboy_req:path(Req),
+execute(Req=#{host := Host, path := Path}, Env=#{dispatch := Dispatch}) ->
case match(Dispatch, Host, Path) of
{ok, Handler, HandlerOpts, Bindings, HostInfo, PathInfo} ->
- Req2 = cowboy_req:set_bindings(HostInfo, PathInfo, Bindings, Req),
- {ok, Req2, [{handler, Handler}, {handler_opts, HandlerOpts}|Env]};
+ {ok, Req#{
+ host_info => HostInfo,
+ path_info => PathInfo,
+ bindings => Bindings
+ }, Env#{
+ handler => Handler,
+ handler_opts => HandlerOpts
+ }};
{error, notfound, host} ->
{stop, cowboy_req:reply(400, Req)};
{error, badrequest, path} ->
diff --git a/src/cowboy_static.erl b/src/cowboy_static.erl
index b2a8302..c771771 100644
--- a/src/cowboy_static.erl
+++ b/src/cowboy_static.erl
@@ -274,11 +274,4 @@ last_modified(Req, State={_, {ok, #file_info{mtime=Modified}}, _}) ->
-> {{stream, non_neg_integer(), fun()}, Req, State}
when State::state().
get_file(Req, State={Path, {ok, #file_info{size=Size}}, _}) ->
- Sendfile = fun (Socket, Transport) ->
- case Transport:sendfile(Socket, Path) of
- {ok, _} -> ok;
- {error, closed} -> ok;
- {error, etimedout} -> ok
- end
- end,
- {{stream, Size, Sendfile}, Req, State}.
+ {{sendfile, 0, Size, Path}, Req, State}.
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
new file mode 100644
index 0000000..f924e28
--- /dev/null
+++ b/src/cowboy_stream_h.erl
@@ -0,0 +1,128 @@
+%% Copyright (c) 2016, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(cowboy_stream_h).
+%% @todo -behaviour(cowboy_stream).
+
+%% @todo Maybe have a callback for the type of process this is, worker or supervisor.
+-export([init/3]).
+-export([data/4]).
+-export([info/3]).
+-export([terminate/3]).
+
+-export([execute/3]).
+-export([resume/5]).
+
+-record(state, {
+ pid = undefined :: pid(),
+ read_body_ref = undefined :: reference(),
+ read_body_length = 0 :: non_neg_integer(),
+ read_body_is_fin = nofin :: nofin | fin,
+ read_body_buffer = <<>> :: binary()
+}).
+
+%% @todo For shutting down children we need to have a timeout before we terminate
+%% the stream like supervisors do. So here just send a message to yourself first,
+%% and then decide what to do when receiving this message.
+
+%% @todo proper specs
+-spec init(_,_,_) -> _.
+init(_StreamID, Req, Opts) ->
+ Env = maps:get(env, Opts, #{}),
+ Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
+ Shutdown = maps:get(shutdown, Opts, 5000),
+ Pid = proc_lib:spawn_link(?MODULE, execute, [Req, Env, Middlewares]),
+ {[{spawn, Pid, Shutdown}], #state{pid=Pid}}.
+
+%% If we receive data and stream is waiting for data:
+%% If we accumulated enough data or IsFin=fin, send it.
+%% If not, buffer it.
+%% If not, buffer it.
+
+%% @todo proper specs
+-spec data(_,_,_,_) -> _.
+data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) ->
+ {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}};
+data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length ->
+ {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}};
+data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_buffer=Buffer}) ->
+ Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>},
+ {[], State#state{read_body_ref=undefined, read_body_buffer= <<>>}}.
+
+%% @todo proper specs
+-spec info(_,_,_) -> _.
+info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
+ {[stop], State};
+info(_StreamID, Reason = {'EXIT', Pid, _}, State=#state{pid=Pid}) ->
+ {[{internal_error, Reason, 'Stream process crashed.'}], State};
+%% Request body, no body buffer but IsFin=fin.
+info(_StreamID, {read_body, Ref, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) ->
+ Pid ! {request_body, Ref, fin, <<>>},
+ {[], State};
+%% Request body, body buffered large enough or complete.
+info(_StreamID, {read_body, Ref, Length}, State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data})
+ when element(1, IsFin) =:= fin; byte_size(Data) >= Length ->
+ Pid ! {request_body, Ref, IsFin, Data},
+ {[], State#state{read_body_buffer= <<>>}};
+%% Request body, not enough to send yet.
+info(_StreamID, {read_body, Ref, Length}, State) ->
+ {[{flow, Length}], State#state{read_body_ref=Ref, read_body_length=Length}};
+%% Response.
+info(_StreamID, Response = {response, _, _, _}, State) ->
+ {[Response], State};
+info(_StreamID, Headers = {headers, _, _}, State) ->
+ {[Headers], State};
+info(_StreamID, Data = {data, _, _}, State) ->
+ {[Data], State};
+info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
+ {[SwitchProtocol], State};
+%% Stray message.
+info(_StreamID, _Msg, State) ->
+ %% @todo Cleanup if no reply was sent when stream ends.
+ {[], State}.
+
+%% @todo proper specs
+-spec terminate(_,_,_) -> _.
+terminate(_StreamID, _Reason, _State) ->
+ ok.
+
+%% Request process.
+
+%% @todo
+%-spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()])
+% -> ok.
+-spec execute(_, _, _) -> _.
+execute(_, _, []) ->
+ ok; %% @todo Maybe error reason should differ here and there.
+execute(Req, Env, [Middleware|Tail]) ->
+ case Middleware:execute(Req, Env) of
+ {ok, Req2, Env2} ->
+ execute(Req2, Env2, Tail);
+ {suspend, Module, Function, Args} ->
+ proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]);
+ {stop, _Req2} ->
+ ok %% @todo Maybe error reason should differ here and there.
+ end.
+
+-spec resume(cowboy_middleware:env(), [module()],
+ module(), module(), [any()]) -> ok.
+resume(Env, Tail, Module, Function, Args) ->
+ case apply(Module, Function, Args) of
+ {ok, Req2, Env2} ->
+ execute(Req2, Env2, Tail);
+ {suspend, Module2, Function2, Args2} ->
+ proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]);
+ {stop, _Req2} ->
+ ok %% @todo Maybe error reason should differ here and there.
+ end.
diff --git a/src/cowboy_tls.erl b/src/cowboy_tls.erl
index 745d502..0ccf733 100644
--- a/src/cowboy_tls.erl
+++ b/src/cowboy_tls.erl
@@ -13,6 +13,7 @@
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-module(cowboy_tls).
+-behavior(ranch_protocol).
-export([start_link/4]).
-export([init/5]).
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index 3c34908..9a2862e 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -18,6 +18,7 @@
-behaviour(cowboy_sub_protocol).
-export([upgrade/6]).
+-export([takeover/7]).
-export([handler_loop/4]).
-type terminate_reason() :: normal | stop | timeout
@@ -50,7 +51,6 @@
-optional_callbacks([terminate/3]).
-record(state, {
- env :: cowboy_middleware:env(),
socket = undefined :: inet:socket(),
transport = undefined :: module(),
handler :: module(),
@@ -65,25 +65,22 @@
extensions = #{} :: map()
}).
+%% Stream process.
+
-spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate)
- -> {ok, Req, Env} | {suspend, module(), atom(), [any()]}
+ -> {ok, Req, Env}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
upgrade(Req, Env, Handler, HandlerState, Timeout, Hibernate) ->
- {_, Ref} = lists:keyfind(listener, 1, Env),
- ranch:remove_connection(Ref),
- [Socket, Transport] = cowboy_req:get([socket, transport], Req),
- State = #state{env=Env, socket=Socket, transport=Transport, handler=Handler,
- timeout=Timeout, hibernate=Hibernate =:= hibernate},
+ State = #state{handler=Handler, timeout=Timeout,
+ hibernate=Hibernate =:= hibernate},
+ %% @todo We need to fail if HTTP/2.
try websocket_upgrade(State, Req) of
{ok, State2, Req2} ->
- websocket_handshake(State2, Req2, HandlerState)
+ websocket_handshake(State2, Req2, HandlerState, Env)
catch _:_ ->
- receive
- {cowboy_req, resp_sent} -> ok
- after 0 ->
- _ = cowboy_req:reply(400, Req),
- exit(normal)
- end
+ %% @todo Test that we can have 2 /ws 400 status code in a row on the same connection.
+ cowboy_req:reply(400, Req),
+ {ok, Req, Env}
end.
-spec websocket_upgrade(#state{}, Req)
@@ -99,14 +96,15 @@ websocket_upgrade(State, Req) ->
orelse (IntVersion =:= 13),
Key = cowboy_req:header(<<"sec-websocket-key">>, Req),
false = Key =:= undefined,
- websocket_extensions(State#state{key=Key},
- cowboy_req:set_meta(websocket_version, IntVersion, Req)).
+ websocket_extensions(State#state{key=Key}, Req#{websocket_version => IntVersion}).
-spec websocket_extensions(#state{}, Req)
-> {ok, #state{}, Req} when Req::cowboy_req:req().
websocket_extensions(State, Req) ->
- [Compress] = cowboy_req:get([resp_compress], Req),
- Req2 = cowboy_req:set_meta(websocket_compress, false, Req),
+ %% @todo Proper options for this.
+% [Compress] = cowboy_req:get([resp_compress], Req),
+ Compress = false,
+ Req2 = Req#{websocket_compress => false},
case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req2)} of
{true, Extensions} when Extensions =/= undefined ->
websocket_extensions(State, Req2, Extensions, []);
@@ -123,7 +121,7 @@ websocket_extensions(State=#state{extensions=Extensions}, Req, [{<<"permessage-d
Opts = #{level => best_compression, mem_level => 8, strategy => default},
case cow_ws:negotiate_permessage_deflate(Params, Extensions, Opts) of
{ok, RespExt, Extensions2} ->
- Req2 = cowboy_req:set_meta(websocket_compress, true, Req),
+ Req2 = Req#{websocket_compress => true},
websocket_extensions(State#state{extensions=Extensions2},
Req2, Tail, [<<", ">>, RespExt|RespHeader]);
ignore ->
@@ -143,33 +141,46 @@ websocket_extensions(State=#state{extensions=Extensions}, Req, [{<<"x-webkit-def
websocket_extensions(State, Req, [_|Tail], RespHeader) ->
websocket_extensions(State, Req, Tail, RespHeader).
--spec websocket_handshake(#state{}, Req, any())
- -> {ok, Req, cowboy_middleware:env()}
- | {suspend, module(), atom(), [any()]}
- when Req::cowboy_req:req().
-websocket_handshake(State=#state{transport=Transport, key=Key}, Req, HandlerState) ->
+-spec websocket_handshake(#state{}, Req, any(), Env)
+ -> {ok, Req, Env}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+websocket_handshake(State=#state{transport=Transport, key=Key},
+ Req=#{pid := Pid, streamid := StreamID}, HandlerState, Env) ->
Challenge = base64:encode(crypto:hash(sha,
<< Key/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" >>)),
- Req2 = cowboy_req:upgrade_reply(101, [
- {<<"upgrade">>, <<"websocket">>},
- {<<"sec-websocket-accept">>, Challenge}
- ], Req),
- %% Flush the resp_sent message before moving on.
- receive {cowboy_req, resp_sent} -> ok after 0 -> ok end,
- State2 = handler_loop_timeout(State),
+ Headers = #{
+ %% @todo Hmm should those be here or in cowboy_http?
+ <<"connection">> => <<"Upgrade">>,
+ <<"upgrade">> => <<"websocket">>,
+ <<"sec-websocket-accept">> => Challenge
+ },
+ Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {Req, State, HandlerState}}},
+ {ok, Req, Env}.
+
+%% Connection process.
+
+takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Req0, State=#state{handler=Handler}, HandlerState0}) ->
+ ranch:remove_connection(Ref),
+ %% @todo Remove Req from Websocket callbacks.
+ %% @todo Allow sending a reply from websocket_init.
+ %% @todo Try/catch.
+ {ok, Req, HandlerState} = case erlang:function_exported(Handler, websocket_init, 2) of
+ true -> Handler:websocket_init(Req0, HandlerState0);
+ false -> {ok, Req0, HandlerState0}
+ end,
+ State2 = handler_loop_timeout(State#state{socket=Socket, transport=Transport}),
handler_before_loop(State2#state{key=undefined,
- messages=Transport:messages()}, Req2, HandlerState, <<>>).
+ messages=Transport:messages()}, Req, HandlerState, Buffer).
-spec handler_before_loop(#state{}, Req, any(), binary())
-> {ok, Req, cowboy_middleware:env()}
- | {suspend, module(), atom(), [any()]}
when Req::cowboy_req:req().
handler_before_loop(State=#state{
socket=Socket, transport=Transport, hibernate=true},
Req, HandlerState, SoFar) ->
Transport:setopts(Socket, [{active, once}]),
- {suspend, ?MODULE, handler_loop,
- [State#state{hibernate=false}, Req, HandlerState, SoFar]};
+ proc_lib:hibernate(?MODULE, handler_loop,
+ [State#state{hibernate=false}, Req, HandlerState, SoFar]);
handler_before_loop(State=#state{socket=Socket, transport=Transport},
Req, HandlerState, SoFar) ->
Transport:setopts(Socket, [{active, once}]),
@@ -186,7 +197,6 @@ handler_loop_timeout(State=#state{timeout=Timeout, timeout_ref=PrevRef}) ->
-spec handler_loop(#state{}, Req, any(), binary())
-> {ok, Req, cowboy_middleware:env()}
- | {suspend, module(), atom(), [any()]}
when Req::cowboy_req:req().
handler_loop(State=#state{socket=Socket, messages={OK, Closed, Error},
timeout_ref=TRef}, Req, HandlerState, SoFar) ->
@@ -210,7 +220,6 @@ handler_loop(State=#state{socket=Socket, messages={OK, Closed, Error},
-spec websocket_data(#state{}, Req, any(), binary())
-> {ok, Req, cowboy_middleware:env()}
- | {suspend, module(), atom(), [any()]}
when Req::cowboy_req:req().
websocket_data(State=#state{frag_state=FragState, extensions=Extensions}, Req, HandlerState, Data) ->
case cow_ws:parse_header(Data, Extensions, FragState) of
@@ -298,7 +307,6 @@ websocket_dispatch(State=#state{socket=Socket, transport=Transport, frag_state=F
-spec handler_call(#state{}, Req, any(), binary(), atom(), any(), fun())
-> {ok, Req, cowboy_middleware:env()}
- | {suspend, module(), atom(), [any()]}
when Req::cowboy_req:req().
handler_call(State=#state{handler=Handler}, Req, HandlerState,
RemainingData, Callback, Message, NextState) ->
@@ -358,7 +366,7 @@ handler_call(State=#state{handler=Handler}, Req, HandlerState,
{mfa, {Handler, Callback, 3}},
{stacktrace, erlang:get_stacktrace()},
{msg, Message},
- {req, cowboy_req:to_list(Req)},
+ {req, Req},
{state, HandlerState}
]})
end.
@@ -407,7 +415,7 @@ websocket_close(State=#state{socket=Socket, transport=Transport, extensions=Exte
-spec handler_terminate(#state{}, Req, any(), terminate_reason())
-> {ok, Req, cowboy_middleware:env()}
when Req::cowboy_req:req().
-handler_terminate(#state{env=Env, handler=Handler},
+handler_terminate(#state{handler=Handler},
Req, HandlerState, Reason) ->
cowboy_handler:terminate(Reason, Req, HandlerState, Handler),
- {ok, Req, [{result, closed}|Env]}.
+ exit(normal).