aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_http.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2016-02-10 17:28:32 +0100
committerLoïc Hoguin <[email protected]>2016-03-05 20:20:42 +0100
commitb370442a6352c5acb13b88e135c32ca1720095bd (patch)
tree29485cb24f1b5208b5482358e2c659c9de39bdb3 /src/cowboy_http.erl
parentdbb636034f20736e16eb9d6c809217c9525b6cbd (diff)
downloadcowboy-b370442a6352c5acb13b88e135c32ca1720095bd.tar.gz
cowboy-b370442a6352c5acb13b88e135c32ca1720095bd.tar.bz2
cowboy-b370442a6352c5acb13b88e135c32ca1720095bd.zip
Initial commit with connection/streams
Breaking changes with previous commit. This is a very large change, and I am giving up on making a single commit that fixes everything. More commits will follow slowly adding back features, introducing new tests and fixing the documentation. This change contains most of the work toward unifying the interface for handling both HTTP/1.1 and HTTP/2. HTTP/1.1 connections are now no longer 1 process per connection; instead by default 1 process per request is also created. This has a number of pros and cons. Because it has cons, we also allow users to use a lower-level API that acts on "streams" (requests/responses) directly at the connection process-level. If performance is a concern, one can always write a stream handler. The performance in this case will be even greater than with Cowboy 1, although all the special handlers are unavailable. When switching to Websocket, after the handler returns from init/2, Cowboy stops the stream and the Websocket protocol takes over the connection process. Websocket then calls websocket_init/2 for any additional initialization such as timers, because the process is different in init/2 and websocket_*/* functions. This however would allow us to use websocket_init/2 for sending messages on connect, instead of sending ourselves a message and be subject to races. Note that websocket_init/2 is optional. This is all a big change and while most of the tests pass, some functionality currently doesn't. SPDY is broken and will be removed soon in favor of HTTP/2. Automatic compression is currently disabled. The cowboy_req interface probably still have a few functions that need to be updated. The docs and examples do not refer the current functionality anymore. Everything will be fixed over time. Feedback is more than welcome. Open a ticket!
Diffstat (limited to 'src/cowboy_http.erl')
-rw-r--r--src/cowboy_http.erl973
1 files changed, 973 insertions, 0 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
new file mode 100644
index 0000000..3ec3c17
--- /dev/null
+++ b/src/cowboy_http.erl
@@ -0,0 +1,973 @@
+%% Copyright (c) 2016, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(cowboy_http).
+
+-export([init/6]).
+
+-export([system_continue/3]).
+-export([system_terminate/4]).
+-export([system_code_change/4]).
+
+%% @todo map
+-type opts() :: [{compress, boolean()}
+ | {env, cowboy_middleware:env()}
+ | {max_empty_lines, non_neg_integer()}
+ | {max_header_name_length, non_neg_integer()}
+ | {max_header_value_length, non_neg_integer()}
+ | {max_headers, non_neg_integer()}
+ | {max_keepalive, non_neg_integer()}
+ | {max_request_line_length, non_neg_integer()}
+ | {middlewares, [module()]}
+ | {onresponse, cowboy:onresponse_fun()}
+ | {timeout, timeout()}].
+-export_type([opts/0]).
+
+-record(ps_request_line, {
+ empty_lines = 0 :: non_neg_integer()
+}).
+
+-record(ps_header, {
+ method = undefined :: binary(),
+ path = undefined :: binary(),
+ qs = undefined :: binary(),
+ version = undefined :: cowboy:http_version(),
+ headers = undefined :: map() | undefined, %% @todo better type than map()
+ name = undefined :: binary()
+}).
+
+%% @todo We need a state where we wait for the stream process to ask for the body.
+%% OR DO WE
+
+%% In HTTP/2 we start receiving data before the body asks for it, even if optionally
+%% (and by default), so we need to be able to do the same for HTTP/1.1 too. This means
+%% that when we receive data (up to a certain limit, we read from the socket and decode.
+%% When we reach a limit, we stop reading from the socket momentarily until the stream
+%% process asks for more or the stream ends.
+
+%% This means that we need to keep a buffer in the stream handler (until the stream
+%% process asks for it). And that we need the body state to indicate how much we have
+%% left to read (and stop/start reading from the socket depending on value).
+
+-record(ps_body, {
+ %% @todo flow
+ transfer_decode_fun :: fun(), %% @todo better type
+ transfer_decode_state :: any() %% @todo better type
+}).
+
+-record(stream, {
+ %% Stream identifier.
+ id = undefined :: cowboy_stream:streamid(),
+
+ %% Stream handler state.
+ state = undefined :: any(),
+
+ %% Client HTTP version for this stream.
+ version = undefined :: cowboy:http_version(),
+
+ %% Commands queued.
+ queue = [] :: [] %% @todo better type
+}).
+
+-type stream() :: #stream{}.
+
+-record(state, {
+ parent :: pid(),
+ ref :: ranch:ref(),
+ socket :: inet:socket(),
+ transport :: module(),
+ opts = #{} :: map(),
+ handler :: module(),
+
+ timer = undefined :: undefined | reference(),
+
+ %% Identifier for the stream currently being read (or waiting to be received).
+ in_streamid = 1 :: pos_integer(),
+
+ %% Parsing state for the current stream or stream-to-be.
+ in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
+
+ %% Identifier for the stream currently being written.
+ %% Note that out_streamid =< in_streamid.
+ out_streamid = 1 :: pos_integer(),
+
+ %% Whether we finished writing data for the current stream.
+ out_state = wait :: wait | headers | chunked,
+
+ %% The connection will be closed after this stream.
+ last_streamid = undefined :: pos_integer(),
+
+ %% Currently active HTTP/1.1 streams. Streams may be initiated either
+ %% by the client or by the server through PUSH_PROMISE frames.
+ streams = [] :: [stream()],
+
+ %% Children which are in the process of shutting down.
+ children = [] :: [{pid(), cowboy_stream:streamid(), timeout()}]
+
+ %% @todo Automatic compression. (compress option?)
+ %% @todo onresponse? Equivalent using streams.
+}).
+
+-include_lib("cowlib/include/cow_inline.hrl").
+-include_lib("cowlib/include/cow_parse.hrl").
+
+-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok.
+init(Parent, Ref, Socket, Transport, Opts, Handler) ->
+ LastStreamID = maps:get(max_keepalive, Opts, 100),
+ before_loop(set_request_timeout(#state{
+ parent=Parent, ref=Ref, socket=Socket,
+ transport=Transport, opts=Opts, handler=Handler,
+ last_streamid=LastStreamID}), <<>>).
+
+%% @todo Send a response depending on in_state and whether one was already sent.
+
+%% @todo
+%% Timeouts:
+%% - waiting for new request (if no stream is currently running)
+%% -> request_timeout: for whole request/headers, set at init/when we set ps_request_line{} state
+%% - waiting for body (if a stream requested the body to be read)
+%% -> read_body_timeout: amount of time we wait without receiving any data when reading the body
+%% - if we skip the body, skip only for a specific duration
+%% -> skip_body_timeout: also have a skip_body_length
+%% - none if we have a stream running and it didn't request the body to be read
+%% - global
+%% -> inactivity_timeout: max time to wait without anything happening before giving up
+
+before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
+ %% @todo disable this when we get to the body, until the stream asks for it?
+ %% Perhaps have a threshold for how much we're willing to read before waiting.
+ Transport:setopts(Socket, [{active, once}]),
+ loop(State, Buffer).
+
+loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
+ handler=_Handler, timer=TimerRef, children=Children}, Buffer) ->
+ {OK, Closed, Error} = Transport:messages(),
+ receive
+ %% Socket messages.
+ {OK, Socket, Data} ->
+ parse(<< Buffer/binary, Data/binary >>, State);
+ {Closed, Socket} ->
+ terminate(State, {socket_error, closed, 'The socket has been closed.'});
+ {Error, Socket, Reason} ->
+ terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
+ %% Timeouts.
+ {timeout, TimerRef, Reason} ->
+ timeout(State, Reason);
+ {timeout, _, _} ->
+ loop(State, Buffer);
+ %% System messages.
+ {'EXIT', Parent, Reason} ->
+ exit(Reason);
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
+ %% Messages pertaining to a stream.
+ {{Pid, StreamID}, Msg} when Pid =:= self() ->
+ loop(info(State, StreamID, Msg), Buffer);
+ %% Exit signal from children.
+ Msg = {'EXIT', Pid, _} ->
+ loop(down(State, Pid, Msg), Buffer);
+ %% Calls from supervisor module.
+ {'$gen_call', {From, Tag}, which_children} ->
+ Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _, _} <- Children],
+ From ! {Tag, Workers},
+ loop(State, Buffer);
+ {'$gen_call', {From, Tag}, count_children} ->
+ NbChildren = length(Children),
+ Counts = [{specs, 1}, {active, NbChildren},
+ {supervisors, 0}, {workers, NbChildren}],
+ From ! {Tag, Counts},
+ loop(State, Buffer);
+ {'$gen_call', {From, Tag}, _} ->
+ From ! {Tag, {error, ?MODULE}},
+ loop(State, Buffer);
+ %% Unknown messages.
+ Msg ->
+ error_logger:error_msg("Received stray message ~p.", [Msg]),
+ loop(State, Buffer)
+ %% @todo Configurable timeout. This should be a global inactivity timeout
+ %% that triggers when really nothing happens (ie something went really wrong).
+ after 300000 ->
+ terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
+ end.
+
+set_request_timeout(State0=#state{timer=TimerRef0, opts=Opts}) ->
+ State = cancel_request_timeout(State0),
+ Timeout = maps:get(request_timeout, Opts, 5000),
+ TimerRef = erlang:start_timer(Timeout, self(), request_timeout),
+ State#state{timer=TimerRef}.
+
+cancel_request_timeout(State=#state{timer=TimerRef, opts=Opts}) ->
+ ok = case TimerRef of
+ undefined -> ok;
+ _ -> erlang:cancel_timer(TimerRef, [{async, true}, {info, false}])
+ end,
+ State#state{timer=undefined}.
+
+%% @todo Honestly it would be much better if we didn't enable pipelining yet.
+timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
+ %% @todo If other streams are running, just set the connection to be closed
+ %% and stop trying to read from the socket?
+ terminate(State, {connection_error, timeout, 'No request-line received before timeout.'});
+timeout(State=#state{socket=Socket, transport=Transport, in_state=#ps_header{}}, request_timeout) ->
+ %% @todo If other streams are running, maybe wait for their reply before sending 408?
+ %% -> Definitely. Either way, stop reading from the socket and make that stream the last.
+ Transport:send(Socket, cow_http:response(408, 'HTTP/1.1', [])),
+ terminate(State, {connection_error, timeout, 'Request headers not received before timeout.'}).
+
+%% Request-line.
+parse(<<>>, State) ->
+ before_loop(State, <<>>);
+parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
+ after_parse(parse_request(Buffer, State, EmptyLines));
+parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
+ after_parse(parse_header(Buffer,
+ State#state{in_state=PS#ps_header{headers=undefined}},
+ Headers));
+parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
+ after_parse(parse_hd_before_value(Buffer,
+ State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
+ Headers, Name));
+parse(Buffer, State=#state{in_state=#ps_body{}}) ->
+ %% @todo We do not want to get the body automatically if the request doesn't ask for it.
+ %% We may want to get bodies that are below a threshold without waiting, and buffer them
+ %% until the request asks, though.
+
+ %% @todo Transfer-decoding must be done here.
+ after_parse(parse_body(Buffer, State)).
+%% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now.
+
+after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version},
+ State0=#state{handler=Handler, opts=Opts, streams=Streams0}, Buffer}) ->
+ %% @todo Opts at the end. Maybe pass the same Opts we got?
+ try Handler:init(StreamID, Req, Opts) of
+ {Commands, StreamState} ->
+ Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0],
+ State = case maybe_req_close(State0, Headers, Version) of
+ close -> State0#state{streams=Streams, last_streamid=StreamID};
+ keepalive -> State0#state{streams=Streams}
+ end,
+ parse(Buffer, commands(State, StreamID, Commands))
+ catch Class:Reason ->
+ error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) "
+ "with reason ~p:~p.",
+ [Handler, StreamID, Req, Opts, Class, Reason]),
+ ok
+ %% @todo Status code.
+% stream_reset(State, StreamID, {internal_error, {Class, Reason},
+% 'Exception occurred in StreamHandler:init/10 call.'}) %% @todo Check final arity.
+ end;
+%% Streams are sequential so the body is always about the last stream created
+%% unless that stream has terminated.
+after_parse({data, StreamID, IsFin, Data, State=#state{handler=Handler,
+ streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) ->
+ try Handler:data(StreamID, IsFin, Data, StreamState0) of
+ {Commands, StreamState} ->
+ Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
+ Stream#stream{state=StreamState}),
+ parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
+ catch Class:Reason ->
+ error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
+ [Handler, StreamID, IsFin, Data, StreamState0, Class, Reason]),
+ ok
+ %% @todo
+% stream_reset(State, StreamID, {internal_error, {Class, Reason},
+% 'Exception occurred in StreamHandler:data/4 call.'})
+ end;
+%% No corresponding stream, skip.
+after_parse({data, _, _, _, State, Buffer}) ->
+ before_loop(State, Buffer);
+after_parse({more, State, Buffer}) ->
+ before_loop(State, Buffer).
+
+%% Request-line.
+
+-spec parse_request(binary(), #state{}, non_neg_integer()) -> ok.
+%% Empty lines must be using \r\n.
+parse_request(<< $\n, _/bits >>, State, _) ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+parse_request(<< $\s, _/bits >>, State, _) ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+%% We limit the length of the Request-line to MaxLength to avoid endlessly
+%% reading from the socket and eventually crashing.
+parse_request(Buffer, State=#state{opts=Opts}, EmptyLines) ->
+ MaxLength = maps:get(max_request_line_length, Opts, 8000),
+ MaxEmptyLines = maps:get(max_empty_lines, Opts, 5),
+ case match_eol(Buffer, 0) of
+ nomatch when byte_size(Buffer) > MaxLength ->
+ error_terminate(414, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ nomatch ->
+ {more, State#state{in_state=#ps_request_line{empty_lines=EmptyLines}}, Buffer};
+ 1 when EmptyLines =:= MaxEmptyLines ->
+ error_terminate(400, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ 1 ->
+ << _:16, Rest/bits >> = Buffer,
+ parse_request(Rest, State, EmptyLines + 1);
+ _ ->
+ case Buffer of
+ %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
+ << "OPTIONS * ", Rest/bits >> ->
+ parse_version(Rest, State, <<"OPTIONS">>, <<"*">>, <<>>);
+% << "CONNECT ", Rest/bits >> ->
+% parse_authority( %% @todo
+ _ ->
+ parse_method(Buffer, State, <<>>,
+ maps:get(max_method_length, Opts, 32))
+ end
+ end.
+
+match_eol(<< $\n, _/bits >>, N) ->
+ N;
+match_eol(<< _, Rest/bits >>, N) ->
+ match_eol(Rest, N + 1);
+match_eol(_, _) ->
+ nomatch.
+
+parse_method(_, State, _, 0) ->
+ error_terminate(501, State, {connection_error, limit_reached,
+ 'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
+parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $\s -> parse_uri(Rest, State, SoFar);
+ _ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1);
+ _ -> error_terminate(400, State, {connection_error, protocol_error,
+ 'The method name must contain only valid token characters. (RFC7230 3.1.1)'})
+ end.
+
+parse_uri(<< H, T, T, P, "://", Rest/bits >>, State, Method)
+ when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
+ P =:= $p orelse P =:= $P ->
+ parse_uri_skip_host(Rest, State, Method);
+parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method)
+ when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
+ P =:= $p orelse P =:= $P; S =:= $s orelse S =:= $S ->
+ parse_uri_skip_host(Rest, State, Method);
+parse_uri(<< $/, Rest/bits >>, State, Method) ->
+ parse_uri_path(Rest, State, Method, << $/ >>);
+parse_uri(_, State, _) ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).
+
+parse_uri_skip_host(<< C, Rest/bits >>, State, Method) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $/ -> parse_uri_path(Rest, State, Method, <<"/">>);
+ $\s -> parse_version(Rest, State, Method, <<"/">>, <<>>);
+ $? -> parse_uri_query(Rest, State, Method, <<"/">>, <<>>);
+ $# -> skip_uri_fragment(Rest, State, Method, <<"/">>, <<>>);
+ _ -> parse_uri_skip_host(Rest, State, Method)
+ end.
+
+parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $\s -> parse_version(Rest, State, Method, SoFar, <<>>);
+ $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>);
+ $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>);
+ _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>)
+ end.
+
+parse_uri_query(<< C, Rest/bits >>, State, M, P, SoFar) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $\s -> parse_version(Rest, State, M, P, SoFar);
+ $# -> skip_uri_fragment(Rest, State, M, P, SoFar);
+ _ -> parse_uri_query(Rest, State, M, P, << SoFar/binary, C >>)
+ end.
+
+skip_uri_fragment(<< C, Rest/bits >>, State, M, P, Q) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $\s -> parse_version(Rest, State, M, P, Q);
+ _ -> skip_uri_fragment(Rest, State, M, P, Q)
+ end.
+
+%% @todo Calls to parse_header should update the state.
+parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, State, M, P, Q) ->
+ parse_headers(Rest, State, M, P, Q, 'HTTP/1.1');
+parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, State, M, P, Q) ->
+ parse_headers(Rest, State, M, P, Q, 'HTTP/1.0');
+parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
+parse_version(<< C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'The separator between request target and version must be a single SP.'});
+parse_version(_, State, _, _, _) ->
+ error_terminate(505, State, {connection_error, protocol_error,
+ ''}). %% @todo
+
+parse_headers(Rest, State, M, P, Q, V) ->
+ %% @todo Figure out the parse states.
+ parse_header(Rest, State#state{in_state=#ps_header{
+ method=M, path=P, qs=Q, version=V}}, #{}).
+
+%% Headers.
+
+%% We need two or more bytes in the buffer to continue.
+parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
+ {more, State#state{in_state=PS#ps_header{headers=Headers}}, Rest};
+parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) ->
+ request(Rest, S, Headers);
+parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
+ MaxLength = maps:get(max_header_name_length, Opts, 64),
+ MaxHeaders = maps:get(max_headers, Opts, 100),
+ case match_colon(Buffer, 0) of
+ nomatch when byte_size(Buffer) > MaxLength ->
+ error_terminate(400, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ nomatch when length(Headers) >= MaxHeaders ->
+ error_terminate(400, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ nomatch ->
+ {more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer};
+ _ ->
+ parse_hd_name(Buffer, State, Headers, <<>>)
+ end.
+
+match_colon(<< $:, _/bits >>, N) ->
+ N;
+match_colon(<< _, Rest/bits >>, N) ->
+ match_colon(Rest, N + 1);
+match_colon(_, _) ->
+ nomatch.
+
+parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) ->
+ parse_hd_before_value(Rest, State, H, SoFar);
+parse_hd_name(<< C, _/bits >>, State, _, <<>>) when ?IS_WS(C) ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) when ?IS_WS(C) ->
+ parse_hd_name_ws(Rest, State, H, SoFar);
+parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) ->
+ ?LOWER(parse_hd_name, Rest, State, H, SoFar).
+
+parse_hd_name_ws(<< C, Rest/bits >>, S, H, Name) ->
+ case C of
+ $\s -> parse_hd_name_ws(Rest, S, H, Name);
+ $\t -> parse_hd_name_ws(Rest, S, H, Name);
+ $: -> parse_hd_before_value(Rest, S, H, Name)
+ end.
+
+parse_hd_before_value(<< $\s, Rest/bits >>, S, H, N) ->
+ parse_hd_before_value(Rest, S, H, N);
+parse_hd_before_value(<< $\t, Rest/bits >>, S, H, N) ->
+ parse_hd_before_value(Rest, S, H, N);
+parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) ->
+ MaxLength = maps:get(max_header_value_length, Opts, 4096),
+ case match_eol(Buffer, 0) of
+ nomatch when byte_size(Buffer) > MaxLength ->
+ error_terminate(400, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ nomatch ->
+ {more, State#state{in_state=PS#ps_header{headers=H, name=N}}, Buffer};
+ _ ->
+ parse_hd_value(Buffer, State, H, N, <<>>)
+ end.
+
+parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers, Name, SoFar) ->
+ %% @todo What to do about duplicate header names.
+ parse_header(Rest, S, Headers#{Name => clean_value_ws_end(SoFar, byte_size(SoFar) - 1)});
+parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) ->
+ parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>).
+
+clean_value_ws_end(_, -1) ->
+ <<>>;
+clean_value_ws_end(Value, N) ->
+ case binary:at(Value, N) of
+ $\s -> clean_value_ws_end(Value, N - 1);
+ $\t -> clean_value_ws_end(Value, N - 1);
+ _ ->
+ S = N + 1,
+ << Value2:S/binary, _/bits >> = Value,
+ Value2
+ end.
+
+-ifdef(TEST).
+clean_value_ws_end_test_() ->
+ Tests = [
+ {<<>>, <<>>},
+ {<<" ">>, <<>>},
+ {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
+ "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>,
+ <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
+ "text/html;level=2;q=0.4, */*;q=0.5">>}
+ ],
+ [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].
+
+horse_clean_value_ws_end() ->
+ horse:repeat(200000,
+ clean_value_ws_end(
+ <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
+ "text/html;level=2;q=0.4, */*;q=0.5 ">>,
+ byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
+ "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
+ ).
+-endif.
+
+request(Buffer, State=#state{transport=Transport, in_streamid=StreamID,
+ in_state=#ps_header{version=Version}}, Headers) ->
+ case maps:get(<<"host">>, Headers, undefined) of
+ undefined when Version =:= 'HTTP/1.1' ->
+ %% @todo Might want to not close the connection on this and next one.
+ error_terminate(400, State, {stream_error, StreamID, protocol_error,
+ ''}); %% @todo
+ undefined ->
+ request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
+ RawHost ->
+ try parse_host(RawHost, false, <<>>) of
+ {Host, undefined} ->
+ request(Buffer, State, Headers, Host, default_port(Transport:secure()));
+ {Host, Port} ->
+ request(Buffer, State, Headers, Host, Port)
+ catch _:_ ->
+ error_terminate(400, State, {stream_error, StreamID, protocol_error,
+ ''}) %% @todo
+ end
+ end.
+
+-spec default_port(boolean()) -> 80 | 443.
+default_port(true) -> 443;
+default_port(_) -> 80.
+
+%% @todo Yeah probably just call the cowlib function.
+%% Same code as cow_http:parse_fullhost/1, but inline because we
+%% really want this to go fast.
+parse_host(<< $[, Rest/bits >>, false, <<>>) ->
+ parse_host(Rest, true, << $[ >>);
+parse_host(<<>>, false, Acc) ->
+ {Acc, undefined};
+parse_host(<< $:, Rest/bits >>, false, Acc) ->
+ {Acc, list_to_integer(binary_to_list(Rest))};
+parse_host(<< $], Rest/bits >>, true, Acc) ->
+ parse_host(Rest, false, << Acc/binary, $] >>);
+parse_host(<< C, Rest/bits >>, E, Acc) ->
+ ?LOWER(parse_host, Rest, E, Acc).
+
+%% End of request parsing.
+
+%% @todo We used to get the peername here, bad idea, should
+%% get it at the very start of the connection, or the first
+%% time requested if we go the route of handler sending a
+%% message to get it (we probably shouldn't).
+request(Buffer, State0=#state{ref=Ref, transport=Transport, in_streamid=StreamID,
+ in_state=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
+ Headers, Host, Port) ->
+ Scheme = case Transport:secure() of
+ true -> <<"https">>;
+ false -> <<"http">>
+ end,
+ {HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers of
+ #{<<"content-length">> := <<"0">>} ->
+ {false, 0, undefined, undefined};
+ #{<<"content-length">> := BinLength} ->
+ Length = try
+ cow_http_hd:parse_content_length(BinLength)
+ catch _:_ ->
+ error_terminate(400, State0, {stream_error, StreamID, protocol_error,
+ ''}) %% @todo
+ %% @todo Err should terminate here...
+ end,
+ {true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
+ %% @todo Better handling of transfer decoding.
+ #{<<"transfer-encoding">> := <<"chunked">>} ->
+ {true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
+ _ ->
+ {false, 0, undefined, undefined}
+ end,
+ Req = #{
+ ref => Ref,
+ pid => self(),
+ streamid => StreamID,
+
+ %% @todo peer
+ %% @todo sockname
+ %% @todo ssl client cert?
+
+ method => Method,
+ scheme => Scheme,
+ host => Host,
+ %% host_info (cowboy_router)
+ port => Port,
+ path => Path,
+ %% path_info (cowboy_router)
+ %% bindings (cowboy_router)
+ qs => Qs,
+ version => Version,
+ %% We are transparently taking care of transfer-encodings so
+ %% the user code has no need to know about it.
+ headers => maps:remove(<<"transfer-encoding">>, Headers),
+
+ has_body => HasBody,
+ body_length => BodyLength
+ %% @todo multipart? keep state separate
+
+ %% meta values (cowboy_websocket, cowboy_rest)
+ },
+ State = case HasBody of
+ true ->
+ cancel_request_timeout(State0#state{in_state=#ps_body{
+ %% @todo Don't need length anymore?
+ transfer_decode_fun = TDecodeFun,
+ transfer_decode_state = TDecodeState
+ }});
+ false ->
+ set_request_timeout(State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}})
+ end,
+ {request, Req, State, Buffer}.
+
+%% Request body parsing.
+
+parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
+ PS=#ps_body{transfer_decode_fun=TDecode, transfer_decode_state=TState0}}) ->
+ %% @todo Proper trailers.
+ case TDecode(Buffer, TState0) of
+ more ->
+ %% @todo Asks for 0 or more bytes.
+ {more, State, Buffer};
+ {more, Data, TState} ->
+ %% @todo Asks for 0 or more bytes.
+ {data, StreamID, nofin, Data, State#state{in_state=
+ PS#ps_body{transfer_decode_state=TState}}, <<>>};
+ {more, Data, _Length, TState} when is_integer(_Length) ->
+ %% @todo Asks for Length more bytes.
+ {data, StreamID, nofin, Data, State#state{in_state=
+ PS#ps_body{transfer_decode_state=TState}}, <<>>};
+ {more, Data, Rest, TState} ->
+ %% @todo Asks for 0 or more bytes.
+ {data, StreamID, nofin, Data, State#state{in_state=
+ PS#ps_body{transfer_decode_state=TState}}, Rest};
+ {done, TotalLength, Rest} ->
+ {data, StreamID, {fin, TotalLength}, <<>>, set_request_timeout(
+ State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest};
+ {done, Data, TotalLength, Rest} ->
+ {data, StreamID, {fin, TotalLength}, Data, set_request_timeout(
+ State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}
+ end.
+
+%% Message handling.
+
+down(State=#state{children=Children0}, Pid, Msg) ->
+ case lists:keytake(Pid, 1, Children0) of
+ {value, {_, undefined, _}, Children} ->
+ State#state{children=Children};
+ {value, {_, StreamID, _}, Children} ->
+ info(State#state{children=Children}, StreamID, Msg);
+ false ->
+ error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.", [Msg, Pid]),
+ State
+ end.
+
+info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) ->
+ case lists:keyfind(StreamID, #stream.id, Streams0) of
+ Stream = #stream{state=StreamState0} ->
+ try Handler:info(StreamID, Msg, StreamState0) of
+ {Commands, StreamState} ->
+ Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
+ Stream#stream{state=StreamState}),
+ commands(State#state{streams=Streams}, StreamID, Commands)
+ catch Class:Reason ->
+ error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.",
+ [Handler, StreamID, Msg, StreamState0, Class, Reason]),
+ ok
+%% @todo
+% stream_reset(State, StreamID, {internal_error, {Class, Reason},
+% 'Exception occurred in StreamHandler:info/3 call.'})
+ end;
+ false ->
+ error_logger:error_msg("Received message ~p for unknown stream ~p.", [Msg, StreamID]),
+ State
+ end.
+
+%% @todo commands/3
+%% @todo stream_reset
+
+
+
+
+%% Commands.
+
+commands(State, _, []) ->
+ State;
+%% Supervise a child process.
+commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
+ commands(State#state{children=[{Pid, StreamID, Shutdown}|Children]}, StreamID, Tail);
+%% Error handling.
+commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
+ commands(stream_reset(State, StreamID, Error), StreamID, Tail);
+%% Commands for a stream currently inactive.
+commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
+ when Current =/= StreamID ->
+
+ %% @todo We still want to handle some commands...
+
+ Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
+ Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
+ Stream#stream{queue=Queue ++ Commands}),
+ State#state{streams=Streams};
+%% Read the request body.
+commands(State, StreamID, [{flow, _Length}|Tail]) ->
+ %% @todo We only read from socket if buffer is empty, otherwise
+ %% we decode the buffer.
+
+ %% @todo Set the body reading length to min(Length, BodyLength)
+
+ commands(State, StreamID, Tail);
+%% @todo Probably a good idea to have an atomic response send (single send call for resp+body).
+%% Send a full response.
+%%
+%% @todo Kill the stream if it sent a response when one has already been sent.
+%% @todo Keep IsFin in the state.
+%% @todo Same two things above apply to DATA, possibly promise too.
+commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
+ [{response, StatusCode, Headers0, Body}|Tail]) ->
+ %% @todo I'm pretty sure the last stream in the list is the one we want
+ %% considering all others are queued.
+ #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
+ {State, Headers} = connection(State0, Headers0, StreamID, Version),
+ %% @todo Ensure content-length is set.
+ Response = cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers)),
+ case Body of
+ {sendfile, O, B, P} ->
+ Transport:send(Socket, Response),
+ commands(State#state{out_state=done}, StreamID, [{sendfile, fin, O, B, P}|Tail]);
+ _ ->
+ Transport:send(Socket, [Response, Body]),
+ %% @todo If max number of requests, close connection.
+ %% @todo If IsFin, maybe skip body of current request.
+ maybe_terminate(State#state{out_state=done}, StreamID, Tail, fin)
+ end;
+%% Send response headers and initiate chunked encoding.
+commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
+ [{headers, StatusCode, Headers0}|Tail]) ->
+ %% @todo Same as above.
+ #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
+ {State1, Headers1} = case Version of
+ 'HTTP/1.1' ->
+ {State0, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
+ %% Close the connection after streaming the data to HTTP/1.0 client.
+ %% @todo I'm guessing we need to differentiate responses with a content-length and others.
+ 'HTTP/1.0' ->
+ {State0#state{last_streamid=StreamID}, Headers0}
+ end,
+ {State, Headers} = connection(State1, Headers1, StreamID, Version),
+ Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers))),
+ commands(State#state{out_state=chunked}, StreamID, Tail);
+%% Send a response body chunk.
+%%
+%% @todo WINDOW_UPDATE stuff require us to buffer some data.
+commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
+ [{data, IsFin, Data}|Tail]) ->
+ %% @todo Same as above.
+ Headers1 = case lists:keyfind(StreamID, #stream.id, Streams) of
+ #stream{version='HTTP/1.1'} ->
+ Size = iolist_size(Data),
+ Transport:send(Socket, [integer_to_list(Size, 16), <<"\r\n">>, Data, <<"\r\n">>]);
+ #stream{version='HTTP/1.0'} ->
+ Transport:send(Socket, Data)
+ end,
+ maybe_terminate(State, StreamID, Tail, IsFin);
+%% Send a file.
+commands(State=#state{socket=Socket, transport=Transport}, StreamID,
+ [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
+ Transport:sendfile(Socket, Path, Offset, Bytes),
+ maybe_terminate(State, StreamID, Tail, IsFin);
+%% Protocol takeover.
+commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
+ opts=Opts, children=Children}, StreamID,
+ [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
+ %% @todo This should be the last stream running otherwise we need to wait before switching.
+ %% @todo If there's streams opened after this one, fail instead of 101.
+ State = cancel_request_timeout(State0),
+ %% @todo When we actually do the upgrade, we only have the one stream left, plus
+ %% possibly some processes terminating. We need a smart strategy for handling the
+ %% children shutdown. We can start with brutal_kill and discarding the EXIT messages
+ %% received before switching to Websocket. Something better would be to let the
+ %% stream processes finish but that implies the Websocket module to know about
+ %% them and filter the messages. For now, kill them all and discard all messages
+ %% in the mailbox.
+ _ = [exit(Pid, kill) || {Pid, _, _} <- Children],
+ flush(),
+ %% Everything good, upgrade!
+ _ = commands(State, StreamID, [{response, 101, Headers, <<>>}]),
+ %% @todo This is no good because commands return a state normally and here it doesn't
+ %% we need to let this module go entirely. Perhaps it should be handled directly in
+ %% cowboy_clear/cowboy_tls? Perhaps not. We do want that Buffer.
+ Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
+%% Stream shutdown.
+commands(State, StreamID, [stop|Tail]) ->
+ %% @todo Do we want to run the commands after a stop?
+% commands(stream_terminate(State, StreamID, stop), StreamID, Tail).
+ maybe_terminate(State, StreamID, Tail, fin).
+
+flush() ->
+ receive _ -> flush() after 0 -> ok end.
+
+maybe_terminate(State, StreamID, Tail, nofin) ->
+ commands(State, StreamID, Tail);
+%% @todo In these cases I'm not sure if we should continue processing commands.
+maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail, fin) ->
+ terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
+maybe_terminate(State, StreamID, _Tail, fin) ->
+ stream_terminate(State, StreamID, normal).
+
+stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID,
+ StreamError={internal_error, _, _}) ->
+ %% @todo headers
+ %% @todo Don't send this if there are no streams left.
+ Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
+ {<<"content-length">>, <<"0">>}
+ ])),
+ %% @todo update IsFin local
+ stream_terminate(State#state{out_state=done}, StreamID, StreamError).
+
+stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler,
+ out_streamid=OutStreamID, out_state=OutState,
+ streams=Streams0, children=Children0}, StreamID, Reason) ->
+ {value, #stream{state=StreamState, version=Version}, Streams}
+ = lists:keytake(StreamID, #stream.id, Streams0),
+ _ = case OutState of
+ wait ->
+ Transport:send(Socket, cow_http:response(204, 'HTTP/1.1', []));
+ chunked when Version =:= 'HTTP/1.1' ->
+ Transport:send(Socket, <<"0\r\n\r\n">>);
+ _ -> %% done or Version =:= 'HTTP/1.0'
+ ok
+ end,
+
+ stream_call_terminate(StreamID, Reason, Handler, StreamState),
+%% @todo initiate children shutdown
+% Children = stream_terminate_children(Children0, StreamID, []),
+ Children = [case C of
+ {Pid, StreamID, Shutdown} -> {Pid, undefined, Shutdown};
+ _ -> C
+ end || C <- Children0],
+
+ %% @todo Skip the body, if any, or drop the connection if too large.
+
+ %% @todo Only do this if Current =:= StreamID.
+ NextOutStreamID = OutStreamID + 1,
+ case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
+ false ->
+ %% @todo This is clearly wrong, if the stream is gone we need to check if
+ %% there used to be such a stream, and if there was to send an error.
+ State#state{out_streamid=NextOutStreamID, out_state=wait, streams=Streams, children=Children};
+ #stream{queue=Commands} ->
+ %% @todo Remove queue from the stream.
+ commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
+ streams=Streams, children=Children}, NextOutStreamID, Commands)
+ end.
+
+%% @todo Taken directly from _http2
+stream_call_terminate(StreamID, Reason, Handler, StreamState) ->
+ try
+ Handler:terminate(StreamID, Reason, StreamState),
+ ok
+ catch Class:Reason ->
+ error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.",
+ [Handler, StreamID, Reason, StreamState, Class, Reason])
+ end.
+
+%stream_terminate_children([], _, Acc) ->
+% Acc;
+%stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) ->
+% exit(Pid, kill),
+% stream_terminate_children(Tail, StreamID, Acc);
+%stream_terminate_children([Child|Tail], StreamID, Acc) ->
+% stream_terminate_children(Tail, StreamID, [Child|Acc]).
+
+
+%% @todo max_reqs also
+maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') ->
+ Conns = cow_http_hd:parse_connection(Conn),
+ case lists:member(<<"keep-alive">>, Conns) of
+ true -> keepalive;
+ false -> close
+ end;
+maybe_req_close(_, _, 'HTTP/1.0') ->
+ close;
+maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
+ case connection_hd_is_close(Conn) of
+ true -> close;
+ false -> keepalive
+ end;
+maybe_req_close(_State, _, _) ->
+ keepalive.
+
+connection(State=#state{last_streamid=StreamID}, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
+ case connection_hd_is_close(Conn) of
+ true -> {State, Headers};
+ %% @todo Here we need to remove keep-alive and add close, not just add close.
+ false -> {State, Headers#{<<"connection">> => [<<"close, ">>, Conn]}}
+ end;
+connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _) ->
+ {State, Headers#{<<"connection">> => <<"close">>}};
+connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
+ case connection_hd_is_close(Conn) of
+ true -> {State#state{last_streamid=StreamID}, Headers};
+ %% @todo Here we need to set keep-alive only if it wasn't set before.
+ false -> {State, Headers}
+ end;
+connection(State, Headers, _, 'HTTP/1.0') ->
+ {State, Headers#{<<"connection">> => <<"keep-alive">>}};
+connection(State, Headers, _, _) ->
+ {State, Headers}.
+
+connection_hd_is_close(Conn) ->
+ Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)),
+ lists:member(<<"close">>, Conns).
+
+error_terminate(StatusCode, State=#state{socket=Socket, transport=Transport}, Reason) ->
+ Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', [
+ {<<"content-length">>, <<"0">>}
+ ])),
+ terminate(State, Reason).
+
+terminate(_State, _Reason) ->
+ exit(normal). %% @todo
+
+
+
+
+
+
+
+
+
+
+
+
+
+%% System callbacks.
+
+-spec system_continue(_, _, #state{}) -> ok.
+system_continue(_, _, {State, Buffer}) ->
+ loop(State, Buffer).
+
+-spec system_terminate(any(), _, _, _) -> no_return().
+system_terminate(Reason, _, _, _) ->
+ exit(Reason).
+
+-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
+system_code_change(Misc, _, _, _) ->
+ {ok, Misc}.