diff options
Diffstat (limited to 'src/cowboy_http.erl')
-rw-r--r-- | src/cowboy_http.erl | 973 |
1 files changed, 973 insertions, 0 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl new file mode 100644 index 0000000..3ec3c17 --- /dev/null +++ b/src/cowboy_http.erl @@ -0,0 +1,973 @@ +%% Copyright (c) 2016, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(cowboy_http). + +-export([init/6]). + +-export([system_continue/3]). +-export([system_terminate/4]). +-export([system_code_change/4]). + +%% @todo map +-type opts() :: [{compress, boolean()} + | {env, cowboy_middleware:env()} + | {max_empty_lines, non_neg_integer()} + | {max_header_name_length, non_neg_integer()} + | {max_header_value_length, non_neg_integer()} + | {max_headers, non_neg_integer()} + | {max_keepalive, non_neg_integer()} + | {max_request_line_length, non_neg_integer()} + | {middlewares, [module()]} + | {onresponse, cowboy:onresponse_fun()} + | {timeout, timeout()}]. +-export_type([opts/0]). + +-record(ps_request_line, { + empty_lines = 0 :: non_neg_integer() +}). + +-record(ps_header, { + method = undefined :: binary(), + path = undefined :: binary(), + qs = undefined :: binary(), + version = undefined :: cowboy:http_version(), + headers = undefined :: map() | undefined, %% @todo better type than map() + name = undefined :: binary() +}). + +%% @todo We need a state where we wait for the stream process to ask for the body. +%% OR DO WE + +%% In HTTP/2 we start receiving data before the body asks for it, even if optionally +%% (and by default), so we need to be able to do the same for HTTP/1.1 too. This means +%% that when we receive data (up to a certain limit, we read from the socket and decode. +%% When we reach a limit, we stop reading from the socket momentarily until the stream +%% process asks for more or the stream ends. + +%% This means that we need to keep a buffer in the stream handler (until the stream +%% process asks for it). And that we need the body state to indicate how much we have +%% left to read (and stop/start reading from the socket depending on value). + +-record(ps_body, { + %% @todo flow + transfer_decode_fun :: fun(), %% @todo better type + transfer_decode_state :: any() %% @todo better type +}). + +-record(stream, { + %% Stream identifier. + id = undefined :: cowboy_stream:streamid(), + + %% Stream handler state. + state = undefined :: any(), + + %% Client HTTP version for this stream. + version = undefined :: cowboy:http_version(), + + %% Commands queued. + queue = [] :: [] %% @todo better type +}). + +-type stream() :: #stream{}. + +-record(state, { + parent :: pid(), + ref :: ranch:ref(), + socket :: inet:socket(), + transport :: module(), + opts = #{} :: map(), + handler :: module(), + + timer = undefined :: undefined | reference(), + + %% Identifier for the stream currently being read (or waiting to be received). + in_streamid = 1 :: pos_integer(), + + %% Parsing state for the current stream or stream-to-be. + in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{}, + + %% Identifier for the stream currently being written. + %% Note that out_streamid =< in_streamid. + out_streamid = 1 :: pos_integer(), + + %% Whether we finished writing data for the current stream. + out_state = wait :: wait | headers | chunked, + + %% The connection will be closed after this stream. + last_streamid = undefined :: pos_integer(), + + %% Currently active HTTP/1.1 streams. Streams may be initiated either + %% by the client or by the server through PUSH_PROMISE frames. + streams = [] :: [stream()], + + %% Children which are in the process of shutting down. + children = [] :: [{pid(), cowboy_stream:streamid(), timeout()}] + + %% @todo Automatic compression. (compress option?) + %% @todo onresponse? Equivalent using streams. +}). + +-include_lib("cowlib/include/cow_inline.hrl"). +-include_lib("cowlib/include/cow_parse.hrl"). + +-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok. +init(Parent, Ref, Socket, Transport, Opts, Handler) -> + LastStreamID = maps:get(max_keepalive, Opts, 100), + before_loop(set_request_timeout(#state{ + parent=Parent, ref=Ref, socket=Socket, + transport=Transport, opts=Opts, handler=Handler, + last_streamid=LastStreamID}), <<>>). + +%% @todo Send a response depending on in_state and whether one was already sent. + +%% @todo +%% Timeouts: +%% - waiting for new request (if no stream is currently running) +%% -> request_timeout: for whole request/headers, set at init/when we set ps_request_line{} state +%% - waiting for body (if a stream requested the body to be read) +%% -> read_body_timeout: amount of time we wait without receiving any data when reading the body +%% - if we skip the body, skip only for a specific duration +%% -> skip_body_timeout: also have a skip_body_length +%% - none if we have a stream running and it didn't request the body to be read +%% - global +%% -> inactivity_timeout: max time to wait without anything happening before giving up + +before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) -> + %% @todo disable this when we get to the body, until the stream asks for it? + %% Perhaps have a threshold for how much we're willing to read before waiting. + Transport:setopts(Socket, [{active, once}]), + loop(State, Buffer). + +loop(State=#state{parent=Parent, socket=Socket, transport=Transport, + handler=_Handler, timer=TimerRef, children=Children}, Buffer) -> + {OK, Closed, Error} = Transport:messages(), + receive + %% Socket messages. + {OK, Socket, Data} -> + parse(<< Buffer/binary, Data/binary >>, State); + {Closed, Socket} -> + terminate(State, {socket_error, closed, 'The socket has been closed.'}); + {Error, Socket, Reason} -> + terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); + %% Timeouts. + {timeout, TimerRef, Reason} -> + timeout(State, Reason); + {timeout, _, _} -> + loop(State, Buffer); + %% System messages. + {'EXIT', Parent, Reason} -> + exit(Reason); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer}); + %% Messages pertaining to a stream. + {{Pid, StreamID}, Msg} when Pid =:= self() -> + loop(info(State, StreamID, Msg), Buffer); + %% Exit signal from children. + Msg = {'EXIT', Pid, _} -> + loop(down(State, Pid, Msg), Buffer); + %% Calls from supervisor module. + {'$gen_call', {From, Tag}, which_children} -> + Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _, _} <- Children], + From ! {Tag, Workers}, + loop(State, Buffer); + {'$gen_call', {From, Tag}, count_children} -> + NbChildren = length(Children), + Counts = [{specs, 1}, {active, NbChildren}, + {supervisors, 0}, {workers, NbChildren}], + From ! {Tag, Counts}, + loop(State, Buffer); + {'$gen_call', {From, Tag}, _} -> + From ! {Tag, {error, ?MODULE}}, + loop(State, Buffer); + %% Unknown messages. + Msg -> + error_logger:error_msg("Received stray message ~p.", [Msg]), + loop(State, Buffer) + %% @todo Configurable timeout. This should be a global inactivity timeout + %% that triggers when really nothing happens (ie something went really wrong). + after 300000 -> + terminate(State, {internal_error, timeout, 'No message or data received before timeout.'}) + end. + +set_request_timeout(State0=#state{timer=TimerRef0, opts=Opts}) -> + State = cancel_request_timeout(State0), + Timeout = maps:get(request_timeout, Opts, 5000), + TimerRef = erlang:start_timer(Timeout, self(), request_timeout), + State#state{timer=TimerRef}. + +cancel_request_timeout(State=#state{timer=TimerRef, opts=Opts}) -> + ok = case TimerRef of + undefined -> ok; + _ -> erlang:cancel_timer(TimerRef, [{async, true}, {info, false}]) + end, + State#state{timer=undefined}. + +%% @todo Honestly it would be much better if we didn't enable pipelining yet. +timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) -> + %% @todo If other streams are running, just set the connection to be closed + %% and stop trying to read from the socket? + terminate(State, {connection_error, timeout, 'No request-line received before timeout.'}); +timeout(State=#state{socket=Socket, transport=Transport, in_state=#ps_header{}}, request_timeout) -> + %% @todo If other streams are running, maybe wait for their reply before sending 408? + %% -> Definitely. Either way, stop reading from the socket and make that stream the last. + Transport:send(Socket, cow_http:response(408, 'HTTP/1.1', [])), + terminate(State, {connection_error, timeout, 'Request headers not received before timeout.'}). + +%% Request-line. +parse(<<>>, State) -> + before_loop(State, <<>>); +parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) -> + after_parse(parse_request(Buffer, State, EmptyLines)); +parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) -> + after_parse(parse_header(Buffer, + State#state{in_state=PS#ps_header{headers=undefined}}, + Headers)); +parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) -> + after_parse(parse_hd_before_value(Buffer, + State#state{in_state=PS#ps_header{headers=undefined, name=undefined}}, + Headers, Name)); +parse(Buffer, State=#state{in_state=#ps_body{}}) -> + %% @todo We do not want to get the body automatically if the request doesn't ask for it. + %% We may want to get bodies that are below a threshold without waiting, and buffer them + %% until the request asks, though. + + %% @todo Transfer-decoding must be done here. + after_parse(parse_body(Buffer, State)). +%% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now. + +after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version}, + State0=#state{handler=Handler, opts=Opts, streams=Streams0}, Buffer}) -> + %% @todo Opts at the end. Maybe pass the same Opts we got? + try Handler:init(StreamID, Req, Opts) of + {Commands, StreamState} -> + Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0], + State = case maybe_req_close(State0, Headers, Version) of + close -> State0#state{streams=Streams, last_streamid=StreamID}; + keepalive -> State0#state{streams=Streams} + end, + parse(Buffer, commands(State, StreamID, Commands)) + catch Class:Reason -> + error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) " + "with reason ~p:~p.", + [Handler, StreamID, Req, Opts, Class, Reason]), + ok + %% @todo Status code. +% stream_reset(State, StreamID, {internal_error, {Class, Reason}, +% 'Exception occurred in StreamHandler:init/10 call.'}) %% @todo Check final arity. + end; +%% Streams are sequential so the body is always about the last stream created +%% unless that stream has terminated. +after_parse({data, StreamID, IsFin, Data, State=#state{handler=Handler, + streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) -> + try Handler:data(StreamID, IsFin, Data, StreamState0) of + {Commands, StreamState} -> + Streams = lists:keyreplace(StreamID, #stream.id, Streams0, + Stream#stream{state=StreamState}), + parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands)) + catch Class:Reason -> + error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.", + [Handler, StreamID, IsFin, Data, StreamState0, Class, Reason]), + ok + %% @todo +% stream_reset(State, StreamID, {internal_error, {Class, Reason}, +% 'Exception occurred in StreamHandler:data/4 call.'}) + end; +%% No corresponding stream, skip. +after_parse({data, _, _, _, State, Buffer}) -> + before_loop(State, Buffer); +after_parse({more, State, Buffer}) -> + before_loop(State, Buffer). + +%% Request-line. + +-spec parse_request(binary(), #state{}, non_neg_integer()) -> ok. +%% Empty lines must be using \r\n. +parse_request(<< $\n, _/bits >>, State, _) -> + error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo +parse_request(<< $\s, _/bits >>, State, _) -> + error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo +%% We limit the length of the Request-line to MaxLength to avoid endlessly +%% reading from the socket and eventually crashing. +parse_request(Buffer, State=#state{opts=Opts}, EmptyLines) -> + MaxLength = maps:get(max_request_line_length, Opts, 8000), + MaxEmptyLines = maps:get(max_empty_lines, Opts, 5), + case match_eol(Buffer, 0) of + nomatch when byte_size(Buffer) > MaxLength -> + error_terminate(414, State, {connection_error, limit_reached, + ''}); %% @todo + nomatch -> + {more, State#state{in_state=#ps_request_line{empty_lines=EmptyLines}}, Buffer}; + 1 when EmptyLines =:= MaxEmptyLines -> + error_terminate(400, State, {connection_error, limit_reached, + ''}); %% @todo + 1 -> + << _:16, Rest/bits >> = Buffer, + parse_request(Rest, State, EmptyLines + 1); + _ -> + case Buffer of + %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests + << "OPTIONS * ", Rest/bits >> -> + parse_version(Rest, State, <<"OPTIONS">>, <<"*">>, <<>>); +% << "CONNECT ", Rest/bits >> -> +% parse_authority( %% @todo + _ -> + parse_method(Buffer, State, <<>>, + maps:get(max_method_length, Opts, 32)) + end + end. + +match_eol(<< $\n, _/bits >>, N) -> + N; +match_eol(<< _, Rest/bits >>, N) -> + match_eol(Rest, N + 1); +match_eol(_, _) -> + nomatch. + +parse_method(_, State, _, 0) -> + error_terminate(501, State, {connection_error, limit_reached, + 'The method name is longer than configuration allows. (RFC7230 3.1.1)'}); +parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) -> + case C of + $\r -> error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo + $\s -> parse_uri(Rest, State, SoFar); + _ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1); + _ -> error_terminate(400, State, {connection_error, protocol_error, + 'The method name must contain only valid token characters. (RFC7230 3.1.1)'}) + end. + +parse_uri(<< H, T, T, P, "://", Rest/bits >>, State, Method) + when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T; + P =:= $p orelse P =:= $P -> + parse_uri_skip_host(Rest, State, Method); +parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method) + when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T; + P =:= $p orelse P =:= $P; S =:= $s orelse S =:= $S -> + parse_uri_skip_host(Rest, State, Method); +parse_uri(<< $/, Rest/bits >>, State, Method) -> + parse_uri_path(Rest, State, Method, << $/ >>); +parse_uri(_, State, _) -> + error_terminate(400, State, {connection_error, protocol_error, + 'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}). + +parse_uri_skip_host(<< C, Rest/bits >>, State, Method) -> + case C of + $\r -> error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo + $/ -> parse_uri_path(Rest, State, Method, <<"/">>); + $\s -> parse_version(Rest, State, Method, <<"/">>, <<>>); + $? -> parse_uri_query(Rest, State, Method, <<"/">>, <<>>); + $# -> skip_uri_fragment(Rest, State, Method, <<"/">>, <<>>); + _ -> parse_uri_skip_host(Rest, State, Method) + end. + +parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) -> + case C of + $\r -> error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo + $\s -> parse_version(Rest, State, Method, SoFar, <<>>); + $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>); + $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>); + _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>) + end. + +parse_uri_query(<< C, Rest/bits >>, State, M, P, SoFar) -> + case C of + $\r -> error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo + $\s -> parse_version(Rest, State, M, P, SoFar); + $# -> skip_uri_fragment(Rest, State, M, P, SoFar); + _ -> parse_uri_query(Rest, State, M, P, << SoFar/binary, C >>) + end. + +skip_uri_fragment(<< C, Rest/bits >>, State, M, P, Q) -> + case C of + $\r -> error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo + $\s -> parse_version(Rest, State, M, P, Q); + _ -> skip_uri_fragment(Rest, State, M, P, Q) + end. + +%% @todo Calls to parse_header should update the state. +parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, State, M, P, Q) -> + parse_headers(Rest, State, M, P, Q, 'HTTP/1.1'); +parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, State, M, P, Q) -> + parse_headers(Rest, State, M, P, Q, 'HTTP/1.0'); +parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t -> + error_terminate(400, State, {connection_error, protocol_error, + 'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'}); +parse_version(<< C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t -> + error_terminate(400, State, {connection_error, protocol_error, + 'The separator between request target and version must be a single SP.'}); +parse_version(_, State, _, _, _) -> + error_terminate(505, State, {connection_error, protocol_error, + ''}). %% @todo + +parse_headers(Rest, State, M, P, Q, V) -> + %% @todo Figure out the parse states. + parse_header(Rest, State#state{in_state=#ps_header{ + method=M, path=P, qs=Q, version=V}}, #{}). + +%% Headers. + +%% We need two or more bytes in the buffer to continue. +parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 -> + {more, State#state{in_state=PS#ps_header{headers=Headers}}, Rest}; +parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) -> + request(Rest, S, Headers); +parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) -> + MaxLength = maps:get(max_header_name_length, Opts, 64), + MaxHeaders = maps:get(max_headers, Opts, 100), + case match_colon(Buffer, 0) of + nomatch when byte_size(Buffer) > MaxLength -> + error_terminate(400, State, {connection_error, limit_reached, + ''}); %% @todo + nomatch when length(Headers) >= MaxHeaders -> + error_terminate(400, State, {connection_error, limit_reached, + ''}); %% @todo + nomatch -> + {more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer}; + _ -> + parse_hd_name(Buffer, State, Headers, <<>>) + end. + +match_colon(<< $:, _/bits >>, N) -> + N; +match_colon(<< _, Rest/bits >>, N) -> + match_colon(Rest, N + 1); +match_colon(_, _) -> + nomatch. + +parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) -> + parse_hd_before_value(Rest, State, H, SoFar); +parse_hd_name(<< C, _/bits >>, State, _, <<>>) when ?IS_WS(C) -> + error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo +parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) when ?IS_WS(C) -> + parse_hd_name_ws(Rest, State, H, SoFar); +parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) -> + ?LOWER(parse_hd_name, Rest, State, H, SoFar). + +parse_hd_name_ws(<< C, Rest/bits >>, S, H, Name) -> + case C of + $\s -> parse_hd_name_ws(Rest, S, H, Name); + $\t -> parse_hd_name_ws(Rest, S, H, Name); + $: -> parse_hd_before_value(Rest, S, H, Name) + end. + +parse_hd_before_value(<< $\s, Rest/bits >>, S, H, N) -> + parse_hd_before_value(Rest, S, H, N); +parse_hd_before_value(<< $\t, Rest/bits >>, S, H, N) -> + parse_hd_before_value(Rest, S, H, N); +parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) -> + MaxLength = maps:get(max_header_value_length, Opts, 4096), + case match_eol(Buffer, 0) of + nomatch when byte_size(Buffer) > MaxLength -> + error_terminate(400, State, {connection_error, limit_reached, + ''}); %% @todo + nomatch -> + {more, State#state{in_state=PS#ps_header{headers=H, name=N}}, Buffer}; + _ -> + parse_hd_value(Buffer, State, H, N, <<>>) + end. + +parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers, Name, SoFar) -> + %% @todo What to do about duplicate header names. + parse_header(Rest, S, Headers#{Name => clean_value_ws_end(SoFar, byte_size(SoFar) - 1)}); +parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) -> + parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>). + +clean_value_ws_end(_, -1) -> + <<>>; +clean_value_ws_end(Value, N) -> + case binary:at(Value, N) of + $\s -> clean_value_ws_end(Value, N - 1); + $\t -> clean_value_ws_end(Value, N - 1); + _ -> + S = N + 1, + << Value2:S/binary, _/bits >> = Value, + Value2 + end. + +-ifdef(TEST). +clean_value_ws_end_test_() -> + Tests = [ + {<<>>, <<>>}, + {<<" ">>, <<>>}, + {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, " + "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>, + <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, " + "text/html;level=2;q=0.4, */*;q=0.5">>} + ], + [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests]. + +horse_clean_value_ws_end() -> + horse:repeat(200000, + clean_value_ws_end( + <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, " + "text/html;level=2;q=0.4, */*;q=0.5 ">>, + byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, " + "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1) + ). +-endif. + +request(Buffer, State=#state{transport=Transport, in_streamid=StreamID, + in_state=#ps_header{version=Version}}, Headers) -> + case maps:get(<<"host">>, Headers, undefined) of + undefined when Version =:= 'HTTP/1.1' -> + %% @todo Might want to not close the connection on this and next one. + error_terminate(400, State, {stream_error, StreamID, protocol_error, + ''}); %% @todo + undefined -> + request(Buffer, State, Headers, <<>>, default_port(Transport:secure())); + RawHost -> + try parse_host(RawHost, false, <<>>) of + {Host, undefined} -> + request(Buffer, State, Headers, Host, default_port(Transport:secure())); + {Host, Port} -> + request(Buffer, State, Headers, Host, Port) + catch _:_ -> + error_terminate(400, State, {stream_error, StreamID, protocol_error, + ''}) %% @todo + end + end. + +-spec default_port(boolean()) -> 80 | 443. +default_port(true) -> 443; +default_port(_) -> 80. + +%% @todo Yeah probably just call the cowlib function. +%% Same code as cow_http:parse_fullhost/1, but inline because we +%% really want this to go fast. +parse_host(<< $[, Rest/bits >>, false, <<>>) -> + parse_host(Rest, true, << $[ >>); +parse_host(<<>>, false, Acc) -> + {Acc, undefined}; +parse_host(<< $:, Rest/bits >>, false, Acc) -> + {Acc, list_to_integer(binary_to_list(Rest))}; +parse_host(<< $], Rest/bits >>, true, Acc) -> + parse_host(Rest, false, << Acc/binary, $] >>); +parse_host(<< C, Rest/bits >>, E, Acc) -> + ?LOWER(parse_host, Rest, E, Acc). + +%% End of request parsing. + +%% @todo We used to get the peername here, bad idea, should +%% get it at the very start of the connection, or the first +%% time requested if we go the route of handler sending a +%% message to get it (we probably shouldn't). +request(Buffer, State0=#state{ref=Ref, transport=Transport, in_streamid=StreamID, + in_state=#ps_header{method=Method, path=Path, qs=Qs, version=Version}}, + Headers, Host, Port) -> + Scheme = case Transport:secure() of + true -> <<"https">>; + false -> <<"http">> + end, + {HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers of + #{<<"content-length">> := <<"0">>} -> + {false, 0, undefined, undefined}; + #{<<"content-length">> := BinLength} -> + Length = try + cow_http_hd:parse_content_length(BinLength) + catch _:_ -> + error_terminate(400, State0, {stream_error, StreamID, protocol_error, + ''}) %% @todo + %% @todo Err should terminate here... + end, + {true, Length, fun cow_http_te:stream_identity/2, {0, Length}}; + %% @todo Better handling of transfer decoding. + #{<<"transfer-encoding">> := <<"chunked">>} -> + {true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}}; + _ -> + {false, 0, undefined, undefined} + end, + Req = #{ + ref => Ref, + pid => self(), + streamid => StreamID, + + %% @todo peer + %% @todo sockname + %% @todo ssl client cert? + + method => Method, + scheme => Scheme, + host => Host, + %% host_info (cowboy_router) + port => Port, + path => Path, + %% path_info (cowboy_router) + %% bindings (cowboy_router) + qs => Qs, + version => Version, + %% We are transparently taking care of transfer-encodings so + %% the user code has no need to know about it. + headers => maps:remove(<<"transfer-encoding">>, Headers), + + has_body => HasBody, + body_length => BodyLength + %% @todo multipart? keep state separate + + %% meta values (cowboy_websocket, cowboy_rest) + }, + State = case HasBody of + true -> + cancel_request_timeout(State0#state{in_state=#ps_body{ + %% @todo Don't need length anymore? + transfer_decode_fun = TDecodeFun, + transfer_decode_state = TDecodeState + }}); + false -> + set_request_timeout(State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}) + end, + {request, Req, State, Buffer}. + +%% Request body parsing. + +parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= + PS=#ps_body{transfer_decode_fun=TDecode, transfer_decode_state=TState0}}) -> + %% @todo Proper trailers. + case TDecode(Buffer, TState0) of + more -> + %% @todo Asks for 0 or more bytes. + {more, State, Buffer}; + {more, Data, TState} -> + %% @todo Asks for 0 or more bytes. + {data, StreamID, nofin, Data, State#state{in_state= + PS#ps_body{transfer_decode_state=TState}}, <<>>}; + {more, Data, _Length, TState} when is_integer(_Length) -> + %% @todo Asks for Length more bytes. + {data, StreamID, nofin, Data, State#state{in_state= + PS#ps_body{transfer_decode_state=TState}}, <<>>}; + {more, Data, Rest, TState} -> + %% @todo Asks for 0 or more bytes. + {data, StreamID, nofin, Data, State#state{in_state= + PS#ps_body{transfer_decode_state=TState}}, Rest}; + {done, TotalLength, Rest} -> + {data, StreamID, {fin, TotalLength}, <<>>, set_request_timeout( + State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}; + {done, Data, TotalLength, Rest} -> + {data, StreamID, {fin, TotalLength}, Data, set_request_timeout( + State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest} + end. + +%% Message handling. + +down(State=#state{children=Children0}, Pid, Msg) -> + case lists:keytake(Pid, 1, Children0) of + {value, {_, undefined, _}, Children} -> + State#state{children=Children}; + {value, {_, StreamID, _}, Children} -> + info(State#state{children=Children}, StreamID, Msg); + false -> + error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.", [Msg, Pid]), + State + end. + +info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) -> + case lists:keyfind(StreamID, #stream.id, Streams0) of + Stream = #stream{state=StreamState0} -> + try Handler:info(StreamID, Msg, StreamState0) of + {Commands, StreamState} -> + Streams = lists:keyreplace(StreamID, #stream.id, Streams0, + Stream#stream{state=StreamState}), + commands(State#state{streams=Streams}, StreamID, Commands) + catch Class:Reason -> + error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.", + [Handler, StreamID, Msg, StreamState0, Class, Reason]), + ok +%% @todo +% stream_reset(State, StreamID, {internal_error, {Class, Reason}, +% 'Exception occurred in StreamHandler:info/3 call.'}) + end; + false -> + error_logger:error_msg("Received message ~p for unknown stream ~p.", [Msg, StreamID]), + State + end. + +%% @todo commands/3 +%% @todo stream_reset + + + + +%% Commands. + +commands(State, _, []) -> + State; +%% Supervise a child process. +commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) -> + commands(State#state{children=[{Pid, StreamID, Shutdown}|Children]}, StreamID, Tail); +%% Error handling. +commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) -> + commands(stream_reset(State, StreamID, Error), StreamID, Tail); +%% Commands for a stream currently inactive. +commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands) + when Current =/= StreamID -> + + %% @todo We still want to handle some commands... + + Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0), + Streams = lists:keyreplace(StreamID, #stream.id, Streams0, + Stream#stream{queue=Queue ++ Commands}), + State#state{streams=Streams}; +%% Read the request body. +commands(State, StreamID, [{flow, _Length}|Tail]) -> + %% @todo We only read from socket if buffer is empty, otherwise + %% we decode the buffer. + + %% @todo Set the body reading length to min(Length, BodyLength) + + commands(State, StreamID, Tail); +%% @todo Probably a good idea to have an atomic response send (single send call for resp+body). +%% Send a full response. +%% +%% @todo Kill the stream if it sent a response when one has already been sent. +%% @todo Keep IsFin in the state. +%% @todo Same two things above apply to DATA, possibly promise too. +commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, + [{response, StatusCode, Headers0, Body}|Tail]) -> + %% @todo I'm pretty sure the last stream in the list is the one we want + %% considering all others are queued. + #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams), + {State, Headers} = connection(State0, Headers0, StreamID, Version), + %% @todo Ensure content-length is set. + Response = cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers)), + case Body of + {sendfile, O, B, P} -> + Transport:send(Socket, Response), + commands(State#state{out_state=done}, StreamID, [{sendfile, fin, O, B, P}|Tail]); + _ -> + Transport:send(Socket, [Response, Body]), + %% @todo If max number of requests, close connection. + %% @todo If IsFin, maybe skip body of current request. + maybe_terminate(State#state{out_state=done}, StreamID, Tail, fin) + end; +%% Send response headers and initiate chunked encoding. +commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, + [{headers, StatusCode, Headers0}|Tail]) -> + %% @todo Same as above. + #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams), + {State1, Headers1} = case Version of + 'HTTP/1.1' -> + {State0, Headers0#{<<"transfer-encoding">> => <<"chunked">>}}; + %% Close the connection after streaming the data to HTTP/1.0 client. + %% @todo I'm guessing we need to differentiate responses with a content-length and others. + 'HTTP/1.0' -> + {State0#state{last_streamid=StreamID}, Headers0} + end, + {State, Headers} = connection(State1, Headers1, StreamID, Version), + Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers))), + commands(State#state{out_state=chunked}, StreamID, Tail); +%% Send a response body chunk. +%% +%% @todo WINDOW_UPDATE stuff require us to buffer some data. +commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, + [{data, IsFin, Data}|Tail]) -> + %% @todo Same as above. + Headers1 = case lists:keyfind(StreamID, #stream.id, Streams) of + #stream{version='HTTP/1.1'} -> + Size = iolist_size(Data), + Transport:send(Socket, [integer_to_list(Size, 16), <<"\r\n">>, Data, <<"\r\n">>]); + #stream{version='HTTP/1.0'} -> + Transport:send(Socket, Data) + end, + maybe_terminate(State, StreamID, Tail, IsFin); +%% Send a file. +commands(State=#state{socket=Socket, transport=Transport}, StreamID, + [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) -> + Transport:sendfile(Socket, Path, Offset, Bytes), + maybe_terminate(State, StreamID, Tail, IsFin); +%% Protocol takeover. +commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, + opts=Opts, children=Children}, StreamID, + [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) -> + %% @todo This should be the last stream running otherwise we need to wait before switching. + %% @todo If there's streams opened after this one, fail instead of 101. + State = cancel_request_timeout(State0), + %% @todo When we actually do the upgrade, we only have the one stream left, plus + %% possibly some processes terminating. We need a smart strategy for handling the + %% children shutdown. We can start with brutal_kill and discarding the EXIT messages + %% received before switching to Websocket. Something better would be to let the + %% stream processes finish but that implies the Websocket module to know about + %% them and filter the messages. For now, kill them all and discard all messages + %% in the mailbox. + _ = [exit(Pid, kill) || {Pid, _, _} <- Children], + flush(), + %% Everything good, upgrade! + _ = commands(State, StreamID, [{response, 101, Headers, <<>>}]), + %% @todo This is no good because commands return a state normally and here it doesn't + %% we need to let this module go entirely. Perhaps it should be handled directly in + %% cowboy_clear/cowboy_tls? Perhaps not. We do want that Buffer. + Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState); +%% Stream shutdown. +commands(State, StreamID, [stop|Tail]) -> + %% @todo Do we want to run the commands after a stop? +% commands(stream_terminate(State, StreamID, stop), StreamID, Tail). + maybe_terminate(State, StreamID, Tail, fin). + +flush() -> + receive _ -> flush() after 0 -> ok end. + +maybe_terminate(State, StreamID, Tail, nofin) -> + commands(State, StreamID, Tail); +%% @todo In these cases I'm not sure if we should continue processing commands. +maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail, fin) -> + terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok? +maybe_terminate(State, StreamID, _Tail, fin) -> + stream_terminate(State, StreamID, normal). + +stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID, + StreamError={internal_error, _, _}) -> + %% @todo headers + %% @todo Don't send this if there are no streams left. + Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [ + {<<"content-length">>, <<"0">>} + ])), + %% @todo update IsFin local + stream_terminate(State#state{out_state=done}, StreamID, StreamError). + +stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler, + out_streamid=OutStreamID, out_state=OutState, + streams=Streams0, children=Children0}, StreamID, Reason) -> + {value, #stream{state=StreamState, version=Version}, Streams} + = lists:keytake(StreamID, #stream.id, Streams0), + _ = case OutState of + wait -> + Transport:send(Socket, cow_http:response(204, 'HTTP/1.1', [])); + chunked when Version =:= 'HTTP/1.1' -> + Transport:send(Socket, <<"0\r\n\r\n">>); + _ -> %% done or Version =:= 'HTTP/1.0' + ok + end, + + stream_call_terminate(StreamID, Reason, Handler, StreamState), +%% @todo initiate children shutdown +% Children = stream_terminate_children(Children0, StreamID, []), + Children = [case C of + {Pid, StreamID, Shutdown} -> {Pid, undefined, Shutdown}; + _ -> C + end || C <- Children0], + + %% @todo Skip the body, if any, or drop the connection if too large. + + %% @todo Only do this if Current =:= StreamID. + NextOutStreamID = OutStreamID + 1, + case lists:keyfind(NextOutStreamID, #stream.id, Streams) of + false -> + %% @todo This is clearly wrong, if the stream is gone we need to check if + %% there used to be such a stream, and if there was to send an error. + State#state{out_streamid=NextOutStreamID, out_state=wait, streams=Streams, children=Children}; + #stream{queue=Commands} -> + %% @todo Remove queue from the stream. + commands(State#state{out_streamid=NextOutStreamID, out_state=wait, + streams=Streams, children=Children}, NextOutStreamID, Commands) + end. + +%% @todo Taken directly from _http2 +stream_call_terminate(StreamID, Reason, Handler, StreamState) -> + try + Handler:terminate(StreamID, Reason, StreamState), + ok + catch Class:Reason -> + error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.", + [Handler, StreamID, Reason, StreamState, Class, Reason]) + end. + +%stream_terminate_children([], _, Acc) -> +% Acc; +%stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) -> +% exit(Pid, kill), +% stream_terminate_children(Tail, StreamID, Acc); +%stream_terminate_children([Child|Tail], StreamID, Acc) -> +% stream_terminate_children(Tail, StreamID, [Child|Acc]). + + +%% @todo max_reqs also +maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') -> + Conns = cow_http_hd:parse_connection(Conn), + case lists:member(<<"keep-alive">>, Conns) of + true -> keepalive; + false -> close + end; +maybe_req_close(_, _, 'HTTP/1.0') -> + close; +maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') -> + case connection_hd_is_close(Conn) of + true -> close; + false -> keepalive + end; +maybe_req_close(_State, _, _) -> + keepalive. + +connection(State=#state{last_streamid=StreamID}, Headers=#{<<"connection">> := Conn}, StreamID, _) -> + case connection_hd_is_close(Conn) of + true -> {State, Headers}; + %% @todo Here we need to remove keep-alive and add close, not just add close. + false -> {State, Headers#{<<"connection">> => [<<"close, ">>, Conn]}} + end; +connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _) -> + {State, Headers#{<<"connection">> => <<"close">>}}; +connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) -> + case connection_hd_is_close(Conn) of + true -> {State#state{last_streamid=StreamID}, Headers}; + %% @todo Here we need to set keep-alive only if it wasn't set before. + false -> {State, Headers} + end; +connection(State, Headers, _, 'HTTP/1.0') -> + {State, Headers#{<<"connection">> => <<"keep-alive">>}}; +connection(State, Headers, _, _) -> + {State, Headers}. + +connection_hd_is_close(Conn) -> + Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)), + lists:member(<<"close">>, Conns). + +error_terminate(StatusCode, State=#state{socket=Socket, transport=Transport}, Reason) -> + Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', [ + {<<"content-length">>, <<"0">>} + ])), + terminate(State, Reason). + +terminate(_State, _Reason) -> + exit(normal). %% @todo + + + + + + + + + + + + + +%% System callbacks. + +-spec system_continue(_, _, #state{}) -> ok. +system_continue(_, _, {State, Buffer}) -> + loop(State, Buffer). + +-spec system_terminate(any(), _, _, _) -> no_return(). +system_terminate(Reason, _, _, _) -> + exit(Reason). + +-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. +system_code_change(Misc, _, _, _) -> + {ok, Misc}. |