diff options
37 files changed, 1841 insertions, 1161 deletions
diff --git a/ebin/cowboy.app b/ebin/cowboy.app index 0f037ce..a450961 100644 --- a/ebin/cowboy.app +++ b/ebin/cowboy.app @@ -1,7 +1,7 @@ {application, cowboy, [ {description, "Small, fast, modular HTTP server."}, {vsn, "2.0.0-pre.2"}, - {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_clock','cowboy_constraints','cowboy_handler','cowboy_http2','cowboy_loop','cowboy_middleware','cowboy_protocol','cowboy_req','cowboy_rest','cowboy_router','cowboy_spdy','cowboy_static','cowboy_stream','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']}, + {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_clear','cowboy_clock','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_loop','cowboy_middleware','cowboy_protocol','cowboy_req','cowboy_rest','cowboy_router','cowboy_spdy','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']}, {registered, [cowboy_sup,cowboy_clock]}, {applications, [kernel,stdlib,crypto,cowlib,ranch]}, {mod, {cowboy_app, []}} @@ -16,7 +16,7 @@ ERLANG_MK_FILENAME := $(realpath $(lastword $(MAKEFILE_LIST))) -ERLANG_MK_VERSION = 2.0.0-pre.2-73-g87285ad-dirty +ERLANG_MK_VERSION = 2.0.0-pre.2-75-g18a7074-dirty # Core configuration. @@ -147,11 +147,6 @@ else core_native_path = $1 endif -ifeq ($(shell which wget 2>/dev/null | wc -l), 1) -define core_http_get - wget --no-check-certificate -O $(1) $(2)|| rm $(1) -endef -else define core_http_get.erl ssl:start(), inets:start(), @@ -170,7 +165,6 @@ endef define core_http_get $(call erlang,$(call core_http_get.erl,$(call core_native_path,$1),$2)) endef -endif core_eq = $(and $(findstring $(1),$(2)),$(findstring $(2),$(1))) @@ -4646,7 +4640,7 @@ dtl_verbose = $(dtl_verbose_$(V)) # Core targets. -DTL_FILES = $(sort $(call core_find,$(DTL_PATH),*.dtl)) +DTL_FILES := $(sort $(call core_find,$(DTL_PATH),*.dtl)) ifneq ($(DTL_FILES),) @@ -4711,10 +4705,11 @@ define compile_proto.erl halt(). endef -ifneq ($(wildcard src/),) +# @todo Convert like erlydtl was. +# @todo Give access to ALL_SRC_FILES. + ebin/$(PROJECT).app:: $(sort $(call core_find,src/,*.proto)) $(if $(strip $?),$(call compile_proto,$?)) -endif # Copyright (c) 2013-2015, Loïc Hoguin <[email protected]> # This file is part of erlang.mk and subject to the terms of the ISC License. @@ -4807,8 +4802,11 @@ app-build: ebin/$(PROJECT).app # Source files. -ERL_FILES = $(sort $(call core_find,src/,*.erl)) -CORE_FILES = $(sort $(call core_find,src/,*.core)) +# @todo have "all test/ files" also. +ALL_SRC_FILES := $(sort $(call core_find,src/,*)) + +ERL_FILES := $(filter %.erl,$(ALL_SRC_FILES)) +CORE_FILES := $(filter %.core,$(ALL_SRC_FILES)) # ASN.1 files. @@ -4841,11 +4839,11 @@ endif # Leex and Yecc files. -XRL_FILES = $(sort $(call core_find,src/,*.xrl)) +XRL_FILES := $(filter %.xrl,$(ALL_SRC_FILES)) XRL_ERL_FILES = $(addprefix src/,$(patsubst %.xrl,%.erl,$(notdir $(XRL_FILES)))) ERL_FILES += $(XRL_ERL_FILES) -YRL_FILES = $(sort $(call core_find,src/,*.yrl)) +YRL_FILES := $(filter %.yrl,$(ALL_SRC_FILES)) YRL_ERL_FILES = $(addprefix src/,$(patsubst %.yrl,%.erl,$(notdir $(YRL_FILES)))) ERL_FILES += $(YRL_ERL_FILES) @@ -4932,6 +4930,7 @@ define makedep.erl endef ifeq ($(if $(NO_MAKEDEP),$(wildcard $(PROJECT).d),),) +# @todo Not src/*.hrl? $(PROJECT).d:: $(ERL_FILES) $(call core_find,include/,*.hrl) $(makedep_verbose) $(call erlang,$(call makedep.erl,$@)) endif @@ -5852,6 +5851,12 @@ endif .PHONY: ci ci-prepare ci-setup distclean-kerl +CI_OTP ?= + +ifeq ($(strip $(CI_OTP)),) +ci:: +else + ifndef KERL KERL := $(shell which kerl 2>/dev/null) @@ -5870,11 +5875,7 @@ KERL_MAKEFLAGS ?= OTP_GIT ?= https://github.com/erlang/otp CI_INSTALL_DIR ?= $(HOME)/erlang -CI_OTP ?= -ifeq ($(strip $(CI_OTP)),) -ci:: -else ci:: $(addprefix ci-,$(CI_OTP)) ci-prepare: $(addprefix $(CI_INSTALL_DIR)/,$(CI_OTP)) @@ -5933,11 +5934,13 @@ endif # Configuration. CT_OPTS ?= + ifneq ($(wildcard $(TEST_DIR)),) - CT_SUITES ?= $(sort $(subst _SUITE.erl,,$(notdir $(call core_find,$(TEST_DIR)/,*_SUITE.erl)))) -else - CT_SUITES ?= +ifndef CT_SUITES +CT_SUITES := $(sort $(subst _SUITE.erl,,$(notdir $(call core_find,$(TEST_DIR)/,*_SUITE.erl)))) endif +endif +CT_SUITES ?= # Core targets. @@ -5967,7 +5970,7 @@ ct: $(if $(IS_APP),,apps-ct) else ct: test-build $(if $(IS_APP),,apps-ct) $(verbose) mkdir -p $(CURDIR)/logs/ - $(gen_verbose) $(CT_RUN) -suite $(addsuffix _SUITE,$(CT_SUITES)) $(CT_OPTS) + $(gen_verbose) $(CT_RUN) -sname ct_$(PROJECT) -suite $(addsuffix _SUITE,$(CT_SUITES)) $(CT_OPTS) endif ifneq ($(ALL_APPS_DIRS),) @@ -5989,7 +5992,7 @@ endif define ct_suite_target ct-$(1): test-build $(verbose) mkdir -p $(CURDIR)/logs/ - $(gen_verbose) $(CT_RUN) -suite $(addsuffix _SUITE,$(1)) $(CT_EXTRA) $(CT_OPTS) + $(gen_verbose) $(CT_RUN) -sname ct_$(PROJECT) -suite $(addsuffix _SUITE,$(1)) $(CT_EXTRA) $(CT_OPTS) endef $(foreach test,$(CT_SUITES),$(eval $(call ct_suite_target,$(test)))) @@ -6196,8 +6199,9 @@ eunit: test-build $(gen_verbose) $(call erlang,$(call eunit.erl,fun $(t)/0),$(EUNIT_ERL_OPTS)) endif else -EUNIT_EBIN_MODS = $(notdir $(basename $(call core_find,ebin/,*.beam))) -EUNIT_TEST_MODS = $(notdir $(basename $(call core_find,$(TEST_DIR)/,*.beam))) +EUNIT_EBIN_MODS = $(notdir $(basename $(ERL_FILES) $(BEAM_FILES))) +EUNIT_TEST_MODS = $(notdir $(basename $(call core_find,$(TEST_DIR)/,*.erl))) + EUNIT_MODS = $(foreach mod,$(EUNIT_EBIN_MODS) $(filter-out \ $(patsubst %,%_tests,$(EUNIT_EBIN_MODS)),$(EUNIT_TEST_MODS)),'$(mod)') @@ -6314,6 +6318,7 @@ shell: build-shell-deps # Copyright (c) 2015, Loïc Hoguin <[email protected]> # This file is part of erlang.mk and subject to the terms of the ISC License. +# @todo BUILD_DEPS too? ifeq ($(filter triq,$(DEPS) $(TEST_DEPS)),triq) .PHONY: triq diff --git a/examples/ssl_hello_world/src/ssl_hello_world_app.erl b/examples/ssl_hello_world/src/ssl_hello_world_app.erl index 3e53818..0338baf 100644 --- a/examples/ssl_hello_world/src/ssl_hello_world_app.erl +++ b/examples/ssl_hello_world/src/ssl_hello_world_app.erl @@ -17,12 +17,12 @@ start(_Type, _Args) -> ]} ]), PrivDir = code:priv_dir(ssl_hello_world), - {ok, _} = cowboy:start_https(https, 100, [ + {ok, _} = cowboy:start_tls(https, 100, [ {port, 8443}, {cacertfile, PrivDir ++ "/ssl/cowboy-ca.crt"}, {certfile, PrivDir ++ "/ssl/server.crt"}, {keyfile, PrivDir ++ "/ssl/server.key"} - ], [{env, [{dispatch, Dispatch}]}]), + ], #{env, [{dispatch, Dispatch}]}), ssl_hello_world_sup:start_link(). stop(_State) -> diff --git a/src/cowboy.erl b/src/cowboy.erl index 1f0e2a9..67afd7b 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -14,9 +14,7 @@ -module(cowboy). --export([start_http/4]). --export([start_https/4]). --export([start_spdy/4]). +-export([start_clear/4]). -export([start_tls/4]). -export([stop_listener/1]). -export([set_env/3]). @@ -42,43 +40,27 @@ fun((http_status(), http_headers(), iodata(), Req) -> Req). -export_type([onresponse_fun/0]). --spec start_http(ranch:ref(), non_neg_integer(), ranch_tcp:opts(), +-spec start_clear(ranch:ref(), non_neg_integer(), ranch_tcp:opts(), cowboy_protocol:opts()) -> {ok, pid()} | {error, any()}. -start_http(Ref, NbAcceptors, TransOpts, ProtoOpts) +start_clear(Ref, NbAcceptors, TransOpts0, ProtoOpts) when is_integer(NbAcceptors), NbAcceptors > 0 -> - ranch:start_listener(Ref, NbAcceptors, - ranch_tcp, TransOpts, cowboy_protocol, ProtoOpts). - --spec start_https(ranch:ref(), non_neg_integer(), ranch_ssl:opts(), - cowboy_protocol:opts()) -> {ok, pid()} | {error, any()}. -start_https(Ref, NbAcceptors, TransOpts, ProtoOpts) - when is_integer(NbAcceptors), NbAcceptors > 0 -> - ranch:start_listener(Ref, NbAcceptors, - ranch_ssl, TransOpts, cowboy_protocol, ProtoOpts). - --spec start_spdy(ranch:ref(), non_neg_integer(), ranch_ssl:opts(), - cowboy_spdy:opts()) -> {ok, pid()} | {error, any()}. -start_spdy(Ref, NbAcceptors, TransOpts, ProtoOpts) - when is_integer(NbAcceptors), NbAcceptors > 0 -> - TransOpts2 = [ - {connection_type, supervisor}, - {next_protocols_advertised, - [<<"spdy/3">>, <<"http/1.1">>, <<"http/1.0">>]} - |TransOpts], - ranch:start_listener(Ref, NbAcceptors, - ranch_ssl, TransOpts2, cowboy_spdy, ProtoOpts). + TransOpts = TransOpts0,%[connection_type(ProtoOpts)|TransOpts0], + ranch:start_listener(Ref, NbAcceptors, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts). -spec start_tls(ranch:ref(), non_neg_integer(), ranch_ssl:opts(), opts()) -> {ok, pid()} | {error, any()}. start_tls(Ref, NbAcceptors, TransOpts0, ProtoOpts) when is_integer(NbAcceptors), NbAcceptors > 0 -> - {_, Type} = maps:get(stream_handler, ProtoOpts, {cowboy_stream_h, supervisor}), TransOpts = [ - {connection_type, Type}, + connection_type(ProtoOpts), {next_protocols_advertised, [<<"h2">>, <<"spdy/3">>, <<"http/1.1">>]}, {alpn_preferred_protocols, [<<"h2">>, <<"spdy/3">>, <<"http/1.1">>]} |TransOpts0], - ranch:start_listener(Ref, NbAcceptors, - ranch_ssl, TransOpts, cowboy_tls, ProtoOpts). + ranch:start_listener(Ref, NbAcceptors, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts). + +-spec connection_type(opts()) -> {connection_type, worker | supervisor}. +connection_type(ProtoOpts) -> + {_, Type} = maps:get(stream_handler, ProtoOpts, {cowboy_stream_h, supervisor}), + {connection_type, Type}. -spec stop_listener(ranch:ref()) -> ok | {error, not_found}. stop_listener(Ref) -> diff --git a/src/cowboy_clear.erl b/src/cowboy_clear.erl new file mode 100644 index 0000000..cc6078e --- /dev/null +++ b/src/cowboy_clear.erl @@ -0,0 +1,37 @@ +%% Copyright (c) 2016, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(cowboy_clear). +-behavior(ranch_protocol). + +-export([start_link/4]). +-export([init/5]). + +-spec start_link(ranch:ref(), inet:socket(), module(), cowboy:opts()) -> {ok, pid()}. +start_link(Ref, Socket, Transport, Opts) -> + Pid = proc_lib:spawn_link(?MODULE, init, [self(), Ref, Socket, Transport, Opts]), + {ok, Pid}. + +-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok. +init(Parent, Ref, Socket, Transport, Opts) -> + ok = ranch:accept_ack(Ref), + init(Parent, Ref, Socket, Transport, Opts, cowboy_http). + +init(Parent, Ref, Socket, Transport, Opts, Protocol) -> + {Handler, Type} = maps:get(stream_handler, Opts, {cowboy_stream_h, supervisor}), + _ = case Type of + worker -> ok; + supervisor -> process_flag(trap_exit, true) + end, + Protocol:init(Parent, Ref, Socket, Transport, Opts, Handler). diff --git a/src/cowboy_handler.erl b/src/cowboy_handler.erl index 165888e..7791fe2 100644 --- a/src/cowboy_handler.erl +++ b/src/cowboy_handler.erl @@ -35,10 +35,8 @@ -spec execute(Req, Env) -> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). -execute(Req, Env) -> - {_, Handler} = lists:keyfind(handler, 1, Env), - {_, HandlerOpts} = lists:keyfind(handler_opts, 1, Env), - try Handler:init(Req, HandlerOpts) of +execute(Req, Env=#{handler := Handler, handler_opts := HandlerOpts}) -> + case Handler:init(Req, HandlerOpts) of {ok, Req2, State} -> Result = terminate(normal, Req2, State, Handler), {ok, Req2, [{result, Result}|Env]}; @@ -50,37 +48,13 @@ execute(Req, Env) -> Mod:upgrade(Req2, Env, Handler, State, Timeout, run); {Mod, Req2, State, Timeout, hibernate} -> Mod:upgrade(Req2, Env, Handler, State, Timeout, hibernate) - catch Class:Reason -> - Stacktrace = erlang:get_stacktrace(), - cowboy_req:maybe_reply(Stacktrace, Req), - terminate({crash, Class, Reason}, Req, HandlerOpts, Handler), - exit({cowboy_handler, [ - {class, Class}, - {reason, Reason}, - {mfa, {Handler, init, 2}}, - {stacktrace, Stacktrace}, - {req, cowboy_req:to_list(Req)}, - {opts, HandlerOpts} - ]}) end. -spec terminate(any(), Req, any(), module()) -> ok when Req::cowboy_req:req(). terminate(Reason, Req, State, Handler) -> case erlang:function_exported(Handler, terminate, 3) of true -> - try - Handler:terminate(Reason, cowboy_req:lock(Req), State) - catch Class:Reason2 -> - exit({cowboy_handler, [ - {class, Class}, - {reason, Reason2}, - {mfa, {Handler, terminate, 3}}, - {stacktrace, erlang:get_stacktrace()}, - {req, cowboy_req:to_list(Req)}, - {state, State}, - {terminate_reason, Reason} - ]}) - end; + Handler:terminate(Reason, cowboy_req:lock(Req), State); false -> ok end. diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl new file mode 100644 index 0000000..3ec3c17 --- /dev/null +++ b/src/cowboy_http.erl @@ -0,0 +1,973 @@ +%% Copyright (c) 2016, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(cowboy_http). + +-export([init/6]). + +-export([system_continue/3]). +-export([system_terminate/4]). +-export([system_code_change/4]). + +%% @todo map +-type opts() :: [{compress, boolean()} + | {env, cowboy_middleware:env()} + | {max_empty_lines, non_neg_integer()} + | {max_header_name_length, non_neg_integer()} + | {max_header_value_length, non_neg_integer()} + | {max_headers, non_neg_integer()} + | {max_keepalive, non_neg_integer()} + | {max_request_line_length, non_neg_integer()} + | {middlewares, [module()]} + | {onresponse, cowboy:onresponse_fun()} + | {timeout, timeout()}]. +-export_type([opts/0]). + +-record(ps_request_line, { + empty_lines = 0 :: non_neg_integer() +}). + +-record(ps_header, { + method = undefined :: binary(), + path = undefined :: binary(), + qs = undefined :: binary(), + version = undefined :: cowboy:http_version(), + headers = undefined :: map() | undefined, %% @todo better type than map() + name = undefined :: binary() +}). + +%% @todo We need a state where we wait for the stream process to ask for the body. +%% OR DO WE + +%% In HTTP/2 we start receiving data before the body asks for it, even if optionally +%% (and by default), so we need to be able to do the same for HTTP/1.1 too. This means +%% that when we receive data (up to a certain limit, we read from the socket and decode. +%% When we reach a limit, we stop reading from the socket momentarily until the stream +%% process asks for more or the stream ends. + +%% This means that we need to keep a buffer in the stream handler (until the stream +%% process asks for it). And that we need the body state to indicate how much we have +%% left to read (and stop/start reading from the socket depending on value). + +-record(ps_body, { + %% @todo flow + transfer_decode_fun :: fun(), %% @todo better type + transfer_decode_state :: any() %% @todo better type +}). + +-record(stream, { + %% Stream identifier. + id = undefined :: cowboy_stream:streamid(), + + %% Stream handler state. + state = undefined :: any(), + + %% Client HTTP version for this stream. + version = undefined :: cowboy:http_version(), + + %% Commands queued. + queue = [] :: [] %% @todo better type +}). + +-type stream() :: #stream{}. + +-record(state, { + parent :: pid(), + ref :: ranch:ref(), + socket :: inet:socket(), + transport :: module(), + opts = #{} :: map(), + handler :: module(), + + timer = undefined :: undefined | reference(), + + %% Identifier for the stream currently being read (or waiting to be received). + in_streamid = 1 :: pos_integer(), + + %% Parsing state for the current stream or stream-to-be. + in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{}, + + %% Identifier for the stream currently being written. + %% Note that out_streamid =< in_streamid. + out_streamid = 1 :: pos_integer(), + + %% Whether we finished writing data for the current stream. + out_state = wait :: wait | headers | chunked, + + %% The connection will be closed after this stream. + last_streamid = undefined :: pos_integer(), + + %% Currently active HTTP/1.1 streams. Streams may be initiated either + %% by the client or by the server through PUSH_PROMISE frames. + streams = [] :: [stream()], + + %% Children which are in the process of shutting down. + children = [] :: [{pid(), cowboy_stream:streamid(), timeout()}] + + %% @todo Automatic compression. (compress option?) + %% @todo onresponse? Equivalent using streams. +}). + +-include_lib("cowlib/include/cow_inline.hrl"). +-include_lib("cowlib/include/cow_parse.hrl"). + +-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok. +init(Parent, Ref, Socket, Transport, Opts, Handler) -> + LastStreamID = maps:get(max_keepalive, Opts, 100), + before_loop(set_request_timeout(#state{ + parent=Parent, ref=Ref, socket=Socket, + transport=Transport, opts=Opts, handler=Handler, + last_streamid=LastStreamID}), <<>>). + +%% @todo Send a response depending on in_state and whether one was already sent. + +%% @todo +%% Timeouts: +%% - waiting for new request (if no stream is currently running) +%% -> request_timeout: for whole request/headers, set at init/when we set ps_request_line{} state +%% - waiting for body (if a stream requested the body to be read) +%% -> read_body_timeout: amount of time we wait without receiving any data when reading the body +%% - if we skip the body, skip only for a specific duration +%% -> skip_body_timeout: also have a skip_body_length +%% - none if we have a stream running and it didn't request the body to be read +%% - global +%% -> inactivity_timeout: max time to wait without anything happening before giving up + +before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) -> + %% @todo disable this when we get to the body, until the stream asks for it? + %% Perhaps have a threshold for how much we're willing to read before waiting. + Transport:setopts(Socket, [{active, once}]), + loop(State, Buffer). + +loop(State=#state{parent=Parent, socket=Socket, transport=Transport, + handler=_Handler, timer=TimerRef, children=Children}, Buffer) -> + {OK, Closed, Error} = Transport:messages(), + receive + %% Socket messages. + {OK, Socket, Data} -> + parse(<< Buffer/binary, Data/binary >>, State); + {Closed, Socket} -> + terminate(State, {socket_error, closed, 'The socket has been closed.'}); + {Error, Socket, Reason} -> + terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); + %% Timeouts. + {timeout, TimerRef, Reason} -> + timeout(State, Reason); + {timeout, _, _} -> + loop(State, Buffer); + %% System messages. + {'EXIT', Parent, Reason} -> + exit(Reason); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer}); + %% Messages pertaining to a stream. + {{Pid, StreamID}, Msg} when Pid =:= self() -> + loop(info(State, StreamID, Msg), Buffer); + %% Exit signal from children. + Msg = {'EXIT', Pid, _} -> + loop(down(State, Pid, Msg), Buffer); + %% Calls from supervisor module. + {'$gen_call', {From, Tag}, which_children} -> + Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _, _} <- Children], + From ! {Tag, Workers}, + loop(State, Buffer); + {'$gen_call', {From, Tag}, count_children} -> + NbChildren = length(Children), + Counts = [{specs, 1}, {active, NbChildren}, + {supervisors, 0}, {workers, NbChildren}], + From ! {Tag, Counts}, + loop(State, Buffer); + {'$gen_call', {From, Tag}, _} -> + From ! {Tag, {error, ?MODULE}}, + loop(State, Buffer); + %% Unknown messages. + Msg -> + error_logger:error_msg("Received stray message ~p.", [Msg]), + loop(State, Buffer) + %% @todo Configurable timeout. This should be a global inactivity timeout + %% that triggers when really nothing happens (ie something went really wrong). + after 300000 -> + terminate(State, {internal_error, timeout, 'No message or data received before timeout.'}) + end. + +set_request_timeout(State0=#state{timer=TimerRef0, opts=Opts}) -> + State = cancel_request_timeout(State0), + Timeout = maps:get(request_timeout, Opts, 5000), + TimerRef = erlang:start_timer(Timeout, self(), request_timeout), + State#state{timer=TimerRef}. + +cancel_request_timeout(State=#state{timer=TimerRef, opts=Opts}) -> + ok = case TimerRef of + undefined -> ok; + _ -> erlang:cancel_timer(TimerRef, [{async, true}, {info, false}]) + end, + State#state{timer=undefined}. + +%% @todo Honestly it would be much better if we didn't enable pipelining yet. +timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) -> + %% @todo If other streams are running, just set the connection to be closed + %% and stop trying to read from the socket? + terminate(State, {connection_error, timeout, 'No request-line received before timeout.'}); +timeout(State=#state{socket=Socket, transport=Transport, in_state=#ps_header{}}, request_timeout) -> + %% @todo If other streams are running, maybe wait for their reply before sending 408? + %% -> Definitely. Either way, stop reading from the socket and make that stream the last. + Transport:send(Socket, cow_http:response(408, 'HTTP/1.1', [])), + terminate(State, {connection_error, timeout, 'Request headers not received before timeout.'}). + +%% Request-line. +parse(<<>>, State) -> + before_loop(State, <<>>); +parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) -> + after_parse(parse_request(Buffer, State, EmptyLines)); +parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) -> + after_parse(parse_header(Buffer, + State#state{in_state=PS#ps_header{headers=undefined}}, + Headers)); +parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) -> + after_parse(parse_hd_before_value(Buffer, + State#state{in_state=PS#ps_header{headers=undefined, name=undefined}}, + Headers, Name)); +parse(Buffer, State=#state{in_state=#ps_body{}}) -> + %% @todo We do not want to get the body automatically if the request doesn't ask for it. + %% We may want to get bodies that are below a threshold without waiting, and buffer them + %% until the request asks, though. + + %% @todo Transfer-decoding must be done here. + after_parse(parse_body(Buffer, State)). +%% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now. + +after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version}, + State0=#state{handler=Handler, opts=Opts, streams=Streams0}, Buffer}) -> + %% @todo Opts at the end. Maybe pass the same Opts we got? + try Handler:init(StreamID, Req, Opts) of + {Commands, StreamState} -> + Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0], + State = case maybe_req_close(State0, Headers, Version) of + close -> State0#state{streams=Streams, last_streamid=StreamID}; + keepalive -> State0#state{streams=Streams} + end, + parse(Buffer, commands(State, StreamID, Commands)) + catch Class:Reason -> + error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) " + "with reason ~p:~p.", + [Handler, StreamID, Req, Opts, Class, Reason]), + ok + %% @todo Status code. +% stream_reset(State, StreamID, {internal_error, {Class, Reason}, +% 'Exception occurred in StreamHandler:init/10 call.'}) %% @todo Check final arity. + end; +%% Streams are sequential so the body is always about the last stream created +%% unless that stream has terminated. +after_parse({data, StreamID, IsFin, Data, State=#state{handler=Handler, + streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) -> + try Handler:data(StreamID, IsFin, Data, StreamState0) of + {Commands, StreamState} -> + Streams = lists:keyreplace(StreamID, #stream.id, Streams0, + Stream#stream{state=StreamState}), + parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands)) + catch Class:Reason -> + error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.", + [Handler, StreamID, IsFin, Data, StreamState0, Class, Reason]), + ok + %% @todo +% stream_reset(State, StreamID, {internal_error, {Class, Reason}, +% 'Exception occurred in StreamHandler:data/4 call.'}) + end; +%% No corresponding stream, skip. +after_parse({data, _, _, _, State, Buffer}) -> + before_loop(State, Buffer); +after_parse({more, State, Buffer}) -> + before_loop(State, Buffer). + +%% Request-line. + +-spec parse_request(binary(), #state{}, non_neg_integer()) -> ok. +%% Empty lines must be using \r\n. +parse_request(<< $\n, _/bits >>, State, _) -> + error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo +parse_request(<< $\s, _/bits >>, State, _) -> + error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo +%% We limit the length of the Request-line to MaxLength to avoid endlessly +%% reading from the socket and eventually crashing. +parse_request(Buffer, State=#state{opts=Opts}, EmptyLines) -> + MaxLength = maps:get(max_request_line_length, Opts, 8000), + MaxEmptyLines = maps:get(max_empty_lines, Opts, 5), + case match_eol(Buffer, 0) of + nomatch when byte_size(Buffer) > MaxLength -> + error_terminate(414, State, {connection_error, limit_reached, + ''}); %% @todo + nomatch -> + {more, State#state{in_state=#ps_request_line{empty_lines=EmptyLines}}, Buffer}; + 1 when EmptyLines =:= MaxEmptyLines -> + error_terminate(400, State, {connection_error, limit_reached, + ''}); %% @todo + 1 -> + << _:16, Rest/bits >> = Buffer, + parse_request(Rest, State, EmptyLines + 1); + _ -> + case Buffer of + %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests + << "OPTIONS * ", Rest/bits >> -> + parse_version(Rest, State, <<"OPTIONS">>, <<"*">>, <<>>); +% << "CONNECT ", Rest/bits >> -> +% parse_authority( %% @todo + _ -> + parse_method(Buffer, State, <<>>, + maps:get(max_method_length, Opts, 32)) + end + end. + +match_eol(<< $\n, _/bits >>, N) -> + N; +match_eol(<< _, Rest/bits >>, N) -> + match_eol(Rest, N + 1); +match_eol(_, _) -> + nomatch. + +parse_method(_, State, _, 0) -> + error_terminate(501, State, {connection_error, limit_reached, + 'The method name is longer than configuration allows. (RFC7230 3.1.1)'}); +parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) -> + case C of + $\r -> error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo + $\s -> parse_uri(Rest, State, SoFar); + _ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1); + _ -> error_terminate(400, State, {connection_error, protocol_error, + 'The method name must contain only valid token characters. (RFC7230 3.1.1)'}) + end. + +parse_uri(<< H, T, T, P, "://", Rest/bits >>, State, Method) + when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T; + P =:= $p orelse P =:= $P -> + parse_uri_skip_host(Rest, State, Method); +parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method) + when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T; + P =:= $p orelse P =:= $P; S =:= $s orelse S =:= $S -> + parse_uri_skip_host(Rest, State, Method); +parse_uri(<< $/, Rest/bits >>, State, Method) -> + parse_uri_path(Rest, State, Method, << $/ >>); +parse_uri(_, State, _) -> + error_terminate(400, State, {connection_error, protocol_error, + 'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}). + +parse_uri_skip_host(<< C, Rest/bits >>, State, Method) -> + case C of + $\r -> error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo + $/ -> parse_uri_path(Rest, State, Method, <<"/">>); + $\s -> parse_version(Rest, State, Method, <<"/">>, <<>>); + $? -> parse_uri_query(Rest, State, Method, <<"/">>, <<>>); + $# -> skip_uri_fragment(Rest, State, Method, <<"/">>, <<>>); + _ -> parse_uri_skip_host(Rest, State, Method) + end. + +parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) -> + case C of + $\r -> error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo + $\s -> parse_version(Rest, State, Method, SoFar, <<>>); + $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>); + $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>); + _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>) + end. + +parse_uri_query(<< C, Rest/bits >>, State, M, P, SoFar) -> + case C of + $\r -> error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo + $\s -> parse_version(Rest, State, M, P, SoFar); + $# -> skip_uri_fragment(Rest, State, M, P, SoFar); + _ -> parse_uri_query(Rest, State, M, P, << SoFar/binary, C >>) + end. + +skip_uri_fragment(<< C, Rest/bits >>, State, M, P, Q) -> + case C of + $\r -> error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo + $\s -> parse_version(Rest, State, M, P, Q); + _ -> skip_uri_fragment(Rest, State, M, P, Q) + end. + +%% @todo Calls to parse_header should update the state. +parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, State, M, P, Q) -> + parse_headers(Rest, State, M, P, Q, 'HTTP/1.1'); +parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, State, M, P, Q) -> + parse_headers(Rest, State, M, P, Q, 'HTTP/1.0'); +parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t -> + error_terminate(400, State, {connection_error, protocol_error, + 'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'}); +parse_version(<< C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t -> + error_terminate(400, State, {connection_error, protocol_error, + 'The separator between request target and version must be a single SP.'}); +parse_version(_, State, _, _, _) -> + error_terminate(505, State, {connection_error, protocol_error, + ''}). %% @todo + +parse_headers(Rest, State, M, P, Q, V) -> + %% @todo Figure out the parse states. + parse_header(Rest, State#state{in_state=#ps_header{ + method=M, path=P, qs=Q, version=V}}, #{}). + +%% Headers. + +%% We need two or more bytes in the buffer to continue. +parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 -> + {more, State#state{in_state=PS#ps_header{headers=Headers}}, Rest}; +parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) -> + request(Rest, S, Headers); +parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) -> + MaxLength = maps:get(max_header_name_length, Opts, 64), + MaxHeaders = maps:get(max_headers, Opts, 100), + case match_colon(Buffer, 0) of + nomatch when byte_size(Buffer) > MaxLength -> + error_terminate(400, State, {connection_error, limit_reached, + ''}); %% @todo + nomatch when length(Headers) >= MaxHeaders -> + error_terminate(400, State, {connection_error, limit_reached, + ''}); %% @todo + nomatch -> + {more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer}; + _ -> + parse_hd_name(Buffer, State, Headers, <<>>) + end. + +match_colon(<< $:, _/bits >>, N) -> + N; +match_colon(<< _, Rest/bits >>, N) -> + match_colon(Rest, N + 1); +match_colon(_, _) -> + nomatch. + +parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) -> + parse_hd_before_value(Rest, State, H, SoFar); +parse_hd_name(<< C, _/bits >>, State, _, <<>>) when ?IS_WS(C) -> + error_terminate(400, State, {connection_error, protocol_error, + ''}); %% @todo +parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) when ?IS_WS(C) -> + parse_hd_name_ws(Rest, State, H, SoFar); +parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) -> + ?LOWER(parse_hd_name, Rest, State, H, SoFar). + +parse_hd_name_ws(<< C, Rest/bits >>, S, H, Name) -> + case C of + $\s -> parse_hd_name_ws(Rest, S, H, Name); + $\t -> parse_hd_name_ws(Rest, S, H, Name); + $: -> parse_hd_before_value(Rest, S, H, Name) + end. + +parse_hd_before_value(<< $\s, Rest/bits >>, S, H, N) -> + parse_hd_before_value(Rest, S, H, N); +parse_hd_before_value(<< $\t, Rest/bits >>, S, H, N) -> + parse_hd_before_value(Rest, S, H, N); +parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) -> + MaxLength = maps:get(max_header_value_length, Opts, 4096), + case match_eol(Buffer, 0) of + nomatch when byte_size(Buffer) > MaxLength -> + error_terminate(400, State, {connection_error, limit_reached, + ''}); %% @todo + nomatch -> + {more, State#state{in_state=PS#ps_header{headers=H, name=N}}, Buffer}; + _ -> + parse_hd_value(Buffer, State, H, N, <<>>) + end. + +parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers, Name, SoFar) -> + %% @todo What to do about duplicate header names. + parse_header(Rest, S, Headers#{Name => clean_value_ws_end(SoFar, byte_size(SoFar) - 1)}); +parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) -> + parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>). + +clean_value_ws_end(_, -1) -> + <<>>; +clean_value_ws_end(Value, N) -> + case binary:at(Value, N) of + $\s -> clean_value_ws_end(Value, N - 1); + $\t -> clean_value_ws_end(Value, N - 1); + _ -> + S = N + 1, + << Value2:S/binary, _/bits >> = Value, + Value2 + end. + +-ifdef(TEST). +clean_value_ws_end_test_() -> + Tests = [ + {<<>>, <<>>}, + {<<" ">>, <<>>}, + {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, " + "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>, + <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, " + "text/html;level=2;q=0.4, */*;q=0.5">>} + ], + [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests]. + +horse_clean_value_ws_end() -> + horse:repeat(200000, + clean_value_ws_end( + <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, " + "text/html;level=2;q=0.4, */*;q=0.5 ">>, + byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, " + "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1) + ). +-endif. + +request(Buffer, State=#state{transport=Transport, in_streamid=StreamID, + in_state=#ps_header{version=Version}}, Headers) -> + case maps:get(<<"host">>, Headers, undefined) of + undefined when Version =:= 'HTTP/1.1' -> + %% @todo Might want to not close the connection on this and next one. + error_terminate(400, State, {stream_error, StreamID, protocol_error, + ''}); %% @todo + undefined -> + request(Buffer, State, Headers, <<>>, default_port(Transport:secure())); + RawHost -> + try parse_host(RawHost, false, <<>>) of + {Host, undefined} -> + request(Buffer, State, Headers, Host, default_port(Transport:secure())); + {Host, Port} -> + request(Buffer, State, Headers, Host, Port) + catch _:_ -> + error_terminate(400, State, {stream_error, StreamID, protocol_error, + ''}) %% @todo + end + end. + +-spec default_port(boolean()) -> 80 | 443. +default_port(true) -> 443; +default_port(_) -> 80. + +%% @todo Yeah probably just call the cowlib function. +%% Same code as cow_http:parse_fullhost/1, but inline because we +%% really want this to go fast. +parse_host(<< $[, Rest/bits >>, false, <<>>) -> + parse_host(Rest, true, << $[ >>); +parse_host(<<>>, false, Acc) -> + {Acc, undefined}; +parse_host(<< $:, Rest/bits >>, false, Acc) -> + {Acc, list_to_integer(binary_to_list(Rest))}; +parse_host(<< $], Rest/bits >>, true, Acc) -> + parse_host(Rest, false, << Acc/binary, $] >>); +parse_host(<< C, Rest/bits >>, E, Acc) -> + ?LOWER(parse_host, Rest, E, Acc). + +%% End of request parsing. + +%% @todo We used to get the peername here, bad idea, should +%% get it at the very start of the connection, or the first +%% time requested if we go the route of handler sending a +%% message to get it (we probably shouldn't). +request(Buffer, State0=#state{ref=Ref, transport=Transport, in_streamid=StreamID, + in_state=#ps_header{method=Method, path=Path, qs=Qs, version=Version}}, + Headers, Host, Port) -> + Scheme = case Transport:secure() of + true -> <<"https">>; + false -> <<"http">> + end, + {HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers of + #{<<"content-length">> := <<"0">>} -> + {false, 0, undefined, undefined}; + #{<<"content-length">> := BinLength} -> + Length = try + cow_http_hd:parse_content_length(BinLength) + catch _:_ -> + error_terminate(400, State0, {stream_error, StreamID, protocol_error, + ''}) %% @todo + %% @todo Err should terminate here... + end, + {true, Length, fun cow_http_te:stream_identity/2, {0, Length}}; + %% @todo Better handling of transfer decoding. + #{<<"transfer-encoding">> := <<"chunked">>} -> + {true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}}; + _ -> + {false, 0, undefined, undefined} + end, + Req = #{ + ref => Ref, + pid => self(), + streamid => StreamID, + + %% @todo peer + %% @todo sockname + %% @todo ssl client cert? + + method => Method, + scheme => Scheme, + host => Host, + %% host_info (cowboy_router) + port => Port, + path => Path, + %% path_info (cowboy_router) + %% bindings (cowboy_router) + qs => Qs, + version => Version, + %% We are transparently taking care of transfer-encodings so + %% the user code has no need to know about it. + headers => maps:remove(<<"transfer-encoding">>, Headers), + + has_body => HasBody, + body_length => BodyLength + %% @todo multipart? keep state separate + + %% meta values (cowboy_websocket, cowboy_rest) + }, + State = case HasBody of + true -> + cancel_request_timeout(State0#state{in_state=#ps_body{ + %% @todo Don't need length anymore? + transfer_decode_fun = TDecodeFun, + transfer_decode_state = TDecodeState + }}); + false -> + set_request_timeout(State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}) + end, + {request, Req, State, Buffer}. + +%% Request body parsing. + +parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= + PS=#ps_body{transfer_decode_fun=TDecode, transfer_decode_state=TState0}}) -> + %% @todo Proper trailers. + case TDecode(Buffer, TState0) of + more -> + %% @todo Asks for 0 or more bytes. + {more, State, Buffer}; + {more, Data, TState} -> + %% @todo Asks for 0 or more bytes. + {data, StreamID, nofin, Data, State#state{in_state= + PS#ps_body{transfer_decode_state=TState}}, <<>>}; + {more, Data, _Length, TState} when is_integer(_Length) -> + %% @todo Asks for Length more bytes. + {data, StreamID, nofin, Data, State#state{in_state= + PS#ps_body{transfer_decode_state=TState}}, <<>>}; + {more, Data, Rest, TState} -> + %% @todo Asks for 0 or more bytes. + {data, StreamID, nofin, Data, State#state{in_state= + PS#ps_body{transfer_decode_state=TState}}, Rest}; + {done, TotalLength, Rest} -> + {data, StreamID, {fin, TotalLength}, <<>>, set_request_timeout( + State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}; + {done, Data, TotalLength, Rest} -> + {data, StreamID, {fin, TotalLength}, Data, set_request_timeout( + State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest} + end. + +%% Message handling. + +down(State=#state{children=Children0}, Pid, Msg) -> + case lists:keytake(Pid, 1, Children0) of + {value, {_, undefined, _}, Children} -> + State#state{children=Children}; + {value, {_, StreamID, _}, Children} -> + info(State#state{children=Children}, StreamID, Msg); + false -> + error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.", [Msg, Pid]), + State + end. + +info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) -> + case lists:keyfind(StreamID, #stream.id, Streams0) of + Stream = #stream{state=StreamState0} -> + try Handler:info(StreamID, Msg, StreamState0) of + {Commands, StreamState} -> + Streams = lists:keyreplace(StreamID, #stream.id, Streams0, + Stream#stream{state=StreamState}), + commands(State#state{streams=Streams}, StreamID, Commands) + catch Class:Reason -> + error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.", + [Handler, StreamID, Msg, StreamState0, Class, Reason]), + ok +%% @todo +% stream_reset(State, StreamID, {internal_error, {Class, Reason}, +% 'Exception occurred in StreamHandler:info/3 call.'}) + end; + false -> + error_logger:error_msg("Received message ~p for unknown stream ~p.", [Msg, StreamID]), + State + end. + +%% @todo commands/3 +%% @todo stream_reset + + + + +%% Commands. + +commands(State, _, []) -> + State; +%% Supervise a child process. +commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) -> + commands(State#state{children=[{Pid, StreamID, Shutdown}|Children]}, StreamID, Tail); +%% Error handling. +commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) -> + commands(stream_reset(State, StreamID, Error), StreamID, Tail); +%% Commands for a stream currently inactive. +commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands) + when Current =/= StreamID -> + + %% @todo We still want to handle some commands... + + Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0), + Streams = lists:keyreplace(StreamID, #stream.id, Streams0, + Stream#stream{queue=Queue ++ Commands}), + State#state{streams=Streams}; +%% Read the request body. +commands(State, StreamID, [{flow, _Length}|Tail]) -> + %% @todo We only read from socket if buffer is empty, otherwise + %% we decode the buffer. + + %% @todo Set the body reading length to min(Length, BodyLength) + + commands(State, StreamID, Tail); +%% @todo Probably a good idea to have an atomic response send (single send call for resp+body). +%% Send a full response. +%% +%% @todo Kill the stream if it sent a response when one has already been sent. +%% @todo Keep IsFin in the state. +%% @todo Same two things above apply to DATA, possibly promise too. +commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, + [{response, StatusCode, Headers0, Body}|Tail]) -> + %% @todo I'm pretty sure the last stream in the list is the one we want + %% considering all others are queued. + #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams), + {State, Headers} = connection(State0, Headers0, StreamID, Version), + %% @todo Ensure content-length is set. + Response = cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers)), + case Body of + {sendfile, O, B, P} -> + Transport:send(Socket, Response), + commands(State#state{out_state=done}, StreamID, [{sendfile, fin, O, B, P}|Tail]); + _ -> + Transport:send(Socket, [Response, Body]), + %% @todo If max number of requests, close connection. + %% @todo If IsFin, maybe skip body of current request. + maybe_terminate(State#state{out_state=done}, StreamID, Tail, fin) + end; +%% Send response headers and initiate chunked encoding. +commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, + [{headers, StatusCode, Headers0}|Tail]) -> + %% @todo Same as above. + #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams), + {State1, Headers1} = case Version of + 'HTTP/1.1' -> + {State0, Headers0#{<<"transfer-encoding">> => <<"chunked">>}}; + %% Close the connection after streaming the data to HTTP/1.0 client. + %% @todo I'm guessing we need to differentiate responses with a content-length and others. + 'HTTP/1.0' -> + {State0#state{last_streamid=StreamID}, Headers0} + end, + {State, Headers} = connection(State1, Headers1, StreamID, Version), + Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers))), + commands(State#state{out_state=chunked}, StreamID, Tail); +%% Send a response body chunk. +%% +%% @todo WINDOW_UPDATE stuff require us to buffer some data. +commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID, + [{data, IsFin, Data}|Tail]) -> + %% @todo Same as above. + Headers1 = case lists:keyfind(StreamID, #stream.id, Streams) of + #stream{version='HTTP/1.1'} -> + Size = iolist_size(Data), + Transport:send(Socket, [integer_to_list(Size, 16), <<"\r\n">>, Data, <<"\r\n">>]); + #stream{version='HTTP/1.0'} -> + Transport:send(Socket, Data) + end, + maybe_terminate(State, StreamID, Tail, IsFin); +%% Send a file. +commands(State=#state{socket=Socket, transport=Transport}, StreamID, + [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) -> + Transport:sendfile(Socket, Path, Offset, Bytes), + maybe_terminate(State, StreamID, Tail, IsFin); +%% Protocol takeover. +commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, + opts=Opts, children=Children}, StreamID, + [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) -> + %% @todo This should be the last stream running otherwise we need to wait before switching. + %% @todo If there's streams opened after this one, fail instead of 101. + State = cancel_request_timeout(State0), + %% @todo When we actually do the upgrade, we only have the one stream left, plus + %% possibly some processes terminating. We need a smart strategy for handling the + %% children shutdown. We can start with brutal_kill and discarding the EXIT messages + %% received before switching to Websocket. Something better would be to let the + %% stream processes finish but that implies the Websocket module to know about + %% them and filter the messages. For now, kill them all and discard all messages + %% in the mailbox. + _ = [exit(Pid, kill) || {Pid, _, _} <- Children], + flush(), + %% Everything good, upgrade! + _ = commands(State, StreamID, [{response, 101, Headers, <<>>}]), + %% @todo This is no good because commands return a state normally and here it doesn't + %% we need to let this module go entirely. Perhaps it should be handled directly in + %% cowboy_clear/cowboy_tls? Perhaps not. We do want that Buffer. + Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState); +%% Stream shutdown. +commands(State, StreamID, [stop|Tail]) -> + %% @todo Do we want to run the commands after a stop? +% commands(stream_terminate(State, StreamID, stop), StreamID, Tail). + maybe_terminate(State, StreamID, Tail, fin). + +flush() -> + receive _ -> flush() after 0 -> ok end. + +maybe_terminate(State, StreamID, Tail, nofin) -> + commands(State, StreamID, Tail); +%% @todo In these cases I'm not sure if we should continue processing commands. +maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail, fin) -> + terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok? +maybe_terminate(State, StreamID, _Tail, fin) -> + stream_terminate(State, StreamID, normal). + +stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID, + StreamError={internal_error, _, _}) -> + %% @todo headers + %% @todo Don't send this if there are no streams left. + Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [ + {<<"content-length">>, <<"0">>} + ])), + %% @todo update IsFin local + stream_terminate(State#state{out_state=done}, StreamID, StreamError). + +stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler, + out_streamid=OutStreamID, out_state=OutState, + streams=Streams0, children=Children0}, StreamID, Reason) -> + {value, #stream{state=StreamState, version=Version}, Streams} + = lists:keytake(StreamID, #stream.id, Streams0), + _ = case OutState of + wait -> + Transport:send(Socket, cow_http:response(204, 'HTTP/1.1', [])); + chunked when Version =:= 'HTTP/1.1' -> + Transport:send(Socket, <<"0\r\n\r\n">>); + _ -> %% done or Version =:= 'HTTP/1.0' + ok + end, + + stream_call_terminate(StreamID, Reason, Handler, StreamState), +%% @todo initiate children shutdown +% Children = stream_terminate_children(Children0, StreamID, []), + Children = [case C of + {Pid, StreamID, Shutdown} -> {Pid, undefined, Shutdown}; + _ -> C + end || C <- Children0], + + %% @todo Skip the body, if any, or drop the connection if too large. + + %% @todo Only do this if Current =:= StreamID. + NextOutStreamID = OutStreamID + 1, + case lists:keyfind(NextOutStreamID, #stream.id, Streams) of + false -> + %% @todo This is clearly wrong, if the stream is gone we need to check if + %% there used to be such a stream, and if there was to send an error. + State#state{out_streamid=NextOutStreamID, out_state=wait, streams=Streams, children=Children}; + #stream{queue=Commands} -> + %% @todo Remove queue from the stream. + commands(State#state{out_streamid=NextOutStreamID, out_state=wait, + streams=Streams, children=Children}, NextOutStreamID, Commands) + end. + +%% @todo Taken directly from _http2 +stream_call_terminate(StreamID, Reason, Handler, StreamState) -> + try + Handler:terminate(StreamID, Reason, StreamState), + ok + catch Class:Reason -> + error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.", + [Handler, StreamID, Reason, StreamState, Class, Reason]) + end. + +%stream_terminate_children([], _, Acc) -> +% Acc; +%stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) -> +% exit(Pid, kill), +% stream_terminate_children(Tail, StreamID, Acc); +%stream_terminate_children([Child|Tail], StreamID, Acc) -> +% stream_terminate_children(Tail, StreamID, [Child|Acc]). + + +%% @todo max_reqs also +maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') -> + Conns = cow_http_hd:parse_connection(Conn), + case lists:member(<<"keep-alive">>, Conns) of + true -> keepalive; + false -> close + end; +maybe_req_close(_, _, 'HTTP/1.0') -> + close; +maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') -> + case connection_hd_is_close(Conn) of + true -> close; + false -> keepalive + end; +maybe_req_close(_State, _, _) -> + keepalive. + +connection(State=#state{last_streamid=StreamID}, Headers=#{<<"connection">> := Conn}, StreamID, _) -> + case connection_hd_is_close(Conn) of + true -> {State, Headers}; + %% @todo Here we need to remove keep-alive and add close, not just add close. + false -> {State, Headers#{<<"connection">> => [<<"close, ">>, Conn]}} + end; +connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _) -> + {State, Headers#{<<"connection">> => <<"close">>}}; +connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) -> + case connection_hd_is_close(Conn) of + true -> {State#state{last_streamid=StreamID}, Headers}; + %% @todo Here we need to set keep-alive only if it wasn't set before. + false -> {State, Headers} + end; +connection(State, Headers, _, 'HTTP/1.0') -> + {State, Headers#{<<"connection">> => <<"keep-alive">>}}; +connection(State, Headers, _, _) -> + {State, Headers}. + +connection_hd_is_close(Conn) -> + Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)), + lists:member(<<"close">>, Conns). + +error_terminate(StatusCode, State=#state{socket=Socket, transport=Transport}, Reason) -> + Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', [ + {<<"content-length">>, <<"0">>} + ])), + terminate(State, Reason). + +terminate(_State, _Reason) -> + exit(normal). %% @todo + + + + + + + + + + + + + +%% System callbacks. + +-spec system_continue(_, _, #state{}) -> ok. +system_continue(_, _, {State, Buffer}) -> + loop(State, Buffer). + +-spec system_terminate(any(), _, _, _) -> no_return(). +system_terminate(Reason, _, _, _) -> + exit(Reason). + +-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. +system_code_change(Misc, _, _, _) -> + {ok, Misc}. diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 6018461..42d8ba5 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -86,8 +86,7 @@ init(Parent, Ref, Socket, Transport, Opts, Handler) -> before_loop(State, Buffer) -> loop(State, Buffer). -loop(State=#state{parent=Parent, socket=Socket, transport=Transport, - handler=Handler, children=Children}, Buffer) -> +loop(State=#state{parent=Parent, socket=Socket, transport=Transport, children=Children}, Buffer) -> Transport:setopts(Socket, [{active, once}]), {OK, Closed, Error} = Transport:messages(), receive @@ -104,7 +103,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer}); %% Messages pertaining to a stream. - {{Handler, StreamID}, Msg} -> + {{Pid, StreamID}, Msg} when Pid =:= self() -> loop(info(State, StreamID, Msg), Buffer); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> @@ -328,8 +327,8 @@ commands(State0=#state{socket=Socket, transport=Transport, server_streamid=Promi commands(State, StreamID, [{flow, _Size}|Tail]) -> commands(State, StreamID, Tail); %% Supervise a child process. -commands(State=#state{children=Children}, StreamID, [{spawn, Pid}|Tail]) -> - commands(State#state{children=[{Pid, StreamID}|Children]}, StreamID, Tail); +commands(State=#state{children=Children}, StreamID, [{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown + commands(State#state{children=[{Pid, StreamID}|Children]}, StreamID, Tail); %% Upgrade to a new protocol. %% %% @todo Implementation. @@ -357,7 +356,7 @@ terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason, Ha %% Stream functions. -stream_init(State0=#state{socket=Socket, transport=Transport, handler=Handler, +stream_init(State0=#state{ref=Ref, socket=Socket, transport=Transport, handler=Handler, opts=Opts, streams=Streams0, decode_state=DecodeState0}, StreamID, IsFin, HeaderBlock) -> %% @todo Add clause for CONNECT requests (no scheme/path). try headers_decode(HeaderBlock, DecodeState0) of @@ -365,10 +364,46 @@ stream_init(State0=#state{socket=Socket, transport=Transport, handler=Handler, <<":method">> := Method, <<":scheme">> := Scheme, <<":authority">> := Authority, - <<":path">> := Path}, DecodeState} -> + <<":path">> := PathWithQs}, DecodeState} -> State = State0#state{decode_state=DecodeState}, Headers = maps:without([<<":method">>, <<":scheme">>, <<":authority">>, <<":path">>], Headers0), - try Handler:init(StreamID, IsFin, Method, Scheme, Authority, Path, Headers) of + %% @todo We need to parse the port out of :authority. + %% @todo We need to parse the query string out of :path. + %% @todo We need to give a way to get the socket infos. + + Host = Authority, %% @todo + Port = todo, %% @todo + Path = PathWithQs, %% @todo + Qs = todo, %% @todo + + Req = #{ + ref => Ref, + pid => self(), + streamid => StreamID, + + %% @todo peer + %% @todo sockname + %% @todo ssl client cert? + + method => Method, + scheme => Scheme, + host => Host, + %% host_info (cowboy_router) + port => Port, + path => Path, + %% path_info (cowboy_router) + %% bindings (cowboy_router) + qs => Qs, + version => 'HTTP/2', + headers => Headers, + + has_body => IsFin =:= nofin + %% @todo multipart? keep state separate + + %% meta values (cowboy_websocket, cowboy_rest) + }, + + try Handler:init(StreamID, Req, Opts) of {Commands, StreamState} -> Streams = [#stream{id=StreamID, state=StreamState}|Streams0], commands(State#state{streams=Streams}, StreamID, Commands) @@ -377,7 +412,7 @@ stream_init(State0=#state{socket=Socket, transport=Transport, handler=Handler, "with reason ~p:~p.", [Handler, StreamID, IsFin, Method, Scheme, Authority, Path, Headers, Class, Reason]), stream_reset(State, StreamID, {internal_error, {Class, Reason}, - 'Exception occurred in StreamHandler:init/7 call.'}) + 'Exception occurred in StreamHandler:init/7 call.'}) %% @todo Check final arity. end; {_, DecodeState} -> Transport:send(Socket, cow_http2:rst_stream(StreamID, protocol_error)), diff --git a/src/cowboy_loop.erl b/src/cowboy_loop.erl index 695439a..e162417 100644 --- a/src/cowboy_loop.erl +++ b/src/cowboy_loop.erl @@ -60,15 +60,21 @@ upgrade(Req, Env, Handler, HandlerState, Timeout, run) -> State2 = timeout(State), after_call(Req, State2, Handler, HandlerState); upgrade(Req, Env, Handler, HandlerState, Timeout, hibernate) -> + +% dbg:start(), +% dbg:tracer(), +% dbg:tpl(?MODULE, []), +% dbg:tpl(long_polling_h, []), +% dbg:tpl(loop_handler_body_h, []), +% dbg:tpl(cowboy_req, []), +% dbg:p(all, c), + State = #state{env=Env, max_buffer=get_max_buffer(Env), hibernate=true, timeout=Timeout}, State2 = timeout(State), after_call(Req, State2, Handler, HandlerState). -get_max_buffer(Env) -> - case lists:keyfind(loop_max_buffer, 1, Env) of - false -> 5000; - {_, MaxBuffer} -> MaxBuffer - end. +get_max_buffer(#{loop_max_buffer := MaxBuffer}) -> MaxBuffer; +get_max_buffer(_) -> 5000. %% Update the state if the response was sent in the callback. after_call(Req, State=#state{resp_sent=false}, Handler, @@ -83,12 +89,21 @@ after_call(Req, State, Handler, HandlerState) -> before_loop(Req, State, Handler, HandlerState). before_loop(Req, State=#state{hibernate=true}, Handler, HandlerState) -> - [Socket, Transport] = cowboy_req:get([socket, transport], Req), - Transport:setopts(Socket, [{active, once}]), + + %% @todo Yeah we can't get the socket anymore. + %% Everything changes since we are a separate process now. + %% Proper flow control at the connection level should be implemented + %% instead of what we have here. + +% [Socket, Transport] = cowboy_req:get([socket, transport], Req), +% Transport:setopts(Socket, [{active, once}]), {suspend, ?MODULE, loop, [Req, State#state{hibernate=false}, Handler, HandlerState]}; before_loop(Req, State, Handler, HandlerState) -> - [Socket, Transport] = cowboy_req:get([socket, transport], Req), - Transport:setopts(Socket, [{active, once}]), + + %% Same here. + +% [Socket, Transport] = cowboy_req:get([socket, transport], Req), +% Transport:setopts(Socket, [{active, once}]), loop(Req, State, Handler, HandlerState). %% Almost the same code can be found in cowboy_websocket. @@ -109,26 +124,26 @@ timeout(State=#state{timeout=Timeout, loop(Req, State=#state{buffer_size=NbBytes, max_buffer=Threshold, timeout_ref=TRef, resp_sent=RespSent}, Handler, HandlerState) -> - [Socket, Transport] = cowboy_req:get([socket, transport], Req), - {OK, Closed, Error} = Transport:messages(), +% [Socket, Transport] = cowboy_req:get([socket, transport], Req), +% {OK, Closed, Error} = Transport:messages(), receive - {OK, Socket, Data} -> - NbBytes2 = NbBytes + byte_size(Data), - if NbBytes2 > Threshold -> - _ = if RespSent -> ok; true -> - cowboy_req:reply(500, Req) - end, - cowboy_handler:terminate({error, overflow}, Req, HandlerState, Handler), - exit(normal); - true -> - Req2 = cowboy_req:append_buffer(Data, Req), - State2 = timeout(State#state{buffer_size=NbBytes2}), - before_loop(Req2, State2, Handler, HandlerState) - end; - {Closed, Socket} -> - terminate(Req, State, Handler, HandlerState, {error, closed}); - {Error, Socket, Reason} -> - terminate(Req, State, Handler, HandlerState, {error, Reason}); +% {OK, Socket, Data} -> +% NbBytes2 = NbBytes + byte_size(Data), +% if NbBytes2 > Threshold -> +% _ = if RespSent -> ok; true -> +% cowboy_req:reply(500, Req) +% end, +% cowboy_handler:terminate({error, overflow}, Req, HandlerState, Handler), +% exit(normal); +% true -> +% Req2 = cowboy_req:append_buffer(Data, Req), +% State2 = timeout(State#state{buffer_size=NbBytes2}), +% before_loop(Req2, State2, Handler, HandlerState) +% end; +% {Closed, Socket} -> +% terminate(Req, State, Handler, HandlerState, {error, closed}); +% {Error, Socket, Reason} -> +% terminate(Req, State, Handler, HandlerState, {error, Reason}); {timeout, TRef, ?MODULE} -> after_loop(Req, State, Handler, HandlerState, timeout); {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> @@ -139,13 +154,13 @@ loop(Req, State=#state{buffer_size=NbBytes, %% data received after that and put it into the buffer. %% We do not check the size here, if data keeps coming %% we'll error out on the next packet received. - Transport:setopts(Socket, [{active, false}]), - Req2 = receive {OK, Socket, Data} -> - cowboy_req:append_buffer(Data, Req) - after 0 -> - Req - end, - call(Req2, State, Handler, HandlerState, Message) +% Transport:setopts(Socket, [{active, false}]), +% Req2 = receive {OK, Socket, Data} -> +% cowboy_req:append_buffer(Data, Req) +% after 0 -> +% Req +% end, + call(Req, State, Handler, HandlerState, Message) end. call(Req, State=#state{resp_sent=RespSent}, @@ -168,7 +183,7 @@ call(Req, State=#state{resp_sent=RespSent}, {reason, Reason}, {mfa, {Handler, info, 3}}, {stacktrace, Stacktrace}, - {req, cowboy_req:to_list(Req)}, + {req, Req}, {state, HandlerState} ]}) end. diff --git a/src/cowboy_protocol.erl b/src/cowboy_protocol.erl deleted file mode 100644 index 90128c3..0000000 --- a/src/cowboy_protocol.erl +++ /dev/null @@ -1,534 +0,0 @@ -%% Copyright (c) 2011-2014, Loïc Hoguin <[email protected]> -%% Copyright (c) 2011, Anthony Ramine <[email protected]> -%% -%% Permission to use, copy, modify, and/or distribute this software for any -%% purpose with or without fee is hereby granted, provided that the above -%% copyright notice and this permission notice appear in all copies. -%% -%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES -%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF -%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR -%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES -%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN -%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF -%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. - --module(cowboy_protocol). - -%% API. --export([start_link/4]). - -%% Internal. --export([init/4]). --export([parse_request/3]). --export([resume/6]). - --type opts() :: [{compress, boolean()} - | {env, cowboy_middleware:env()} - | {max_empty_lines, non_neg_integer()} - | {max_header_name_length, non_neg_integer()} - | {max_header_value_length, non_neg_integer()} - | {max_headers, non_neg_integer()} - | {max_keepalive, non_neg_integer()} - | {max_request_line_length, non_neg_integer()} - | {middlewares, [module()]} - | {onresponse, cowboy:onresponse_fun()} - | {timeout, timeout()}]. --export_type([opts/0]). - --record(state, { - socket :: inet:socket(), - transport :: module(), - middlewares :: [module()], - compress :: boolean(), - env :: cowboy_middleware:env(), - onresponse = undefined :: undefined | cowboy:onresponse_fun(), - max_empty_lines :: non_neg_integer(), - req_keepalive = 1 :: non_neg_integer(), - max_keepalive :: non_neg_integer(), - max_request_line_length :: non_neg_integer(), - max_header_name_length :: non_neg_integer(), - max_header_value_length :: non_neg_integer(), - max_headers :: non_neg_integer(), - timeout :: timeout(), - until :: non_neg_integer() | infinity -}). - --include_lib("cowlib/include/cow_inline.hrl"). --include_lib("cowlib/include/cow_parse.hrl"). - -%% API. - --spec start_link(ranch:ref(), inet:socket(), module(), opts()) -> {ok, pid()}. -start_link(Ref, Socket, Transport, Opts) -> - Pid = spawn_link(?MODULE, init, [Ref, Socket, Transport, Opts]), - {ok, Pid}. - -%% Internal. - -%% Faster alternative to proplists:get_value/3. -get_value(Key, Opts, Default) -> - case lists:keyfind(Key, 1, Opts) of - {_, Value} -> Value; - _ -> Default - end. - --spec init(ranch:ref(), inet:socket(), module(), opts()) -> ok. -init(Ref, Socket, Transport, Opts) -> - ok = ranch:accept_ack(Ref), - Timeout = get_value(timeout, Opts, 5000), - Until = until(Timeout), - case recv(Socket, Transport, Until) of - {ok, Data} -> - OnFirstRequest = get_value(onfirstrequest, Opts, undefined), - case OnFirstRequest of - undefined -> ok; - _ -> OnFirstRequest(Ref, Socket, Transport, Opts) - end, - Compress = get_value(compress, Opts, false), - MaxEmptyLines = get_value(max_empty_lines, Opts, 5), - MaxHeaderNameLength = get_value(max_header_name_length, Opts, 64), - MaxHeaderValueLength = get_value(max_header_value_length, Opts, 4096), - MaxHeaders = get_value(max_headers, Opts, 100), - MaxKeepalive = get_value(max_keepalive, Opts, 100), - MaxRequestLineLength = get_value(max_request_line_length, Opts, 4096), - Middlewares = get_value(middlewares, Opts, [cowboy_router, cowboy_handler]), - Env = [{listener, Ref}|get_value(env, Opts, [])], - OnResponse = get_value(onresponse, Opts, undefined), - parse_request(Data, #state{socket=Socket, transport=Transport, - middlewares=Middlewares, compress=Compress, env=Env, - max_empty_lines=MaxEmptyLines, max_keepalive=MaxKeepalive, - max_request_line_length=MaxRequestLineLength, - max_header_name_length=MaxHeaderNameLength, - max_header_value_length=MaxHeaderValueLength, max_headers=MaxHeaders, - onresponse=OnResponse, timeout=Timeout, until=Until}, 0); - {error, _} -> - terminate(#state{socket=Socket, transport=Transport}) %% @todo ridiculous - end. - --spec until(timeout()) -> non_neg_integer() | infinity. -until(infinity) -> - infinity; -until(Timeout) -> - erlang:monotonic_time(milli_seconds) + Timeout. - -%% Request parsing. -%% -%% The next set of functions is the request parsing code. All of it -%% runs using a single binary match context. This optimization ends -%% right after the header parsing is finished and the code becomes -%% more interesting past that point. - --spec recv(inet:socket(), module(), non_neg_integer() | infinity) - -> {ok, binary()} | {error, closed | timeout | atom()}. -recv(Socket, Transport, infinity) -> - Transport:recv(Socket, 0, infinity); -recv(Socket, Transport, Until) -> - Timeout = Until - erlang:monotonic_time(milli_seconds), - if Timeout < 0 -> - {error, timeout}; - true -> - Transport:recv(Socket, 0, Timeout) - end. - --spec wait_request(binary(), #state{}, non_neg_integer()) -> ok. -wait_request(Buffer, State=#state{socket=Socket, transport=Transport, - until=Until}, ReqEmpty) -> - case recv(Socket, Transport, Until) of - {ok, Data} -> - parse_request(<< Buffer/binary, Data/binary >>, State, ReqEmpty); - {error, _} -> - terminate(State) - end. - --spec parse_request(binary(), #state{}, non_neg_integer()) -> ok. -%% Empty lines must be using \r\n. -parse_request(<< $\n, _/bits >>, State, _) -> - error_terminate(400, State); -parse_request(<< $\s, _/bits >>, State, _) -> - error_terminate(400, State); -%% We limit the length of the Request-line to MaxLength to avoid endlessly -%% reading from the socket and eventually crashing. -parse_request(Buffer, State=#state{max_request_line_length=MaxLength, - max_empty_lines=MaxEmpty}, ReqEmpty) -> - case match_eol(Buffer, 0) of - nomatch when byte_size(Buffer) > MaxLength -> - error_terminate(414, State); - nomatch -> - wait_request(Buffer, State, ReqEmpty); - 1 when ReqEmpty =:= MaxEmpty -> - error_terminate(400, State); - 1 -> - << _:16, Rest/bits >> = Buffer, - parse_request(Rest, State, ReqEmpty + 1); - _ -> - parse_method(Buffer, State, <<>>) - end. - -match_eol(<< $\n, _/bits >>, N) -> - N; -match_eol(<< _, Rest/bits >>, N) -> - match_eol(Rest, N + 1); -match_eol(_, _) -> - nomatch. - -parse_method(<< C, Rest/bits >>, State, SoFar) -> - case C of - $\r -> error_terminate(400, State); - $\s -> parse_uri(Rest, State, SoFar); - _ -> parse_method(Rest, State, << SoFar/binary, C >>) - end. - -parse_uri(<< $\r, _/bits >>, State, _) -> - error_terminate(400, State); -parse_uri(<< $\s, _/bits >>, State, _) -> - error_terminate(400, State); -parse_uri(<< "* ", Rest/bits >>, State, Method) -> - parse_version(Rest, State, Method, <<"*">>, <<>>); -parse_uri(<< "http://", Rest/bits >>, State, Method) -> - parse_uri_skip_host(Rest, State, Method); -parse_uri(<< "https://", Rest/bits >>, State, Method) -> - parse_uri_skip_host(Rest, State, Method); -parse_uri(<< "HTTP://", Rest/bits >>, State, Method) -> - parse_uri_skip_host(Rest, State, Method); -parse_uri(<< "HTTPS://", Rest/bits >>, State, Method) -> - parse_uri_skip_host(Rest, State, Method); -parse_uri(Buffer, State, Method) -> - parse_uri_path(Buffer, State, Method, <<>>). - -parse_uri_skip_host(<< C, Rest/bits >>, State, Method) -> - case C of - $\r -> error_terminate(400, State); - $/ -> parse_uri_path(Rest, State, Method, <<"/">>); - $\s -> parse_version(Rest, State, Method, <<"/">>, <<>>); - $? -> parse_uri_query(Rest, State, Method, <<"/">>, <<>>); - $# -> skip_uri_fragment(Rest, State, Method, <<"/">>, <<>>); - _ -> parse_uri_skip_host(Rest, State, Method) - end. - -parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) -> - case C of - $\r -> error_terminate(400, State); - $\s -> parse_version(Rest, State, Method, SoFar, <<>>); - $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>); - $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>); - _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>) - end. - -parse_uri_query(<< C, Rest/bits >>, S, M, P, SoFar) -> - case C of - $\r -> error_terminate(400, S); - $\s -> parse_version(Rest, S, M, P, SoFar); - $# -> skip_uri_fragment(Rest, S, M, P, SoFar); - _ -> parse_uri_query(Rest, S, M, P, << SoFar/binary, C >>) - end. - -skip_uri_fragment(<< C, Rest/bits >>, S, M, P, Q) -> - case C of - $\r -> error_terminate(400, S); - $\s -> parse_version(Rest, S, M, P, Q); - _ -> skip_uri_fragment(Rest, S, M, P, Q) - end. - -parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, S, M, P, Q) -> - parse_header(Rest, S, M, P, Q, 'HTTP/1.1', []); -parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, S, M, P, Q) -> - parse_header(Rest, S, M, P, Q, 'HTTP/1.0', []); -parse_version(_, State, _, _, _) -> - error_terminate(505, State). - -%% Stop receiving data if we have more than allowed number of headers. -wait_header(_, State=#state{max_headers=MaxHeaders}, _, _, _, _, Headers) - when length(Headers) >= MaxHeaders -> - error_terminate(400, State); -wait_header(Buffer, State=#state{socket=Socket, transport=Transport, - until=Until}, M, P, Q, V, H) -> - case recv(Socket, Transport, Until) of - {ok, Data} -> - parse_header(<< Buffer/binary, Data/binary >>, - State, M, P, Q, V, H); - {error, timeout} -> - error_terminate(408, State); - {error, _} -> - terminate(State) - end. - -parse_header(<< $\r, $\n, Rest/bits >>, S, M, P, Q, V, Headers) -> - request(Rest, S, M, P, Q, V, lists:reverse(Headers)); -parse_header(Buffer, State=#state{max_header_name_length=MaxLength}, - M, P, Q, V, H) -> - case match_colon(Buffer, 0) of - nomatch when byte_size(Buffer) > MaxLength -> - error_terminate(400, State); - nomatch -> - wait_header(Buffer, State, M, P, Q, V, H); - _ -> - parse_hd_name(Buffer, State, M, P, Q, V, H, <<>>) - end. - -match_colon(<< $:, _/bits >>, N) -> - N; -match_colon(<< _, Rest/bits >>, N) -> - match_colon(Rest, N + 1); -match_colon(_, _) -> - nomatch. - -parse_hd_name(<< $:, Rest/bits >>, S, M, P, Q, V, H, SoFar) -> - parse_hd_before_value(Rest, S, M, P, Q, V, H, SoFar); -parse_hd_name(<< C, Rest/bits >>, S, M, P, Q, V, H, SoFar) when ?IS_WS(C) -> - parse_hd_name_ws(Rest, S, M, P, Q, V, H, SoFar); -parse_hd_name(<< C, Rest/bits >>, S, M, P, Q, V, H, SoFar) -> - ?LOWER(parse_hd_name, Rest, S, M, P, Q, V, H, SoFar). - -parse_hd_name_ws(<< C, Rest/bits >>, S, M, P, Q, V, H, Name) -> - case C of - $\s -> parse_hd_name_ws(Rest, S, M, P, Q, V, H, Name); - $\t -> parse_hd_name_ws(Rest, S, M, P, Q, V, H, Name); - $: -> parse_hd_before_value(Rest, S, M, P, Q, V, H, Name) - end. - -wait_hd_before_value(Buffer, State=#state{ - socket=Socket, transport=Transport, until=Until}, - M, P, Q, V, H, N) -> - case recv(Socket, Transport, Until) of - {ok, Data} -> - parse_hd_before_value(<< Buffer/binary, Data/binary >>, - State, M, P, Q, V, H, N); - {error, timeout} -> - error_terminate(408, State); - {error, _} -> - terminate(State) - end. - -parse_hd_before_value(<< $\s, Rest/bits >>, S, M, P, Q, V, H, N) -> - parse_hd_before_value(Rest, S, M, P, Q, V, H, N); -parse_hd_before_value(<< $\t, Rest/bits >>, S, M, P, Q, V, H, N) -> - parse_hd_before_value(Rest, S, M, P, Q, V, H, N); -parse_hd_before_value(Buffer, State=#state{ - max_header_value_length=MaxLength}, M, P, Q, V, H, N) -> - case match_eol(Buffer, 0) of - nomatch when byte_size(Buffer) > MaxLength -> - error_terminate(400, State); - nomatch -> - wait_hd_before_value(Buffer, State, M, P, Q, V, H, N); - _ -> - parse_hd_value(Buffer, State, M, P, Q, V, H, N, <<>>) - end. - -%% We completely ignore the first argument which is always -%% the empty binary. We keep it there because we don't want -%% to change the other arguments' position and trigger costy -%% operations for no reasons. -wait_hd_value(_, State=#state{ - socket=Socket, transport=Transport, until=Until}, - M, P, Q, V, H, N, SoFar) -> - case recv(Socket, Transport, Until) of - {ok, Data} -> - parse_hd_value(Data, State, M, P, Q, V, H, N, SoFar); - {error, timeout} -> - error_terminate(408, State); - {error, _} -> - terminate(State) - end. - -%% Pushing back as much as we could the retrieval of new data -%% to check for multilines allows us to avoid a few tests in -%% the critical path, but forces us to have a special function. -wait_hd_value_nl(_, State=#state{ - socket=Socket, transport=Transport, until=Until}, - M, P, Q, V, Headers, Name, SoFar) -> - case recv(Socket, Transport, Until) of - {ok, << C, Data/bits >>} when C =:= $\s; C =:= $\t -> - parse_hd_value(Data, State, M, P, Q, V, Headers, Name, SoFar); - {ok, Data} -> - parse_header(Data, State, M, P, Q, V, [{Name, SoFar}|Headers]); - {error, timeout} -> - error_terminate(408, State); - {error, _} -> - terminate(State) - end. - -parse_hd_value(<< $\r, Rest/bits >>, S, M, P, Q, V, Headers, Name, SoFar) -> - case Rest of - << $\n >> -> - wait_hd_value_nl(<<>>, S, M, P, Q, V, Headers, Name, SoFar); - << $\n, C, Rest2/bits >> when C =:= $\s; C =:= $\t -> - parse_hd_value(Rest2, S, M, P, Q, V, Headers, Name, - << SoFar/binary, C >>); - << $\n, Rest2/bits >> -> - parse_header(Rest2, S, M, P, Q, V, [{Name, clean_value_ws_end(SoFar, byte_size(SoFar) - 1)}|Headers]) - end; -parse_hd_value(<< C, Rest/bits >>, S, M, P, Q, V, H, N, SoFar) -> - parse_hd_value(Rest, S, M, P, Q, V, H, N, << SoFar/binary, C >>); -parse_hd_value(<<>>, State=#state{max_header_value_length=MaxLength}, - _, _, _, _, _, _, SoFar) when byte_size(SoFar) > MaxLength -> - error_terminate(400, State); -parse_hd_value(<<>>, S, M, P, Q, V, H, N, SoFar) -> - wait_hd_value(<<>>, S, M, P, Q, V, H, N, SoFar). - -clean_value_ws_end(_, -1) -> - <<>>; -clean_value_ws_end(Value, N) -> - case binary:at(Value, N) of - $\s -> clean_value_ws_end(Value, N - 1); - $\t -> clean_value_ws_end(Value, N - 1); - _ -> - S = N + 1, - << Value2:S/binary, _/bits >> = Value, - Value2 - end. - --ifdef(TEST). -clean_value_ws_end_test_() -> - Tests = [ - {<<>>, <<>>}, - {<<" ">>, <<>>}, - {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, " - "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>, - <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, " - "text/html;level=2;q=0.4, */*;q=0.5">>} - ], - [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests]. --endif. - --ifdef(PERF). -horse_clean_value_ws_end() -> - horse:repeat(200000, - clean_value_ws_end( - <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, " - "text/html;level=2;q=0.4, */*;q=0.5 ">>, - byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, " - "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1) - ). --endif. - -request(B, State=#state{transport=Transport}, M, P, Q, Version, Headers) -> - case lists:keyfind(<<"host">>, 1, Headers) of - false when Version =:= 'HTTP/1.1' -> - error_terminate(400, State); - false -> - request(B, State, M, P, Q, Version, Headers, - <<>>, default_port(Transport:name())); - {_, RawHost} -> - try parse_host(RawHost, false, <<>>) of - {Host, undefined} -> - request(B, State, M, P, Q, Version, Headers, - Host, default_port(Transport:name())); - {Host, Port} -> - request(B, State, M, P, Q, Version, Headers, - Host, Port) - catch _:_ -> - error_terminate(400, State) - end - end. - --spec default_port(atom()) -> 80 | 443. -default_port(ssl) -> 443; -default_port(_) -> 80. - -%% Same code as cow_http:parse_fullhost/1, but inline because we -%% really want this to go fast. -parse_host(<< $[, Rest/bits >>, false, <<>>) -> - parse_host(Rest, true, << $[ >>); -parse_host(<<>>, false, Acc) -> - {Acc, undefined}; -parse_host(<< $:, Rest/bits >>, false, Acc) -> - {Acc, list_to_integer(binary_to_list(Rest))}; -parse_host(<< $], Rest/bits >>, true, Acc) -> - parse_host(Rest, false, << Acc/binary, $] >>); -parse_host(<< C, Rest/bits >>, E, Acc) -> - ?LOWER(parse_host, Rest, E, Acc). - -%% End of request parsing. -%% -%% We create the Req object and start handling the request. - -request(Buffer, State=#state{socket=Socket, transport=Transport, - req_keepalive=ReqKeepalive, max_keepalive=MaxKeepalive, - compress=Compress, onresponse=OnResponse}, - Method, Path, Query, Version, Headers, Host, Port) -> - case Transport:peername(Socket) of - {ok, Peer} -> - Req = cowboy_req:new(Socket, Transport, Peer, Method, Path, - Query, Version, Headers, Host, Port, Buffer, - ReqKeepalive < MaxKeepalive, Compress, OnResponse), - execute(Req, State); - {error, _} -> - %% Couldn't read the peer address; connection is gone. - terminate(State) - end. - --spec execute(cowboy_req:req(), #state{}) -> ok. -execute(Req, State=#state{middlewares=Middlewares, env=Env}) -> - execute(Req, State, Env, Middlewares). - --spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()]) - -> ok. -execute(Req, State, Env, []) -> - next_request(Req, State, get_value(result, Env, ok)); -execute(Req, State, Env, [Middleware|Tail]) -> - case Middleware:execute(Req, Env) of - {ok, Req2, Env2} -> - execute(Req2, State, Env2, Tail); - {suspend, Module, Function, Args} -> - erlang:hibernate(?MODULE, resume, - [State, Env, Tail, Module, Function, Args]); - {stop, Req2} -> - next_request(Req2, State, ok) - end. - --spec resume(#state{}, cowboy_middleware:env(), [module()], - module(), module(), [any()]) -> ok. -resume(State, Env, Tail, Module, Function, Args) -> - case apply(Module, Function, Args) of - {ok, Req2, Env2} -> - execute(Req2, State, Env2, Tail); - {suspend, Module2, Function2, Args2} -> - erlang:hibernate(?MODULE, resume, - [State, Env, Tail, Module2, Function2, Args2]); - {stop, Req2} -> - next_request(Req2, State, ok) - end. - --spec next_request(cowboy_req:req(), #state{}, any()) -> ok. -next_request(Req, State=#state{req_keepalive=Keepalive, timeout=Timeout}, - HandlerRes) -> - cowboy_req:ensure_response(Req, 204), - %% If we are going to close the connection, - %% we do not want to attempt to skip the body. - case cowboy_req:get(connection, Req) of - close -> - terminate(State); - _ -> - %% Skip the body if it is reasonably sized. Close otherwise. - Buffer = case cowboy_req:body(Req) of - {ok, _, Req2} -> cowboy_req:get(buffer, Req2); - _ -> close - end, - %% Flush the resp_sent message before moving on. - if HandlerRes =:= ok, Buffer =/= close -> - receive {cowboy_req, resp_sent} -> ok after 0 -> ok end, - ?MODULE:parse_request(Buffer, - State#state{req_keepalive=Keepalive + 1, - until=until(Timeout)}, 0); - true -> - terminate(State) - end - end. - --spec error_terminate(cowboy:http_status(), #state{}) -> ok. -error_terminate(Status, State=#state{socket=Socket, transport=Transport, - compress=Compress, onresponse=OnResponse}) -> - error_terminate(Status, cowboy_req:new(Socket, Transport, - undefined, <<"GET">>, <<>>, <<>>, 'HTTP/1.1', [], <<>>, - undefined, <<>>, false, Compress, OnResponse), State). - --spec error_terminate(cowboy:http_status(), cowboy_req:req(), #state{}) -> ok. -error_terminate(Status, Req, State) -> - _ = cowboy_req:reply(Status, Req), - terminate(State). - --spec terminate(#state{}) -> ok. -terminate(#state{socket=Socket, transport=Transport}) -> - Transport:close(Socket), - ok. diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 8f0a04b..16b1fd1 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -47,6 +47,9 @@ %% Request body API. -export([has_body/1]). -export([body_length/1]). +-export([read_body/1]). +-export([read_body/2]). + -export([body/1]). -export([body/2]). -export([body_qs/1]). @@ -70,10 +73,12 @@ -export([reply/2]). -export([reply/3]). -export([reply/4]). + +-export([send_body/3]). + -export([chunked_reply/2]). -export([chunked_reply/3]). -export([chunk/2]). --export([upgrade_reply/3]). -export([continue/1]). -export([maybe_reply/2]). -export([ensure_response/2]). @@ -93,12 +98,12 @@ -type transfer_decode_fun() :: fun((binary(), any()) -> cow_http_te:decode_ret()). --type body_opts() :: [{continue, boolean()} +-type body_opts() :: [{continue, boolean()} %% doesn't apply | {length, non_neg_integer()} - | {read_length, non_neg_integer()} - | {read_timeout, timeout()} - | {transfer_decode, transfer_decode_fun(), any()} - | {content_decode, content_decode_fun()}]. + | {read_length, non_neg_integer()} %% to be added back later as optimization + | {read_timeout, timeout()} %% same + | {transfer_decode, transfer_decode_fun(), any()} %% doesn't apply + | {content_decode, content_decode_fun()}]. %% does apply -export_type([body_opts/0]). -type resp_body_fun() :: fun((any(), module()) -> ok). @@ -182,43 +187,43 @@ new(Socket, Transport, Peer, Method, Path, Query, end. -spec method(req()) -> binary(). -method(Req) -> - Req#http_req.method. +method(#{method := Method}) -> + Method. -spec version(req()) -> cowboy:http_version(). -version(Req) -> - Req#http_req.version. +version(#{version := Version}) -> + Version. -spec peer(req()) -> {inet:ip_address(), inet:port_number()} | undefined. peer(Req) -> Req#http_req.peer. -spec host(req()) -> binary(). -host(Req) -> - Req#http_req.host. +host(#{host := Host}) -> + Host. -spec host_info(req()) -> cowboy_router:tokens() | undefined. -host_info(Req) -> - Req#http_req.host_info. +host_info(#{host_info := HostInfo}) -> + HostInfo. -spec port(req()) -> inet:port_number(). -port(Req) -> - Req#http_req.port. +port(#{port := Port}) -> + Port. -spec path(req()) -> binary(). -path(Req) -> - Req#http_req.path. +path(#{path := Path}) -> + Path. -spec path_info(req()) -> cowboy_router:tokens() | undefined. -path_info(Req) -> - Req#http_req.path_info. +path_info(#{path_info := PathInfo}) -> + PathInfo. -spec qs(req()) -> binary(). -qs(Req) -> - Req#http_req.qs. +qs(#{qs := Qs}) -> + Qs. -spec parse_qs(req()) -> [{binary(), binary() | true}]. -parse_qs(#http_req{qs=Qs}) -> +parse_qs(#{qs := Qs}) -> cow_qs:parse_qs(Qs). -spec match_qs(cowboy:fields(), req()) -> map(). @@ -227,30 +232,24 @@ match_qs(Fields, Req) -> %% The URL includes the scheme, host and port only. -spec host_url(req()) -> undefined | binary(). -host_url(#http_req{port=undefined}) -> +host_url(#{port := undefined}) -> undefined; -host_url(#http_req{transport=Transport, host=Host, port=Port}) -> - TransportName = Transport:name(), - Secure = case TransportName of - ssl -> <<"s">>; - _ -> <<>> - end, - PortBin = case {TransportName, Port} of - {ssl, 443} -> <<>>; - {tcp, 80} -> <<>>; +host_url(#{scheme := Scheme, host := Host, port := Port}) -> + PortBin = case {Scheme, Port} of + {<<"https">>, 443} -> <<>>; + {<<"http">>, 80} -> <<>>; _ -> << ":", (integer_to_binary(Port))/binary >> end, - << "http", Secure/binary, "://", Host/binary, PortBin/binary >>. + << Scheme/binary, "://", Host/binary, PortBin/binary >>. %% The URL includes the scheme, host, port, path and query string. -spec url(req()) -> undefined | binary(). -url(Req=#http_req{}) -> - HostURL = host_url(Req), - url(Req, HostURL). +url(Req) -> + url(Req, host_url(Req)). url(_, undefined) -> undefined; -url(#http_req{path=Path, qs=QS}, HostURL) -> +url(#{path := Path, qs := QS}, HostURL) -> QS2 = case QS of <<>> -> <<>>; _ -> << "?", QS/binary >> @@ -262,30 +261,31 @@ binding(Name, Req) -> binding(Name, Req, undefined). -spec binding(atom(), req(), Default) -> any() | Default when Default::any(). -binding(Name, Req, Default) when is_atom(Name) -> - case lists:keyfind(Name, 1, Req#http_req.bindings) of +binding(Name, #{bindings := Bindings}, Default) when is_atom(Name) -> + case lists:keyfind(Name, 1, Bindings) of {_, Value} -> Value; false -> Default - end. + end; +binding(Name, _, Default) when is_atom(Name) -> + Default. -spec bindings(req()) -> [{atom(), any()}]. -bindings(Req) -> - Req#http_req.bindings. +bindings(#{bindings := Bindings}) -> + Bindings; +bindings(_) -> + []. -spec header(binary(), req()) -> binary() | undefined. header(Name, Req) -> header(Name, Req, undefined). -spec header(binary(), req(), Default) -> binary() | Default when Default::any(). -header(Name, Req, Default) -> - case lists:keyfind(Name, 1, Req#http_req.headers) of - {Name, Value} -> Value; - false -> Default - end. +header(Name, #{headers := Headers}, Default) -> + maps:get(Name, Headers, Default). -spec headers(req()) -> cowboy:http_headers(). -headers(Req) -> - Req#http_req.headers. +headers(#{headers := Headers}) -> + Headers. -spec parse_header(binary(), Req) -> any() when Req::req(). parse_header(Name = <<"content-length">>, Req) -> @@ -354,31 +354,52 @@ set_meta(Name, Value, Req=#http_req{meta=Meta}) -> %% Request Body API. -spec has_body(req()) -> boolean(). -has_body(Req) -> - case lists:keyfind(<<"content-length">>, 1, Req#http_req.headers) of - {_, <<"0">>} -> - false; - {_, _} -> - true; - _ -> - lists:keymember(<<"transfer-encoding">>, 1, Req#http_req.headers) - end. +has_body(#{has_body := HasBody}) -> + HasBody. %% The length may not be known if Transfer-Encoding is not identity, %% and the body hasn't been read at the time of the call. -spec body_length(req()) -> undefined | non_neg_integer(). -body_length(Req) -> - case parse_header(<<"transfer-encoding">>, Req) of - [<<"identity">>] -> - parse_header(<<"content-length">>, Req); - _ -> - undefined - end. +body_length(#{body_length := Length}) -> + Length. -spec body(Req) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). body(Req) -> body(Req, []). +-spec read_body(Req) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). +read_body(Req) -> + read_body(Req, []). + +-spec read_body(Req, body_opts()) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). +read_body(Req=#{pid := Pid, streamid := StreamID}, Opts) -> + %% @todo Opts should be a map + Length = case lists:keyfind(length, 1, Opts) of + false -> 8000000; + {_, ChunkLen0} -> ChunkLen0 + end, + ReadTimeout = case lists:keyfind(read_timeout, 1, Opts) of + false -> 15000; + {_, ReadTimeout0} -> ReadTimeout0 + end, + Ref = make_ref(), + Pid ! {{Pid, StreamID}, {read_body, Ref, Length}}, +% io:format("READ_BODY ~p ~p ~p ~p~n", [Pid, StreamID, Ref, Length]), + receive + {request_body, Ref, nofin, Body} -> + {more, Body, Req}; + {request_body, Ref, {fin, BodyLength}, Body} -> + {ok, Body, set_body_length(Req, BodyLength)} + after ReadTimeout -> + exit(read_body_timeout) + end. + +set_body_length(Req=#{headers := Headers}, BodyLength) -> + Req#{ + headers => Headers#{<<"content-length">> => integer_to_binary(BodyLength)}, + body_length => BodyLength + }. + -spec body(Req, body_opts()) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). body(Req=#http_req{body_state=waiting}, Opts) -> %% Send a 100 continue if needed (enabled by default). @@ -514,7 +535,7 @@ body_qs(Req) -> -spec body_qs(Req, body_opts()) -> {ok, [{binary(), binary() | true}], Req} | {badlength, Req} when Req::req(). body_qs(Req, Opts) -> - case body(Req, Opts) of + case read_body(Req, Opts) of {ok, Body, Req2} -> {ok, cow_qs:parse_qs(Body), Req2}; {more, _, Req2} -> @@ -535,13 +556,16 @@ part(Req) -> -spec part(Req, body_opts()) -> {ok, cow_multipart:headers(), Req} | {done, Req} when Req::req(). -part(Req=#http_req{multipart=undefined}, Opts) -> - part(init_multipart(Req), Opts); part(Req, Opts) -> - {Data, Req2} = stream_multipart(Req, Opts), - part(Data, Opts, Req2). + case maps:is_key(multipart, Req) of + true -> + {Data, Req2} = stream_multipart(Req, Opts), + part(Data, Opts, Req2); + false -> + part(init_multipart(Req), Opts) + end. -part(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}) -> +part(Buffer, Opts, Req=#{multipart := {Boundary, _}}) -> case cow_multipart:parse_headers(Buffer, Boundary) of more -> {Data, Req2} = stream_multipart(Req, Opts), @@ -550,10 +574,10 @@ part(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}) -> {Data, Req2} = stream_multipart(Req, Opts), part(<< Buffer2/binary, Data/binary >>, Opts, Req2); {ok, Headers, Rest} -> - {ok, Headers, Req#http_req{multipart={Boundary, Rest}}}; + {ok, Headers, Req#{multipart => {Boundary, Rest}}}; %% Ignore epilogue. {done, _} -> - {done, Req#http_req{multipart=undefined}} + {done, Req#{multipart => done}} end. -spec part_body(Req) @@ -565,19 +589,22 @@ part_body(Req) -> -spec part_body(Req, body_opts()) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req(). -part_body(Req=#http_req{multipart=undefined}, Opts) -> - part_body(init_multipart(Req), Opts); part_body(Req, Opts) -> - part_body(<<>>, Opts, Req, <<>>). + case maps:is_key(multipart, Req) of + true -> + part_body(<<>>, Opts, Req, <<>>); + false -> + part_body(init_multipart(Req), Opts) + end. -part_body(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}, Acc) -> +part_body(Buffer, Opts, Req=#{multipart := {Boundary, _}}, Acc) -> ChunkLen = case lists:keyfind(length, 1, Opts) of false -> 8000000; {_, ChunkLen0} -> ChunkLen0 end, case byte_size(Acc) > ChunkLen of true -> - {more, Acc, Req#http_req{multipart={Boundary, Buffer}}}; + {more, Acc, Req#{multipart => {Boundary, Buffer}}}; false -> {Data, Req2} = stream_multipart(Req, Opts), case cow_multipart:parse_body(<< Buffer/binary, Data/binary >>, Boundary) of @@ -591,21 +618,22 @@ part_body(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}, Acc) -> {ok, << Acc/binary, Body/binary >>, Req2}; {done, Body, Rest} -> {ok, << Acc/binary, Body/binary >>, - Req2#http_req{multipart={Boundary, Rest}}} + Req2#{multipart => {Boundary, Rest}}} end end. init_multipart(Req) -> {<<"multipart">>, _, Params} = parse_header(<<"content-type">>, Req), {_, Boundary} = lists:keyfind(<<"boundary">>, 1, Params), - Req#http_req{multipart={Boundary, <<>>}}. + Req#{multipart => {Boundary, <<>>}}. -stream_multipart(Req=#http_req{body_state=BodyState, multipart={_, <<>>}}, Opts) -> - true = BodyState =/= done, - {_, Data, Req2} = body(Req, Opts), +stream_multipart(Req=#{multipart := done}, _) -> + {<<>>, Req}; +stream_multipart(Req=#{multipart := {_, <<>>}}, Opts) -> + {_, Data, Req2} = read_body(Req, Opts), {Data, Req2}; -stream_multipart(Req=#http_req{multipart={Boundary, Buffer}}, _) -> - {Buffer, Req#http_req{multipart={Boundary, <<>>}}}. +stream_multipart(Req=#{multipart := {Boundary, Buffer}}, _) -> + {Buffer, Req#{multipart => {Boundary, <<>>}}}. %% Response API. @@ -618,16 +646,22 @@ stream_multipart(Req=#http_req{multipart={Boundary, Buffer}}, _) -> -> Req when Req::req(). set_resp_cookie(Name, Value, Opts, Req) -> Cookie = cow_cookie:setcookie(Name, Value, Opts), + %% @todo Nah, keep separate. set_resp_header(<<"set-cookie">>, Cookie, Req). -spec set_resp_header(binary(), iodata(), Req) -> Req when Req::req(). -set_resp_header(Name, Value, Req=#http_req{resp_headers=RespHeaders}) -> - Req#http_req{resp_headers=[{Name, Value}|RespHeaders]}. +set_resp_header(Name, Value, Req=#{resp_headers := RespHeaders}) -> + Req#{resp_headers => RespHeaders#{Name => Value}}; +set_resp_header(Name,Value, Req) -> + Req#{resp_headers => #{Name => Value}}. +%% @todo {sendfile, Offset, Bytes, Path} tuple -spec set_resp_body(iodata(), Req) -> Req when Req::req(). set_resp_body(Body, Req) -> - Req#http_req{resp_body=Body}. + Req#{resp_body => Body}. +%set_resp_body(Body, Req) -> +% Req#http_req{resp_body=Body}. -spec set_resp_body_fun(resp_body_fun(), Req) -> Req when Req::req(). set_resp_body_fun(StreamFun, Req) when is_function(StreamFun) -> @@ -647,189 +681,217 @@ set_resp_body_fun(chunked, StreamFun, Req) Req#http_req{resp_body={chunked, StreamFun}}. -spec has_resp_header(binary(), req()) -> boolean(). -has_resp_header(Name, #http_req{resp_headers=RespHeaders}) -> - lists:keymember(Name, 1, RespHeaders). +has_resp_header(Name, #{resp_headers := RespHeaders}) -> + maps:is_key(Name, RespHeaders); +has_resp_header(_, _) -> + false. -spec has_resp_body(req()) -> boolean(). -has_resp_body(#http_req{resp_body=RespBody}) when is_function(RespBody) -> - true; -has_resp_body(#http_req{resp_body={chunked, _}}) -> - true; -has_resp_body(#http_req{resp_body={Length, _}}) -> - Length > 0; -has_resp_body(#http_req{resp_body=RespBody}) -> - iolist_size(RespBody) > 0. +has_resp_body(#{resp_body := {sendfile, Len, _}}) -> + Len > 0; +has_resp_body(#{resp_body := RespBody}) -> + iolist_size(RespBody) > 0; +has_resp_body(_) -> + false. + +%has_resp_body(#http_req{resp_body=RespBody}) when is_function(RespBody) -> +% true; +%has_resp_body(#http_req{resp_body={chunked, _}}) -> +% true; +%has_resp_body(#http_req{resp_body={Length, _}}) -> +% Length > 0; +%has_resp_body(#http_req{resp_body=RespBody}) -> +% iolist_size(RespBody) > 0. -spec delete_resp_header(binary(), Req) -> Req when Req::req(). -delete_resp_header(Name, Req=#http_req{resp_headers=RespHeaders}) -> - RespHeaders2 = lists:keydelete(Name, 1, RespHeaders), - Req#http_req{resp_headers=RespHeaders2}. +delete_resp_header(Name, Req=#{resp_headers := RespHeaders}) -> + Req#{resp_headers => maps:remove(Name, RespHeaders)}. -spec reply(cowboy:http_status(), Req) -> Req when Req::req(). -reply(Status, Req=#http_req{resp_body=Body}) -> - reply(Status, [], Body, Req). +reply(Status, Req) -> + reply(Status, #{}, Req). -spec reply(cowboy:http_status(), cowboy:http_headers(), Req) -> Req when Req::req(). -reply(Status, Headers, Req=#http_req{resp_body=Body}) -> - reply(Status, Headers, Body, Req). +reply(Status, Headers, Req=#{resp_body := Body}) -> + reply(Status, Headers, Body, Req); +reply(Status, Headers, Req) -> + reply(Status, Headers, <<>>, Req). -spec reply(cowboy:http_status(), cowboy:http_headers(), iodata() | resp_body_fun() | {non_neg_integer(), resp_body_fun()} | {chunked, resp_chunked_fun()}, Req) -> Req when Req::req(). -reply(Status, Headers, Body, Req=#http_req{ - socket=Socket, transport=Transport, - version=Version, connection=Connection, - method=Method, resp_compress=Compress, - resp_state=RespState, resp_headers=RespHeaders}) - when RespState =:= waiting; RespState =:= waiting_stream -> - HTTP11Headers = if - Transport =/= cowboy_spdy, Version =:= 'HTTP/1.0', Connection =:= keepalive -> - [{<<"connection">>, atom_to_connection(Connection)}]; - Transport =/= cowboy_spdy, Version =:= 'HTTP/1.1', Connection =:= close -> - [{<<"connection">>, atom_to_connection(Connection)}]; - true -> - [] - end, - Req3 = case Body of - BodyFun when is_function(BodyFun) -> - %% We stream the response body until we close the connection. - RespConn = close, - {RespType, Req2} = if - Transport =:= cowboy_spdy -> - response(Status, Headers, RespHeaders, [ - {<<"date">>, cowboy_clock:rfc1123()}, - {<<"server">>, <<"Cowboy">>} - ], stream, Req); - true -> - response(Status, Headers, RespHeaders, [ - {<<"connection">>, <<"close">>}, - {<<"date">>, cowboy_clock:rfc1123()}, - {<<"server">>, <<"Cowboy">>}, - {<<"transfer-encoding">>, <<"identity">>} - ], <<>>, Req) - end, - if RespType =/= hook, Method =/= <<"HEAD">> -> - BodyFun(Socket, Transport); - true -> ok - end, - Req2#http_req{connection=RespConn}; - {chunked, BodyFun} -> - %% We stream the response body in chunks. - {RespType, Req2} = chunked_response(Status, Headers, Req), - if RespType =/= hook, Method =/= <<"HEAD">> -> - ChunkFun = fun(IoData) -> chunk(IoData, Req2) end, - BodyFun(ChunkFun), - %% Send the last chunk if chunked encoding was used. - if - Version =:= 'HTTP/1.0'; RespState =:= waiting_stream -> - Req2; - true -> - last_chunk(Req2) - end; - true -> Req2 - end; - {ContentLength, BodyFun} -> - %% We stream the response body for ContentLength bytes. - RespConn = response_connection(Headers, Connection), - {RespType, Req2} = response(Status, Headers, RespHeaders, [ - {<<"content-length">>, integer_to_list(ContentLength)}, - {<<"date">>, cowboy_clock:rfc1123()}, - {<<"server">>, <<"Cowboy">>} - |HTTP11Headers], stream, Req), - if RespType =/= hook, Method =/= <<"HEAD">> -> - BodyFun(Socket, Transport); - true -> ok - end, - Req2#http_req{connection=RespConn}; - _ when Compress -> - RespConn = response_connection(Headers, Connection), - Req2 = reply_may_compress(Status, Headers, Body, Req, - RespHeaders, HTTP11Headers, Method), - Req2#http_req{connection=RespConn}; - _ -> - RespConn = response_connection(Headers, Connection), - Req2 = reply_no_compress(Status, Headers, Body, Req, - RespHeaders, HTTP11Headers, Method, iolist_size(Body)), - Req2#http_req{connection=RespConn} - end, - Req3#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}. - -reply_may_compress(Status, Headers, Body, Req, - RespHeaders, HTTP11Headers, Method) -> - BodySize = iolist_size(Body), - try parse_header(<<"accept-encoding">>, Req) of - Encodings -> - CanGzip = (BodySize > 300) - andalso (false =:= lists:keyfind(<<"content-encoding">>, - 1, Headers)) - andalso (false =:= lists:keyfind(<<"content-encoding">>, - 1, RespHeaders)) - andalso (false =:= lists:keyfind(<<"transfer-encoding">>, - 1, Headers)) - andalso (false =:= lists:keyfind(<<"transfer-encoding">>, - 1, RespHeaders)) - andalso (Encodings =/= undefined) - andalso (false =/= lists:keyfind(<<"gzip">>, 1, Encodings)), - case CanGzip of - true -> - GzBody = zlib:gzip(Body), - {_, Req2} = response(Status, Headers, RespHeaders, [ - {<<"content-length">>, integer_to_list(byte_size(GzBody))}, - {<<"content-encoding">>, <<"gzip">>}, - {<<"date">>, cowboy_clock:rfc1123()}, - {<<"server">>, <<"Cowboy">>} - |HTTP11Headers], - case Method of <<"HEAD">> -> <<>>; _ -> GzBody end, - Req), - Req2; - false -> - reply_no_compress(Status, Headers, Body, Req, - RespHeaders, HTTP11Headers, Method, BodySize) - end - catch _:_ -> - reply_no_compress(Status, Headers, Body, Req, - RespHeaders, HTTP11Headers, Method, BodySize) - end. +reply(Status, Headers, Stream = {stream, undefined, _}, Req) -> + do_stream_reply(Status, Headers, Stream, Req); +reply(Status, Headers, Stream = {stream, Len, _}, Req) -> + do_stream_reply(Status, Headers#{ + <<"content-length">> => integer_to_binary(Len) + }, Stream, Req); +reply(Status, Headers, SendFile = {sendfile, _, Len, _}, Req) -> + do_reply(Status, Headers#{ + <<"content-length">> => integer_to_binary(Len) + }, SendFile, Req); +reply(Status, Headers, Body, Req) -> + do_reply(Status, Headers#{ + <<"content-length">> => integer_to_binary(iolist_size(Body)) + }, Body, Req). + +do_stream_reply(Status, Headers, {stream, _, Fun}, Req=#{pid := Pid, streamid := StreamID}) -> + Pid ! {{Pid, StreamID}, {headers, Status, response_headers(Headers, Req)}}, + Fun(), + ok. + +do_reply(Status, Headers, Body, Req=#{pid := Pid, streamid := StreamID}) -> + Pid ! {{Pid, StreamID}, {response, Status, response_headers(Headers, Req), Body}}, + ok. + +send_body(Data, IsFin, #{pid := Pid, streamid := StreamID}) -> + Pid ! {{Pid, StreamID}, {data, IsFin, Data}}, + ok. -reply_no_compress(Status, Headers, Body, Req, - RespHeaders, HTTP11Headers, Method, BodySize) -> - {_, Req2} = response(Status, Headers, RespHeaders, [ - {<<"content-length">>, integer_to_list(BodySize)}, - {<<"date">>, cowboy_clock:rfc1123()}, - {<<"server">>, <<"Cowboy">>} - |HTTP11Headers], - case Method of <<"HEAD">> -> <<>>; _ -> Body end, - Req), - Req2. +response_headers(Headers, Req) -> + RespHeaders = maps:get(resp_headers, Req, #{}), + maps:merge(#{ + <<"date">> => cowboy_clock:rfc1123(), + <<"server">> => <<"Cowboy">> + }, maps:merge(RespHeaders, Headers)). + +%reply(Status, Headers, Body, Req=#http_req{ +% socket=Socket, transport=Transport, +% version=Version, connection=Connection, +% method=Method, resp_compress=Compress, +% resp_state=RespState, resp_headers=RespHeaders}) +% when RespState =:= waiting; RespState =:= waiting_stream -> +% Req3 = case Body of +% BodyFun when is_function(BodyFun) -> +% %% We stream the response body until we close the connection. +% RespConn = close, +% {RespType, Req2} = if +% Transport =:= cowboy_spdy -> +% response(Status, Headers, RespHeaders, [ +% {<<"date">>, cowboy_clock:rfc1123()}, +% {<<"server">>, <<"Cowboy">>} +% ], stream, Req); +% true -> +% response(Status, Headers, RespHeaders, [ +% {<<"connection">>, <<"close">>}, +% {<<"date">>, cowboy_clock:rfc1123()}, +% {<<"server">>, <<"Cowboy">>}, +% {<<"transfer-encoding">>, <<"identity">>} +% ], <<>>, Req) +% end, +% if RespType =/= hook, Method =/= <<"HEAD">> -> +% BodyFun(Socket, Transport); +% true -> ok +% end, +% Req2#http_req{connection=RespConn}; +% {chunked, BodyFun} -> +% %% We stream the response body in chunks. +% {RespType, Req2} = chunked_response(Status, Headers, Req), +% if RespType =/= hook, Method =/= <<"HEAD">> -> +% ChunkFun = fun(IoData) -> chunk(IoData, Req2) end, +% BodyFun(ChunkFun), +% %% Send the last chunk if chunked encoding was used. +% if +% Version =:= 'HTTP/1.0'; RespState =:= waiting_stream -> +% Req2; +% true -> +% last_chunk(Req2) +% end; +% true -> Req2 +% end; +% {ContentLength, BodyFun} -> +% %% We stream the response body for ContentLength bytes. +% RespConn = response_connection(Headers, Connection), +% {RespType, Req2} = response(Status, Headers, RespHeaders, [ +% {<<"content-length">>, integer_to_list(ContentLength)}, +% {<<"date">>, cowboy_clock:rfc1123()}, +% {<<"server">>, <<"Cowboy">>} +% |HTTP11Headers], stream, Req), +% if RespType =/= hook, Method =/= <<"HEAD">> -> +% BodyFun(Socket, Transport); +% true -> ok +% end, +% Req2#http_req{connection=RespConn}; +% _ when Compress -> +% RespConn = response_connection(Headers, Connection), +% Req2 = reply_may_compress(Status, Headers, Body, Req, +% RespHeaders, HTTP11Headers, Method), +% Req2#http_req{connection=RespConn}; +% _ -> +% RespConn = response_connection(Headers, Connection), +% Req2 = reply_no_compress(Status, Headers, Body, Req, +% RespHeaders, HTTP11Headers, Method, iolist_size(Body)), +% Req2#http_req{connection=RespConn} +% end, +% Req3#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}. + +%reply_may_compress(Status, Headers, Body, Req, +% RespHeaders, HTTP11Headers, Method) -> +% BodySize = iolist_size(Body), +% try parse_header(<<"accept-encoding">>, Req) of +% Encodings -> +% CanGzip = (BodySize > 300) +% andalso (false =:= lists:keyfind(<<"content-encoding">>, +% 1, Headers)) +% andalso (false =:= lists:keyfind(<<"content-encoding">>, +% 1, RespHeaders)) +% andalso (false =:= lists:keyfind(<<"transfer-encoding">>, +% 1, Headers)) +% andalso (false =:= lists:keyfind(<<"transfer-encoding">>, +% 1, RespHeaders)) +% andalso (Encodings =/= undefined) +% andalso (false =/= lists:keyfind(<<"gzip">>, 1, Encodings)), +% case CanGzip of +% true -> +% GzBody = zlib:gzip(Body), +% {_, Req2} = response(Status, Headers, RespHeaders, [ +% {<<"content-length">>, integer_to_list(byte_size(GzBody))}, +% {<<"content-encoding">>, <<"gzip">>}, +% |HTTP11Headers], +% case Method of <<"HEAD">> -> <<>>; _ -> GzBody end, +% Req), +% Req2; +% false -> +% reply_no_compress(Status, Headers, Body, Req, +% RespHeaders, HTTP11Headers, Method, BodySize) +% end +% catch _:_ -> +% reply_no_compress(Status, Headers, Body, Req, +% RespHeaders, HTTP11Headers, Method, BodySize) +% end. +% +%reply_no_compress(Status, Headers, Body, Req, +% RespHeaders, HTTP11Headers, Method, BodySize) -> +% {_, Req2} = response(Status, Headers, RespHeaders, [ +% {<<"content-length">>, integer_to_list(BodySize)}, +% |HTTP11Headers], +% case Method of <<"HEAD">> -> <<>>; _ -> Body end, +% Req), +% Req2. -spec chunked_reply(cowboy:http_status(), Req) -> Req when Req::req(). chunked_reply(Status, Req) -> - chunked_reply(Status, [], Req). + chunked_reply(Status, #{}, Req). -spec chunked_reply(cowboy:http_status(), cowboy:http_headers(), Req) -> Req when Req::req(). -chunked_reply(Status, Headers, Req) -> - {_, Req2} = chunked_response(Status, Headers, Req), - Req2. +chunked_reply(Status, Headers, Req=#{pid := Pid, streamid := StreamID}) -> + Pid ! {{Pid, StreamID}, {headers, Status, response_headers(Headers, Req)}}, + Req. %% @todo return ok +% ok. -spec chunk(iodata(), req()) -> ok. -chunk(_Data, #http_req{method= <<"HEAD">>}) -> +chunk(_Data, #{method := <<"HEAD">>}) -> ok; -chunk(Data, #http_req{socket=Socket, transport=cowboy_spdy, - resp_state=chunks}) -> - cowboy_spdy:stream_data(Socket, Data); -chunk(Data, #http_req{socket=Socket, transport=Transport, - resp_state=stream}) -> - ok = Transport:send(Socket, Data); -chunk(Data, #http_req{socket=Socket, transport=Transport, - resp_state=chunks}) -> +chunk(Data, #{pid := Pid, streamid := StreamID}) -> case iolist_size(Data) of 0 -> ok; - Size -> Transport:send(Socket, [integer_to_list(Size, 16), - <<"\r\n">>, Data, <<"\r\n">>]) + _ -> + Pid ! {{Pid, StreamID}, {data, nofin, Data}}, + ok end. %% If ever made public, need to send nothing if HEAD. @@ -841,16 +903,6 @@ last_chunk(Req=#http_req{socket=Socket, transport=Transport}) -> _ = Transport:send(Socket, <<"0\r\n\r\n">>), Req#http_req{resp_state=done}. --spec upgrade_reply(cowboy:http_status(), cowboy:http_headers(), Req) - -> Req when Req::req(). -upgrade_reply(Status, Headers, Req=#http_req{transport=Transport, - resp_state=waiting, resp_headers=RespHeaders}) - when Transport =/= cowboy_spdy -> - {_, Req2} = response(Status, Headers, RespHeaders, [ - {<<"connection">>, <<"Upgrade">>} - ], <<>>, Req), - Req2#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}. - -spec continue(req()) -> ok. continue(#http_req{socket=Socket, transport=Transport, version=Version}) -> @@ -973,49 +1025,49 @@ to_list(Req) -> %% Internal. --spec chunked_response(cowboy:http_status(), cowboy:http_headers(), Req) -> - {normal | hook, Req} when Req::req(). -chunked_response(Status, Headers, Req=#http_req{ - transport=cowboy_spdy, resp_state=waiting, - resp_headers=RespHeaders}) -> - {RespType, Req2} = response(Status, Headers, RespHeaders, [ - {<<"date">>, cowboy_clock:rfc1123()}, - {<<"server">>, <<"Cowboy">>} - ], stream, Req), - {RespType, Req2#http_req{resp_state=chunks, - resp_headers=[], resp_body= <<>>}}; -chunked_response(Status, Headers, Req=#http_req{ - version=Version, connection=Connection, - resp_state=RespState, resp_headers=RespHeaders}) - when RespState =:= waiting; RespState =:= waiting_stream -> - RespConn = response_connection(Headers, Connection), - HTTP11Headers = if - Version =:= 'HTTP/1.0', Connection =:= keepalive -> - [{<<"connection">>, atom_to_connection(Connection)}]; - Version =:= 'HTTP/1.0' -> []; - true -> - MaybeTE = if - RespState =:= waiting_stream -> []; - true -> [{<<"transfer-encoding">>, <<"chunked">>}] - end, - if - Connection =:= close -> - [{<<"connection">>, atom_to_connection(Connection)}|MaybeTE]; - true -> - MaybeTE - end - end, - RespState2 = if - Version =:= 'HTTP/1.1', RespState =:= 'waiting' -> chunks; - true -> stream - end, - {RespType, Req2} = response(Status, Headers, RespHeaders, [ - {<<"date">>, cowboy_clock:rfc1123()}, - {<<"server">>, <<"Cowboy">>} - |HTTP11Headers], <<>>, Req), - {RespType, Req2#http_req{connection=RespConn, resp_state=RespState2, - resp_headers=[], resp_body= <<>>}}. - +%-spec chunked_response(cowboy:http_status(), cowboy:http_headers(), Req) -> +% {normal | hook, Req} when Req::req(). +%chunked_response(Status, Headers, Req=#http_req{ +% transport=cowboy_spdy, resp_state=waiting, +% resp_headers=RespHeaders}) -> +% {RespType, Req2} = response(Status, Headers, RespHeaders, [ +% {<<"date">>, cowboy_clock:rfc1123()}, +% {<<"server">>, <<"Cowboy">>} +% ], stream, Req), +% {RespType, Req2#http_req{resp_state=chunks, +% resp_headers=[], resp_body= <<>>}}; +%chunked_response(Status, Headers, Req=#http_req{ +% version=Version, connection=Connection, +% resp_state=RespState, resp_headers=RespHeaders}) +% when RespState =:= waiting; RespState =:= waiting_stream -> +% RespConn = response_connection(Headers, Connection), +% HTTP11Headers = if +% Version =:= 'HTTP/1.0', Connection =:= keepalive -> +% [{<<"connection">>, atom_to_connection(Connection)}]; +% Version =:= 'HTTP/1.0' -> []; +% true -> +% MaybeTE = if +% RespState =:= waiting_stream -> []; +% true -> [{<<"transfer-encoding">>, <<"chunked">>}] +% end, +% if +% Connection =:= close -> +% [{<<"connection">>, atom_to_connection(Connection)}|MaybeTE]; +% true -> +% MaybeTE +% end +% end, +% RespState2 = if +% Version =:= 'HTTP/1.1', RespState =:= 'waiting' -> chunks; +% true -> stream +% end, +% {RespType, Req2} = response(Status, Headers, RespHeaders, [ +% {<<"date">>, cowboy_clock:rfc1123()}, +% {<<"server">>, <<"Cowboy">>} +% |HTTP11Headers], <<>>, Req), +% {RespType, Req2#http_req{connection=RespConn, resp_state=RespState2, +% resp_headers=[], resp_body= <<>>}}. +% -spec response(cowboy:http_status(), cowboy:http_headers(), cowboy:http_headers(), cowboy:http_headers(), stream | iodata(), Req) -> {normal | hook, Req} when Req::req(). @@ -1063,20 +1115,20 @@ response(Status, Headers, RespHeaders, DefaultHeaders, Body, Req=#http_req{ hook end, {ReplyType, Req2}. - --spec response_connection(cowboy:http_headers(), keepalive | close) - -> keepalive | close. -response_connection([], Connection) -> - Connection; -response_connection([{Name, Value}|Tail], Connection) -> - case Name of - <<"connection">> -> - Tokens = cow_http_hd:parse_connection(Value), - connection_to_atom(Tokens); - _ -> - response_connection(Tail, Connection) - end. - +% +%-spec response_connection(cowboy:http_headers(), keepalive | close) +% -> keepalive | close. +%response_connection([], Connection) -> +% Connection; +%response_connection([{Name, Value}|Tail], Connection) -> +% case Name of +% <<"connection">> -> +% Tokens = cow_http_hd:parse_connection(Value), +% connection_to_atom(Tokens); +% _ -> +% response_connection(Tail, Connection) +% end. +% -spec response_merge_headers(cowboy:http_headers(), cowboy:http_headers(), cowboy:http_headers()) -> cowboy:http_headers(). response_merge_headers(Headers, RespHeaders, DefaultHeaders) -> @@ -1105,13 +1157,13 @@ merge_headers(Headers, [{Name, Value}|Tail]) -> false -> [{Name, Value}|Headers] end, merge_headers(Headers2, Tail). - --spec atom_to_connection(keepalive) -> <<_:80>>; - (close) -> <<_:40>>. -atom_to_connection(keepalive) -> - <<"keep-alive">>; -atom_to_connection(close) -> - <<"close">>. +% +%-spec atom_to_connection(keepalive) -> <<_:80>>; +% (close) -> <<_:40>>. +%atom_to_connection(keepalive) -> +% <<"keep-alive">>; +%atom_to_connection(close) -> +% <<"close">>. %% We don't match on "keep-alive" since it is the default value. -spec connection_to_atom([binary()]) -> keepalive | close. diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl index 75e8e63..914b273 100644 --- a/src/cowboy_rest.erl +++ b/src/cowboy_rest.erl @@ -373,7 +373,7 @@ content_types_provided(Req, State) -> try cowboy_req:parse_header(<<"accept">>, Req) of undefined -> languages_provided( - cowboy_req:set_meta(media_type, {<<"text">>, <<"html">>, []}, Req), + Req#{media_type => {<<"text">>, <<"html">>, []}}, State2#state{content_type_a={{<<"text">>, <<"html">>, []}, to_html}}); Accept -> choose_media_type(Req, State2, prioritize_accept(Accept)) @@ -392,7 +392,7 @@ content_types_provided(Req, State) -> undefined -> {PMT, _Fun} = HeadCTP = hd(CTP2), languages_provided( - cowboy_req:set_meta(media_type, PMT, Req2), + Req2#{media_type => PMT}, State2#state{content_type_a=HeadCTP}); Accept -> choose_media_type(Req2, State2, prioritize_accept(Accept)) @@ -460,14 +460,14 @@ match_media_type_params(Req, State, _Accept, [Provided = {{TP, STP, '*'}, _Fun}|_Tail], {{_TA, _STA, Params_A}, _QA, _APA}) -> PMT = {TP, STP, Params_A}, - languages_provided(cowboy_req:set_meta(media_type, PMT, Req), + languages_provided(Req#{media_type => PMT}, State#state{content_type_a=Provided}); match_media_type_params(Req, State, Accept, [Provided = {PMT = {_TP, _STP, Params_P}, _Fun}|Tail], MediaType = {{_TA, _STA, Params_A}, _QA, _APA}) -> case lists:sort(Params_P) =:= lists:sort(Params_A) of true -> - languages_provided(cowboy_req:set_meta(media_type, PMT, Req), + languages_provided(Req#{media_type => PMT}, State#state{content_type_a=Provided}); false -> match_media_type(Req, State, Accept, Tail, MediaType) @@ -534,7 +534,7 @@ match_language(Req, State, Accept, [Provided|Tail], set_language(Req, State=#state{language_a=Language}) -> Req2 = cowboy_req:set_resp_header(<<"content-language">>, Language, Req), - charsets_provided(cowboy_req:set_meta(language, Language, Req2), State). + charsets_provided(Req2#{language => Language}, State). %% charsets_provided should return a list of binary values indicating %% which charsets are accepted by the resource. @@ -599,7 +599,7 @@ set_content_type(Req, State=#state{ Charset -> [ContentType, <<"; charset=">>, Charset] end, Req2 = cowboy_req:set_resp_header(<<"content-type">>, ContentType2, Req), - encodings_provided(cowboy_req:set_meta(charset, Charset, Req2), State). + encodings_provided(Req2#{charset => Charset}, State). set_content_type_build_params('*', []) -> <<>>; @@ -1012,7 +1012,7 @@ set_resp_body(Req, State=#state{content_type_a={_, Callback}}) -> cowboy_req:set_resp_body_fun(Len, StreamFun, Req2); {chunked, StreamFun} -> cowboy_req:set_resp_body_fun(chunked, StreamFun, Req2); - _Contents -> + _ -> cowboy_req:set_resp_body(Body, Req2) end, multiple_choices(Req3, State2) @@ -1152,7 +1152,7 @@ error_terminate(Req, #state{handler=Handler, handler_state=HandlerState}, {reason, Reason}, {mfa, {Handler, Callback, 2}}, {stacktrace, Stacktrace}, - {req, cowboy_req:to_list(Req)}, + {req, Req}, {state, HandlerState} ]}). diff --git a/src/cowboy_router.erl b/src/cowboy_router.erl index b806da5..f07b307 100644 --- a/src/cowboy_router.erl +++ b/src/cowboy_router.erl @@ -159,14 +159,17 @@ compile_brackets_split(<< C, Rest/bits >>, Acc, N) -> -spec execute(Req, Env) -> {ok, Req, Env} | {stop, Req} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). -execute(Req, Env) -> - {_, Dispatch} = lists:keyfind(dispatch, 1, Env), - Host = cowboy_req:host(Req), - Path = cowboy_req:path(Req), +execute(Req=#{host := Host, path := Path}, Env=#{dispatch := Dispatch}) -> case match(Dispatch, Host, Path) of {ok, Handler, HandlerOpts, Bindings, HostInfo, PathInfo} -> - Req2 = cowboy_req:set_bindings(HostInfo, PathInfo, Bindings, Req), - {ok, Req2, [{handler, Handler}, {handler_opts, HandlerOpts}|Env]}; + {ok, Req#{ + host_info => HostInfo, + path_info => PathInfo, + bindings => Bindings + }, Env#{ + handler => Handler, + handler_opts => HandlerOpts + }}; {error, notfound, host} -> {stop, cowboy_req:reply(400, Req)}; {error, badrequest, path} -> diff --git a/src/cowboy_static.erl b/src/cowboy_static.erl index b2a8302..c771771 100644 --- a/src/cowboy_static.erl +++ b/src/cowboy_static.erl @@ -274,11 +274,4 @@ last_modified(Req, State={_, {ok, #file_info{mtime=Modified}}, _}) -> -> {{stream, non_neg_integer(), fun()}, Req, State} when State::state(). get_file(Req, State={Path, {ok, #file_info{size=Size}}, _}) -> - Sendfile = fun (Socket, Transport) -> - case Transport:sendfile(Socket, Path) of - {ok, _} -> ok; - {error, closed} -> ok; - {error, etimedout} -> ok - end - end, - {{stream, Size, Sendfile}, Req, State}. + {{sendfile, 0, Size, Path}, Req, State}. diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl new file mode 100644 index 0000000..f924e28 --- /dev/null +++ b/src/cowboy_stream_h.erl @@ -0,0 +1,128 @@ +%% Copyright (c) 2016, Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(cowboy_stream_h). +%% @todo -behaviour(cowboy_stream). + +%% @todo Maybe have a callback for the type of process this is, worker or supervisor. +-export([init/3]). +-export([data/4]). +-export([info/3]). +-export([terminate/3]). + +-export([execute/3]). +-export([resume/5]). + +-record(state, { + pid = undefined :: pid(), + read_body_ref = undefined :: reference(), + read_body_length = 0 :: non_neg_integer(), + read_body_is_fin = nofin :: nofin | fin, + read_body_buffer = <<>> :: binary() +}). + +%% @todo For shutting down children we need to have a timeout before we terminate +%% the stream like supervisors do. So here just send a message to yourself first, +%% and then decide what to do when receiving this message. + +%% @todo proper specs +-spec init(_,_,_) -> _. +init(_StreamID, Req, Opts) -> + Env = maps:get(env, Opts, #{}), + Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]), + Shutdown = maps:get(shutdown, Opts, 5000), + Pid = proc_lib:spawn_link(?MODULE, execute, [Req, Env, Middlewares]), + {[{spawn, Pid, Shutdown}], #state{pid=Pid}}. + +%% If we receive data and stream is waiting for data: +%% If we accumulated enough data or IsFin=fin, send it. +%% If not, buffer it. +%% If not, buffer it. + +%% @todo proper specs +-spec data(_,_,_,_) -> _. +data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) -> + {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}}; +data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length -> + {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}}; +data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_buffer=Buffer}) -> + Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>}, + {[], State#state{read_body_ref=undefined, read_body_buffer= <<>>}}. + +%% @todo proper specs +-spec info(_,_,_) -> _. +info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) -> + {[stop], State}; +info(_StreamID, Reason = {'EXIT', Pid, _}, State=#state{pid=Pid}) -> + {[{internal_error, Reason, 'Stream process crashed.'}], State}; +%% Request body, no body buffer but IsFin=fin. +info(_StreamID, {read_body, Ref, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) -> + Pid ! {request_body, Ref, fin, <<>>}, + {[], State}; +%% Request body, body buffered large enough or complete. +info(_StreamID, {read_body, Ref, Length}, State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data}) + when element(1, IsFin) =:= fin; byte_size(Data) >= Length -> + Pid ! {request_body, Ref, IsFin, Data}, + {[], State#state{read_body_buffer= <<>>}}; +%% Request body, not enough to send yet. +info(_StreamID, {read_body, Ref, Length}, State) -> + {[{flow, Length}], State#state{read_body_ref=Ref, read_body_length=Length}}; +%% Response. +info(_StreamID, Response = {response, _, _, _}, State) -> + {[Response], State}; +info(_StreamID, Headers = {headers, _, _}, State) -> + {[Headers], State}; +info(_StreamID, Data = {data, _, _}, State) -> + {[Data], State}; +info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) -> + {[SwitchProtocol], State}; +%% Stray message. +info(_StreamID, _Msg, State) -> + %% @todo Cleanup if no reply was sent when stream ends. + {[], State}. + +%% @todo proper specs +-spec terminate(_,_,_) -> _. +terminate(_StreamID, _Reason, _State) -> + ok. + +%% Request process. + +%% @todo +%-spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()]) +% -> ok. +-spec execute(_, _, _) -> _. +execute(_, _, []) -> + ok; %% @todo Maybe error reason should differ here and there. +execute(Req, Env, [Middleware|Tail]) -> + case Middleware:execute(Req, Env) of + {ok, Req2, Env2} -> + execute(Req2, Env2, Tail); + {suspend, Module, Function, Args} -> + proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]); + {stop, _Req2} -> + ok %% @todo Maybe error reason should differ here and there. + end. + +-spec resume(cowboy_middleware:env(), [module()], + module(), module(), [any()]) -> ok. +resume(Env, Tail, Module, Function, Args) -> + case apply(Module, Function, Args) of + {ok, Req2, Env2} -> + execute(Req2, Env2, Tail); + {suspend, Module2, Function2, Args2} -> + proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]); + {stop, _Req2} -> + ok %% @todo Maybe error reason should differ here and there. + end. diff --git a/src/cowboy_tls.erl b/src/cowboy_tls.erl index 745d502..0ccf733 100644 --- a/src/cowboy_tls.erl +++ b/src/cowboy_tls.erl @@ -13,6 +13,7 @@ %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -module(cowboy_tls). +-behavior(ranch_protocol). -export([start_link/4]). -export([init/5]). diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index 3c34908..9a2862e 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -18,6 +18,7 @@ -behaviour(cowboy_sub_protocol). -export([upgrade/6]). +-export([takeover/7]). -export([handler_loop/4]). -type terminate_reason() :: normal | stop | timeout @@ -50,7 +51,6 @@ -optional_callbacks([terminate/3]). -record(state, { - env :: cowboy_middleware:env(), socket = undefined :: inet:socket(), transport = undefined :: module(), handler :: module(), @@ -65,25 +65,22 @@ extensions = #{} :: map() }). +%% Stream process. + -spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate) - -> {ok, Req, Env} | {suspend, module(), atom(), [any()]} + -> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). upgrade(Req, Env, Handler, HandlerState, Timeout, Hibernate) -> - {_, Ref} = lists:keyfind(listener, 1, Env), - ranch:remove_connection(Ref), - [Socket, Transport] = cowboy_req:get([socket, transport], Req), - State = #state{env=Env, socket=Socket, transport=Transport, handler=Handler, - timeout=Timeout, hibernate=Hibernate =:= hibernate}, + State = #state{handler=Handler, timeout=Timeout, + hibernate=Hibernate =:= hibernate}, + %% @todo We need to fail if HTTP/2. try websocket_upgrade(State, Req) of {ok, State2, Req2} -> - websocket_handshake(State2, Req2, HandlerState) + websocket_handshake(State2, Req2, HandlerState, Env) catch _:_ -> - receive - {cowboy_req, resp_sent} -> ok - after 0 -> - _ = cowboy_req:reply(400, Req), - exit(normal) - end + %% @todo Test that we can have 2 /ws 400 status code in a row on the same connection. + cowboy_req:reply(400, Req), + {ok, Req, Env} end. -spec websocket_upgrade(#state{}, Req) @@ -99,14 +96,15 @@ websocket_upgrade(State, Req) -> orelse (IntVersion =:= 13), Key = cowboy_req:header(<<"sec-websocket-key">>, Req), false = Key =:= undefined, - websocket_extensions(State#state{key=Key}, - cowboy_req:set_meta(websocket_version, IntVersion, Req)). + websocket_extensions(State#state{key=Key}, Req#{websocket_version => IntVersion}). -spec websocket_extensions(#state{}, Req) -> {ok, #state{}, Req} when Req::cowboy_req:req(). websocket_extensions(State, Req) -> - [Compress] = cowboy_req:get([resp_compress], Req), - Req2 = cowboy_req:set_meta(websocket_compress, false, Req), + %% @todo Proper options for this. +% [Compress] = cowboy_req:get([resp_compress], Req), + Compress = false, + Req2 = Req#{websocket_compress => false}, case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req2)} of {true, Extensions} when Extensions =/= undefined -> websocket_extensions(State, Req2, Extensions, []); @@ -123,7 +121,7 @@ websocket_extensions(State=#state{extensions=Extensions}, Req, [{<<"permessage-d Opts = #{level => best_compression, mem_level => 8, strategy => default}, case cow_ws:negotiate_permessage_deflate(Params, Extensions, Opts) of {ok, RespExt, Extensions2} -> - Req2 = cowboy_req:set_meta(websocket_compress, true, Req), + Req2 = Req#{websocket_compress => true}, websocket_extensions(State#state{extensions=Extensions2}, Req2, Tail, [<<", ">>, RespExt|RespHeader]); ignore -> @@ -143,33 +141,46 @@ websocket_extensions(State=#state{extensions=Extensions}, Req, [{<<"x-webkit-def websocket_extensions(State, Req, [_|Tail], RespHeader) -> websocket_extensions(State, Req, Tail, RespHeader). --spec websocket_handshake(#state{}, Req, any()) - -> {ok, Req, cowboy_middleware:env()} - | {suspend, module(), atom(), [any()]} - when Req::cowboy_req:req(). -websocket_handshake(State=#state{transport=Transport, key=Key}, Req, HandlerState) -> +-spec websocket_handshake(#state{}, Req, any(), Env) + -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). +websocket_handshake(State=#state{transport=Transport, key=Key}, + Req=#{pid := Pid, streamid := StreamID}, HandlerState, Env) -> Challenge = base64:encode(crypto:hash(sha, << Key/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" >>)), - Req2 = cowboy_req:upgrade_reply(101, [ - {<<"upgrade">>, <<"websocket">>}, - {<<"sec-websocket-accept">>, Challenge} - ], Req), - %% Flush the resp_sent message before moving on. - receive {cowboy_req, resp_sent} -> ok after 0 -> ok end, - State2 = handler_loop_timeout(State), + Headers = #{ + %% @todo Hmm should those be here or in cowboy_http? + <<"connection">> => <<"Upgrade">>, + <<"upgrade">> => <<"websocket">>, + <<"sec-websocket-accept">> => Challenge + }, + Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {Req, State, HandlerState}}}, + {ok, Req, Env}. + +%% Connection process. + +takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Req0, State=#state{handler=Handler}, HandlerState0}) -> + ranch:remove_connection(Ref), + %% @todo Remove Req from Websocket callbacks. + %% @todo Allow sending a reply from websocket_init. + %% @todo Try/catch. + {ok, Req, HandlerState} = case erlang:function_exported(Handler, websocket_init, 2) of + true -> Handler:websocket_init(Req0, HandlerState0); + false -> {ok, Req0, HandlerState0} + end, + State2 = handler_loop_timeout(State#state{socket=Socket, transport=Transport}), handler_before_loop(State2#state{key=undefined, - messages=Transport:messages()}, Req2, HandlerState, <<>>). + messages=Transport:messages()}, Req, HandlerState, Buffer). -spec handler_before_loop(#state{}, Req, any(), binary()) -> {ok, Req, cowboy_middleware:env()} - | {suspend, module(), atom(), [any()]} when Req::cowboy_req:req(). handler_before_loop(State=#state{ socket=Socket, transport=Transport, hibernate=true}, Req, HandlerState, SoFar) -> Transport:setopts(Socket, [{active, once}]), - {suspend, ?MODULE, handler_loop, - [State#state{hibernate=false}, Req, HandlerState, SoFar]}; + proc_lib:hibernate(?MODULE, handler_loop, + [State#state{hibernate=false}, Req, HandlerState, SoFar]); handler_before_loop(State=#state{socket=Socket, transport=Transport}, Req, HandlerState, SoFar) -> Transport:setopts(Socket, [{active, once}]), @@ -186,7 +197,6 @@ handler_loop_timeout(State=#state{timeout=Timeout, timeout_ref=PrevRef}) -> -spec handler_loop(#state{}, Req, any(), binary()) -> {ok, Req, cowboy_middleware:env()} - | {suspend, module(), atom(), [any()]} when Req::cowboy_req:req(). handler_loop(State=#state{socket=Socket, messages={OK, Closed, Error}, timeout_ref=TRef}, Req, HandlerState, SoFar) -> @@ -210,7 +220,6 @@ handler_loop(State=#state{socket=Socket, messages={OK, Closed, Error}, -spec websocket_data(#state{}, Req, any(), binary()) -> {ok, Req, cowboy_middleware:env()} - | {suspend, module(), atom(), [any()]} when Req::cowboy_req:req(). websocket_data(State=#state{frag_state=FragState, extensions=Extensions}, Req, HandlerState, Data) -> case cow_ws:parse_header(Data, Extensions, FragState) of @@ -298,7 +307,6 @@ websocket_dispatch(State=#state{socket=Socket, transport=Transport, frag_state=F -spec handler_call(#state{}, Req, any(), binary(), atom(), any(), fun()) -> {ok, Req, cowboy_middleware:env()} - | {suspend, module(), atom(), [any()]} when Req::cowboy_req:req(). handler_call(State=#state{handler=Handler}, Req, HandlerState, RemainingData, Callback, Message, NextState) -> @@ -358,7 +366,7 @@ handler_call(State=#state{handler=Handler}, Req, HandlerState, {mfa, {Handler, Callback, 3}}, {stacktrace, erlang:get_stacktrace()}, {msg, Message}, - {req, cowboy_req:to_list(Req)}, + {req, Req}, {state, HandlerState} ]}) end. @@ -407,7 +415,7 @@ websocket_close(State=#state{socket=Socket, transport=Transport, extensions=Exte -spec handler_terminate(#state{}, Req, any(), terminate_reason()) -> {ok, Req, cowboy_middleware:env()} when Req::cowboy_req:req(). -handler_terminate(#state{env=Env, handler=Handler}, +handler_terminate(#state{handler=Handler}, Req, HandlerState, Reason) -> cowboy_handler:terminate(Reason, Req, HandlerState, Handler), - {ok, Req, [{result, closed}|Env]}. + exit(normal). diff --git a/test/cowboy_test.erl b/test/cowboy_test.erl index cced1f9..e3aeb97 100644 --- a/test/cowboy_test.erl +++ b/test/cowboy_test.erl @@ -20,21 +20,21 @@ %% Listeners initialization. init_http(Ref, ProtoOpts, Config) -> - {ok, _} = cowboy:start_http(Ref, 100, [{port, 0}], ProtoOpts), + {ok, _} = cowboy:start_clear(Ref, 100, [{port, 0}], ProtoOpts), Port = ranch:get_port(Ref), - [{type, tcp}, {port, Port}, {opts, []}|Config]. + [{type, tcp}, {protocol, http}, {port, Port}, {opts, []}|Config]. init_https(Ref, ProtoOpts, Config) -> Opts = ct_helper:get_certs_from_ets(), - {ok, _} = cowboy:start_https(Ref, 100, Opts ++ [{port, 0}], ProtoOpts), + {ok, _} = cowboy:start_tls(Ref, 100, Opts ++ [{port, 0}], ProtoOpts), Port = ranch:get_port(Ref), - [{type, ssl}, {port, Port}, {opts, Opts}|Config]. + [{type, ssl}, {protocol, http}, {port, Port}, {opts, Opts}|Config]. init_spdy(Ref, ProtoOpts, Config) -> Opts = ct_helper:get_certs_from_ets(), - {ok, _} = cowboy:start_spdy(Ref, 100, Opts ++ [{port, 0}], ProtoOpts), + {ok, _} = cowboy:start_tls(Ref, 100, Opts ++ [{port, 0}], ProtoOpts), Port = ranch:get_port(Ref), - [{type, ssl}, {port, Port}, {opts, Opts}|Config]. + [{type, ssl}, {protocol, spdy}, {port, Port}, {opts, Opts}|Config]. %% Common group of listeners used by most suites. @@ -59,32 +59,26 @@ common_groups(Tests) -> ]. init_common_groups(Name = http, Config, Mod) -> - init_http(Name, [ - {env, [{dispatch, Mod:init_dispatch(Config)}]} - ], Config); + init_http(Name, #{env => #{dispatch => Mod:init_dispatch(Config)}}, Config); init_common_groups(Name = https, Config, Mod) -> - init_https(Name, [ - {env, [{dispatch, Mod:init_dispatch(Config)}]} - ], Config); + init_https(Name, #{env => #{dispatch => Mod:init_dispatch(Config)}}, Config); init_common_groups(Name = spdy, Config, Mod) -> - init_spdy(Name, [ - {env, [{dispatch, Mod:init_dispatch(Config)}]} - ], Config); + init_https(Name, #{env => #{dispatch => Mod:init_dispatch(Config)}}, Config); init_common_groups(Name = http_compress, Config, Mod) -> - init_http(Name, [ - {env, [{dispatch, Mod:init_dispatch(Config)}]}, - {compress, true} - ], Config); + init_http(Name, #{ + env => #{dispatch => Mod:init_dispatch(Config)}, + compress => true + }, Config); init_common_groups(Name = https_compress, Config, Mod) -> - init_https(Name, [ - {env, [{dispatch, Mod:init_dispatch(Config)}]}, - {compress, true} - ], Config); + init_https(Name, #{ + env => #{dispatch => Mod:init_dispatch(Config)}, + compress => true + }, Config); init_common_groups(Name = spdy_compress, Config, Mod) -> - init_spdy(Name, [ - {env, [{dispatch, Mod:init_dispatch(Config)}]}, - {compress, true} - ], Config). + init_spdy(Name, #{ + env => #{dispatch => Mod:init_dispatch(Config)}, + compress => true + }, Config). %% Support functions for testing using Gun. @@ -94,7 +88,8 @@ gun_open(Config) -> gun_open(Config, Opts) -> {ok, ConnPid} = gun:open("localhost", config(port, Config), Opts#{ retry => 0, - transport => config(type, Config) + transport => config(type, Config), + protocols => [config(protocol, Config)] }), ConnPid. diff --git a/test/handlers/asterisk_h.erl b/test/handlers/asterisk_h.erl index b5ff32d..56b8bcb 100644 --- a/test/handlers/asterisk_h.erl +++ b/test/handlers/asterisk_h.erl @@ -13,4 +13,4 @@ echo(What, Req, Opts) -> V when is_integer(V) -> integer_to_binary(V); V -> V end, - {ok, cowboy_req:reply(200, [], Value, Req), Opts}. + {ok, cowboy_req:reply(200, #{}, Value, Req), Opts}. diff --git a/test/handlers/echo_h.erl b/test/handlers/echo_h.erl index 71408c3..6db43fe 100644 --- a/test/handlers/echo_h.erl +++ b/test/handlers/echo_h.erl @@ -13,4 +13,4 @@ echo(What, Req, Opts) -> V when is_integer(V) -> integer_to_binary(V); V -> V end, - {ok, cowboy_req:reply(200, [], Value, Req), Opts}. + {ok, cowboy_req:reply(200, #{}, Value, Req), Opts}. diff --git a/test/handlers/hello_h.erl b/test/handlers/hello_h.erl index 3be7b6d..5e3dcc8 100644 --- a/test/handlers/hello_h.erl +++ b/test/handlers/hello_h.erl @@ -5,4 +5,4 @@ -export([init/2]). init(Req, Opts) -> - {ok, cowboy_req:reply(200, [], <<"Hello world!">>, Req), Opts}. + {ok, cowboy_req:reply(200, #{}, <<"Hello world!">>, Req), Opts}. diff --git a/test/handlers/loop_handler_body_h.erl b/test/handlers/loop_handler_body_h.erl index 0d4fd4d..38ba2c0 100644 --- a/test/handlers/loop_handler_body_h.erl +++ b/test/handlers/loop_handler_body_h.erl @@ -14,9 +14,10 @@ init(Req, _) -> {cowboy_loop, Req, undefined, 5000, hibernate}. info(timeout, Req, State) -> - {ok, Body, Req2} = cowboy_req:body(Req), + {ok, Body, Req2} = cowboy_req:read_body(Req), 100000 = byte_size(Body), - {stop, cowboy_req:reply(200, Req2), State}. + cowboy_req:reply(200, Req2), + {stop, Req, State}. terminate(stop, _, _) -> ok. diff --git a/test/http_SUITE.erl b/test/http_SUITE.erl index d5ec909..1b39feb 100644 --- a/test/http_SUITE.erl +++ b/test/http_SUITE.erl @@ -45,7 +45,7 @@ groups() -> parse_host, set_env_dispatch ], [ - {http, [parallel], Tests}, + {http, [], Tests}, %% @todo parallel {https, [parallel], Tests}, {http_compress, [parallel], Tests}, {https_compress, [parallel], Tests}, @@ -73,33 +73,29 @@ end_per_suite(Config) -> ct_helper:delete_static_dir(config(static_dir, Config)). init_per_group(Name = http, Config) -> - cowboy_test:init_http(Name, [ - {env, [{dispatch, init_dispatch(Config)}]} - ], Config); + cowboy_test:init_http(Name, #{env => #{dispatch => init_dispatch(Config)}}, Config); init_per_group(Name = https, Config) -> - cowboy_test:init_https(Name, [ - {env, [{dispatch, init_dispatch(Config)}]} - ], Config); + cowboy_test:init_https(Name, #{env => #{dispatch => init_dispatch(Config)}}, Config); init_per_group(Name = http_compress, Config) -> - cowboy_test:init_http(Name, [ - {env, [{dispatch, init_dispatch(Config)}]}, - {compress, true} - ], Config); + cowboy_test:init_http(Name, #{ + env => #{dispatch => init_dispatch(Config)}, + compress => true + }, Config); init_per_group(Name = https_compress, Config) -> - cowboy_test:init_https(Name, [ - {env, [{dispatch, init_dispatch(Config)}]}, - {compress, true} - ], Config); + cowboy_test:init_https(Name, #{ + env => #{dispatch => init_dispatch(Config)}, + compress => true + }, Config); %% Most, if not all of these, should be in separate test suites. init_per_group(onresponse, Config) -> - {ok, _} = cowboy:start_http(onresponse, 100, [{port, 0}], [ + {ok, _} = cowboy:start_clear(onresponse, 100, [{port, 0}], [ {env, [{dispatch, init_dispatch(Config)}]}, {onresponse, fun do_onresponse_hook/4} ]), Port = ranch:get_port(onresponse), [{type, tcp}, {port, Port}, {opts, []}|Config]; init_per_group(onresponse_capitalize, Config) -> - {ok, _} = cowboy:start_http(onresponse_capitalize, 100, [{port, 0}], [ + {ok, _} = cowboy:start_clear(onresponse_capitalize, 100, [{port, 0}], [ {env, [{dispatch, init_dispatch(Config)}]}, {onresponse, fun do_onresponse_capitalize_hook/4} ]), @@ -111,13 +107,13 @@ init_per_group(parse_host, Config) -> {"/req_attr", http_req_attr, []} ]} ]), - {ok, _} = cowboy:start_http(parse_host, 100, [{port, 0}], [ + {ok, _} = cowboy:start_clear(parse_host, 100, [{port, 0}], [ {env, [{dispatch, Dispatch}]} ]), Port = ranch:get_port(parse_host), [{type, tcp}, {port, Port}, {opts, []}|Config]; init_per_group(set_env, Config) -> - {ok, _} = cowboy:start_http(set_env, 100, [{port, 0}], [ + {ok, _} = cowboy:start_clear(set_env, 100, [{port, 0}], [ {env, [{dispatch, []}]} ]), Port = ranch:get_port(set_env), @@ -134,11 +130,11 @@ init_dispatch(Config) -> {"/chunked_response", http_chunked, []}, {"/streamed_response", http_streamed, []}, {"/headers/dupe", http_handler, - [{headers, [{<<"connection">>, <<"close">>}]}]}, + [{headers, #{<<"connection">> => <<"close">>}}]}, {"/set_resp/header", http_set_resp, - [{headers, [{<<"vary">>, <<"Accept">>}]}]}, + [{headers, #{<<"vary">> => <<"Accept">>}}]}, {"/set_resp/overwrite", http_set_resp, - [{headers, [{<<"server">>, <<"DesireDrive/1.0">>}]}]}, + [{headers, #{<<"server">> => <<"DesireDrive/1.0">>}}]}, {"/set_resp/body", http_set_resp, [{body, <<"A flameless dance does not equal a cycle">>}]}, {"/stream_body/set_resp", http_stream_body, @@ -314,7 +310,7 @@ echo_body(Config) -> %% Check if sending request whose size is bigger than 1000000 bytes causes 413 echo_body_max_length(Config) -> ConnPid = gun_open(Config), - Ref = gun:post(ConnPid, "/echo/body", [], << 0:2000000/unit:8 >>), + Ref = gun:post(ConnPid, "/echo/body", [], << 0:10000000/unit:8 >>), {response, nofin, 413, _} = gun:await(ConnPid, Ref), ok. @@ -336,7 +332,7 @@ error_init_after_reply(Config) -> ConnPid = gun_open(Config), Ref = gun:get(ConnPid, "/handler_errors?case=init_after_reply"), {response, nofin, 200, _} = gun:await(ConnPid, Ref), - gun_down(ConnPid). + ok. headers_dupe(Config) -> ConnPid = gun_open(Config), @@ -357,18 +353,18 @@ http10_chunkless(Config) -> http10_hostless(Config) -> Name = http10_hostless, Port10 = config(port, Config) + 10, - Transport = case config(type, Config) of - tcp -> ranch_tcp; - ssl -> ranch_ssl + {Transport, Protocol} = case config(type, Config) of + tcp -> {ranch_tcp, cowboy_clear}; + ssl -> {ranch_ssl, cowboy_tls} end, ranch:start_listener(Name, 5, Transport, config(opts, Config) ++ [{port, Port10}], - cowboy_protocol, [ - {env, [{dispatch, cowboy_router:compile([ - {'_', [{"/http1.0/hostless", http_handler, []}]}])}]}, - {max_keepalive, 50}, - {timeout, 500}] - ), + Protocol, #{ + env =>#{dispatch => cowboy_router:compile([ + {'_', [{"/http1.0/hostless", http_handler, []}]}])}, + max_keepalive => 50, + timeout => 500 + }), 200 = do_raw("GET /http1.0/hostless HTTP/1.0\r\n\r\n", [{port, Port10}|Config]), cowboy:stop_listener(http10_hostless). @@ -380,9 +376,10 @@ http10_keepalive_default(Config) -> case catch raw_recv_head(Client) of {'EXIT', _} -> error(closed); Data -> - {'HTTP/1.0', 200, _, Rest} = cow_http:parse_status_line(Data), + %% Cowboy always advertises itself as HTTP/1.1. + {'HTTP/1.1', 200, _, Rest} = cow_http:parse_status_line(Data), {Headers, _} = cow_http:parse_headers(Rest), - false = lists:keymember(<<"connection">>, 1, Headers) + {_, <<"close">>} = lists:keyfind(<<"connection">>, 1, Headers) end, ok = raw_send(Client, Normal), case catch raw_recv_head(Client) of @@ -397,7 +394,8 @@ http10_keepalive_forced(Config) -> case catch raw_recv_head(Client) of {'EXIT', _} -> error(closed); Data -> - {'HTTP/1.0', 200, _, Rest} = cow_http:parse_status_line(Data), + %% Cowboy always advertises itself as HTTP/1.1. + {'HTTP/1.1', 200, _, Rest} = cow_http:parse_status_line(Data), {Headers, _} = cow_http:parse_headers(Rest), {_, <<"keep-alive">>} = lists:keyfind(<<"connection">>, 1, Headers) end, diff --git a/test/http_SUITE_data/http_body_qs.erl b/test/http_SUITE_data/http_body_qs.erl index 945b7fd..e0673cf 100644 --- a/test/http_SUITE_data/http_body_qs.erl +++ b/test/http_SUITE_data/http_body_qs.erl @@ -17,16 +17,16 @@ maybe_echo(<<"POST">>, true, Req) -> echo(proplists:get_value(<<"echo">>, PostVals), Req2) end; maybe_echo(<<"POST">>, false, Req) -> - cowboy_req:reply(400, [], <<"Missing body.">>, Req); + cowboy_req:reply(400, #{}, <<"Missing body.">>, Req); maybe_echo(_, _, Req) -> %% Method not allowed. cowboy_req:reply(405, Req). echo(badlength, Req) -> - cowboy_req:reply(413, [], <<"POST body bigger than 16000 bytes">>, Req); + cowboy_req:reply(413, #{}, <<"POST body bigger than 16000 bytes">>, Req); echo(undefined, Req) -> - cowboy_req:reply(400, [], <<"Missing echo parameter.">>, Req); + cowboy_req:reply(400, #{}, <<"Missing echo parameter.">>, Req); echo(Echo, Req) -> - cowboy_req:reply(200, [ - {<<"content-type">>, <<"text/plain; charset=utf-8">>} - ], Echo, Req). + cowboy_req:reply(200, #{ + <<"content-type">> => <<"text/plain; charset=utf-8">> + }, Echo, Req). diff --git a/test/http_SUITE_data/http_echo_body.erl b/test/http_SUITE_data/http_echo_body.erl index c76b9b3..a541a67 100644 --- a/test/http_SUITE_data/http_echo_body.erl +++ b/test/http_SUITE_data/http_echo_body.erl @@ -6,16 +6,16 @@ init(Req, Opts) -> true = cowboy_req:has_body(Req), - Req3 = case cowboy_req:body(Req, [{length, 1000000}]) of + Req3 = case cowboy_req:read_body(Req, [{length, 1000000}]) of {ok, Body, Req2} -> handle_body(Req2, Body); {more, _, Req2} -> handle_badlength(Req2) end, {ok, Req3, Opts}. handle_badlength(Req) -> - cowboy_req:reply(413, [], <<"Request entity too large">>, Req). + cowboy_req:reply(413, #{}, <<"Request entity too large">>, Req). handle_body(Req, Body) -> Size = cowboy_req:body_length(Req), Size = byte_size(Body), - cowboy_req:reply(200, [], Body, Req). + cowboy_req:reply(200, #{}, Body, Req). diff --git a/test/http_SUITE_data/http_errors.erl b/test/http_SUITE_data/http_errors.erl index ee5c2b2..14e3d09 100644 --- a/test/http_SUITE_data/http_errors.erl +++ b/test/http_SUITE_data/http_errors.erl @@ -13,5 +13,5 @@ case_init(<<"init_before_reply">> = Case, _Req) -> error(Case); case_init(<<"init_after_reply">> = Case, Req) -> ct_helper_error_h:ignore(?MODULE, case_init, 2), - _ = cowboy_req:reply(200, [], "http_handler_crashes", Req), + _ = cowboy_req:reply(200, #{}, "http_handler_crashes", Req), error(Case). diff --git a/test/http_SUITE_data/http_handler.erl b/test/http_SUITE_data/http_handler.erl index 62e9193..eb31aa8 100644 --- a/test/http_SUITE_data/http_handler.erl +++ b/test/http_SUITE_data/http_handler.erl @@ -5,6 +5,6 @@ -export([init/2]). init(Req, Opts) -> - Headers = proplists:get_value(headers, Opts, []), + Headers = proplists:get_value(headers, Opts, #{}), Body = proplists:get_value(body, Opts, "http_handler"), {ok, cowboy_req:reply(200, Headers, Body, Req), Opts}. diff --git a/test/http_SUITE_data/http_multipart.erl b/test/http_SUITE_data/http_multipart.erl index 196cbce..212f569 100644 --- a/test/http_SUITE_data/http_multipart.erl +++ b/test/http_SUITE_data/http_multipart.erl @@ -6,7 +6,7 @@ init(Req, Opts) -> {Result, Req2} = acc_multipart(Req, []), - {ok, cowboy_req:reply(200, [], term_to_binary(Result), Req2), Opts}. + {ok, cowboy_req:reply(200, #{}, term_to_binary(Result), Req2), Opts}. acc_multipart(Req, Acc) -> case cowboy_req:part(Req) of diff --git a/test/http_SUITE_data/http_req_attr.erl b/test/http_SUITE_data/http_req_attr.erl index d8483a5..c6a940e 100644 --- a/test/http_SUITE_data/http_req_attr.erl +++ b/test/http_SUITE_data/http_req_attr.erl @@ -12,4 +12,4 @@ init(Req, Opts) -> Host = cowboy_req:host(Req), Port = cowboy_req:port(Req), Value = [Host, "\n", integer_to_list(Port)], - {ok, cowboy_req:reply(200, [], Value, Req), Opts}. + {ok, cowboy_req:reply(200, #{}, Value, Req), Opts}. diff --git a/test/http_SUITE_data/http_set_resp.erl b/test/http_SUITE_data/http_set_resp.erl index 2e7f835..6ac4c8e 100644 --- a/test/http_SUITE_data/http_set_resp.erl +++ b/test/http_SUITE_data/http_set_resp.erl @@ -5,11 +5,11 @@ -export([init/2]). init(Req, Opts) -> - Headers = proplists:get_value(headers, Opts, []), + Headers = proplists:get_value(headers, Opts, #{}), Body = proplists:get_value(body, Opts, <<"http_handler_set_resp">>), Req2 = lists:foldl(fun({Name, Value}, R) -> cowboy_req:set_resp_header(Name, Value, R) - end, Req, Headers), + end, Req, maps:to_list(Headers)), Req3 = cowboy_req:set_resp_body(Body, Req2), Req4 = cowboy_req:set_resp_header(<<"x-cowboy-test">>, <<"ok">>, Req3), Req5 = cowboy_req:set_resp_cookie(<<"cake">>, <<"lie">>, [], Req4), diff --git a/test/http_SUITE_data/http_stream_body.erl b/test/http_SUITE_data/http_stream_body.erl index aea5300..ef10266 100644 --- a/test/http_SUITE_data/http_stream_body.erl +++ b/test/http_SUITE_data/http_stream_body.erl @@ -7,16 +7,22 @@ init(Req, Opts) -> Body = proplists:get_value(body, Opts, "http_handler_stream_body"), Reply = proplists:get_value(reply, Opts), - SFun = fun(Socket, Transport) -> Transport:send(Socket, Body) end, + SFun = fun () -> + cowboy_req:send_body(Body, nofin, Req) + end, Req2 = case Reply of set_resp -> SLen = iolist_size(Body), - cowboy_req:set_resp_body_fun(SLen, SFun, Req); + cowboy_req:set_resp_body({stream, SLen, SFun}, Req); + %% @todo Hmm that one will be sent as chunked now. + %% We need an option to disable chunked. set_resp_close -> - cowboy_req:set_resp_body_fun(SFun, Req); + cowboy_req:set_resp_body({stream, undefined, SFun}, Req); set_resp_chunked -> %% Here Body should be a list of chunks, not a binary. - SFun2 = fun(SendFun) -> lists:foreach(SendFun, Body) end, - cowboy_req:set_resp_body_fun(chunked, SFun2, Req) + SFun2 = fun () -> + lists:foreach(fun (Data) -> cowboy_req:send_body(Data, nofin, Req) end, Body) + end, + cowboy_req:set_resp_body({stream, undefined, SFun2}, Req) end, {ok, cowboy_req:reply(200, Req2), Opts}. diff --git a/test/http_SUITE_data/rest_param_all.erl b/test/http_SUITE_data/rest_param_all.erl index 2b2ea23..d74df74 100644 --- a/test/http_SUITE_data/rest_param_all.erl +++ b/test/http_SUITE_data/rest_param_all.erl @@ -17,7 +17,7 @@ content_types_provided(Req, State) -> {[{{<<"text">>, <<"plain">>, '*'}, get_text_plain}], Req, State}. get_text_plain(Req, State) -> - {_, _, Param} = cowboy_req:meta(media_type, Req, {{<<"text">>, <<"plain">>}, []}), + {_, _, Param} = maps:get(media_type, Req, {{<<"text">>, <<"plain">>}, []}), Body = if Param == '*' -> <<"'*'">>; diff --git a/test/http_SUITE_data/rest_patch_resource.erl b/test/http_SUITE_data/rest_patch_resource.erl index c1244e7..341920d 100644 --- a/test/http_SUITE_data/rest_patch_resource.erl +++ b/test/http_SUITE_data/rest_patch_resource.erl @@ -28,7 +28,7 @@ content_types_accepted(Req, State) -> end. patch_text_plain(Req, State) -> - case cowboy_req:body(Req) of + case cowboy_req:read_body(Req) of {ok, <<"stop">>, Req0} -> {stop, cowboy_req:reply(400, Req0), State}; {ok, <<"false">>, Req0} -> diff --git a/test/rfc7230_SUITE.erl b/test/rfc7230_SUITE.erl index 15d1fc5..480cc4e 100644 --- a/test/rfc7230_SUITE.erl +++ b/test/rfc7230_SUITE.erl @@ -23,12 +23,12 @@ all() -> [{group, http}]. -groups() -> [{http, [parallel], ct_helper:all(?MODULE)}]. +groups() -> [{http, [parallel], ct_helper:all(?MODULE)}]. %% @todo parallel init_per_group(Name = http, Config) -> - cowboy_test:init_http(Name = http, [ - {env, [{dispatch, cowboy_router:compile(init_routes(Config))}]} - ], Config). + cowboy_test:init_http(Name = http, #{ + env => #{dispatch => cowboy_router:compile(init_routes(Config))} + }, Config). end_per_group(Name, _) -> ok = cowboy:stop_listener(Name). @@ -52,9 +52,7 @@ do_raw(Config, Data) -> {Version, Code, Reason, Rest} = cow_http:parse_status_line(raw_recv_head(Client)), {Headers, Rest2} = cow_http:parse_headers(Rest), case lists:keyfind(<<"content-length">>, 1, Headers) of - false -> - #{client => Client, version => Version, code => Code, reason => Reason, headers => Headers, body => <<>>}; - {_, LengthBin} -> + {_, LengthBin} when LengthBin =/= <<"0">> -> Length = binary_to_integer(LengthBin), Body = if byte_size(Rest2) =:= Length -> Rest2; @@ -62,7 +60,9 @@ do_raw(Config, Data) -> {ok, Body0} = raw_recv(Client, binary_to_integer(LengthBin) - byte_size(Rest2), 5000), << Rest2/bits, Body0/bits >> end, - #{client => Client, version => Version, code => Code, reason => Reason, headers => Headers, body => Body} + #{client => Client, version => Version, code => Code, reason => Reason, headers => Headers, body => Body}; + _ -> + #{client => Client, version => Version, code => Code, reason => Reason, headers => Headers, body => <<>>} end. %% Listener. @@ -90,8 +90,8 @@ accept_at_least_1_empty_line(Config) -> reject_response(Config) -> doc("When receiving a response instead of a request, identified by the " "status-line which starts with the HTTP version, the server must " - "reject the message with a 501 status code and close the connection. (RFC7230 3.1)"), - #{code := 501, client := Client} = do_raw(Config, + "reject the message with a 400 status code and close the connection. (RFC7230 3.1)"), + #{code := 400, client := Client} = do_raw(Config, "HTTP/1.1 200 OK\r\n" "\r\n"), {error, closed} = raw_recv(Client, 0, 1000). @@ -138,14 +138,14 @@ timeout_before_request_line(Config) -> "by the reception of a complete request-line."), Client = raw_open(Config), ok = raw_send(Client, "GET / HTTP/1.1\r"), - {error, closed} = raw_recv(Client, 0, 1000). + {error, closed} = raw_recv(Client, 0, 6000). timeout_after_request_line(Config) -> doc("The time the request (request line and headers) takes to be " "received by the server must be limited and subject to configuration. " "A 408 status code must be sent if the request line was received."), #{code := 408, client := Client} = do_raw(Config, "GET / HTTP/1.1\r\n"), - {error, closed} = raw_recv(Client, 0, 1000). + {error, closed} = raw_recv(Client, 0, 6000). %% @todo Add an HTTP/1.0 test suite. %An HTTP/1.1 server must understand any valid HTTP/1.0 request, diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl index 4c5bb1d..4eaf456 100644 --- a/test/ws_SUITE.erl +++ b/test/ws_SUITE.erl @@ -37,17 +37,17 @@ init_per_group(Name = autobahn, Config) -> "http://autobahn.ws/testsuite/installation.html"), {skip, "Autobahn Test Suite not installed."}; _ -> - {ok, _} = cowboy:start_http(Name, 100, [{port, 33080}], [ - {env, [{dispatch, init_dispatch()}]}, - {compress, true} - ]), + {ok, _} = cowboy:start_clear(Name, 100, [{port, 33080}], #{ + env => #{dispatch => init_dispatch()}, + compress => true %% @todo Use a separate option for HTTP and Websocket compression. + }), Config end; init_per_group(Name = ws, Config) -> - cowboy_test:init_http(Name, [ - {env, [{dispatch, init_dispatch()}]}, - {compress, true} - ], Config). + cowboy_test:init_http(Name, #{ + env => #{dispatch => init_dispatch()}, + compress => true %% @todo Use a separate option for HTTP and Websocket compression. + }, Config). end_per_group(Listener, _Config) -> cowboy:stop_listener(Listener). diff --git a/test/ws_SUITE_data/ws_echo_timer.erl b/test/ws_SUITE_data/ws_echo_timer.erl index a26c332..9761e1f 100644 --- a/test/ws_SUITE_data/ws_echo_timer.erl +++ b/test/ws_SUITE_data/ws_echo_timer.erl @@ -3,13 +3,17 @@ -module(ws_echo_timer). -export([init/2]). +-export([websocket_init/2]). -export([websocket_handle/3]). -export([websocket_info/3]). init(Req, _) -> - erlang:start_timer(1000, self(), <<"websocket_init">>), {cowboy_websocket, Req, undefined}. +websocket_init(Req, State) -> + erlang:start_timer(1000, self(), <<"websocket_init">>), + {ok, Req, State}. + websocket_handle({text, Data}, Req, State) -> {reply, {text, Data}, Req, State}; websocket_handle({binary, Data}, Req, State) -> diff --git a/test/ws_SUITE_data/ws_send_many.erl b/test/ws_SUITE_data/ws_send_many.erl index 6585ffa..9cee75e 100644 --- a/test/ws_SUITE_data/ws_send_many.erl +++ b/test/ws_SUITE_data/ws_send_many.erl @@ -3,13 +3,17 @@ -module(ws_send_many). -export([init/2]). +-export([websocket_init/2]). -export([websocket_handle/3]). -export([websocket_info/3]). init(Req, Opts) -> - erlang:send_after(10, self(), send_many), {cowboy_websocket, Req, Opts}. +websocket_init(Req, State) -> + erlang:send_after(10, self(), send_many), + {ok, Req, State}. + websocket_handle(_Frame, Req, State) -> {ok, Req, State}. |