aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--ebin/cowboy.app2
-rw-r--r--erlang.mk55
-rw-r--r--examples/ssl_hello_world/src/ssl_hello_world_app.erl4
-rw-r--r--src/cowboy.erl42
-rw-r--r--src/cowboy_clear.erl37
-rw-r--r--src/cowboy_handler.erl32
-rw-r--r--src/cowboy_http.erl973
-rw-r--r--src/cowboy_http2.erl53
-rw-r--r--src/cowboy_loop.erl87
-rw-r--r--src/cowboy_protocol.erl534
-rw-r--r--src/cowboy_req.erl690
-rw-r--r--src/cowboy_rest.erl16
-rw-r--r--src/cowboy_router.erl15
-rw-r--r--src/cowboy_static.erl9
-rw-r--r--src/cowboy_stream_h.erl128
-rw-r--r--src/cowboy_tls.erl1
-rw-r--r--src/cowboy_websocket.erl90
-rw-r--r--test/cowboy_test.erl51
-rw-r--r--test/handlers/asterisk_h.erl2
-rw-r--r--test/handlers/echo_h.erl2
-rw-r--r--test/handlers/hello_h.erl2
-rw-r--r--test/handlers/loop_handler_body_h.erl5
-rw-r--r--test/http_SUITE.erl70
-rw-r--r--test/http_SUITE_data/http_body_qs.erl12
-rw-r--r--test/http_SUITE_data/http_echo_body.erl6
-rw-r--r--test/http_SUITE_data/http_errors.erl2
-rw-r--r--test/http_SUITE_data/http_handler.erl2
-rw-r--r--test/http_SUITE_data/http_multipart.erl2
-rw-r--r--test/http_SUITE_data/http_req_attr.erl2
-rw-r--r--test/http_SUITE_data/http_set_resp.erl4
-rw-r--r--test/http_SUITE_data/http_stream_body.erl16
-rw-r--r--test/http_SUITE_data/rest_param_all.erl2
-rw-r--r--test/http_SUITE_data/rest_patch_resource.erl2
-rw-r--r--test/rfc7230_SUITE.erl24
-rw-r--r--test/ws_SUITE.erl16
-rw-r--r--test/ws_SUITE_data/ws_echo_timer.erl6
-rw-r--r--test/ws_SUITE_data/ws_send_many.erl6
37 files changed, 1841 insertions, 1161 deletions
diff --git a/ebin/cowboy.app b/ebin/cowboy.app
index 0f037ce..a450961 100644
--- a/ebin/cowboy.app
+++ b/ebin/cowboy.app
@@ -1,7 +1,7 @@
{application, cowboy, [
{description, "Small, fast, modular HTTP server."},
{vsn, "2.0.0-pre.2"},
- {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_clock','cowboy_constraints','cowboy_handler','cowboy_http2','cowboy_loop','cowboy_middleware','cowboy_protocol','cowboy_req','cowboy_rest','cowboy_router','cowboy_spdy','cowboy_static','cowboy_stream','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']},
+ {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_clear','cowboy_clock','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_loop','cowboy_middleware','cowboy_protocol','cowboy_req','cowboy_rest','cowboy_router','cowboy_spdy','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']},
{registered, [cowboy_sup,cowboy_clock]},
{applications, [kernel,stdlib,crypto,cowlib,ranch]},
{mod, {cowboy_app, []}}
diff --git a/erlang.mk b/erlang.mk
index e2c40b8..3ad1a48 100644
--- a/erlang.mk
+++ b/erlang.mk
@@ -16,7 +16,7 @@
ERLANG_MK_FILENAME := $(realpath $(lastword $(MAKEFILE_LIST)))
-ERLANG_MK_VERSION = 2.0.0-pre.2-73-g87285ad-dirty
+ERLANG_MK_VERSION = 2.0.0-pre.2-75-g18a7074-dirty
# Core configuration.
@@ -147,11 +147,6 @@ else
core_native_path = $1
endif
-ifeq ($(shell which wget 2>/dev/null | wc -l), 1)
-define core_http_get
- wget --no-check-certificate -O $(1) $(2)|| rm $(1)
-endef
-else
define core_http_get.erl
ssl:start(),
inets:start(),
@@ -170,7 +165,6 @@ endef
define core_http_get
$(call erlang,$(call core_http_get.erl,$(call core_native_path,$1),$2))
endef
-endif
core_eq = $(and $(findstring $(1),$(2)),$(findstring $(2),$(1)))
@@ -4646,7 +4640,7 @@ dtl_verbose = $(dtl_verbose_$(V))
# Core targets.
-DTL_FILES = $(sort $(call core_find,$(DTL_PATH),*.dtl))
+DTL_FILES := $(sort $(call core_find,$(DTL_PATH),*.dtl))
ifneq ($(DTL_FILES),)
@@ -4711,10 +4705,11 @@ define compile_proto.erl
halt().
endef
-ifneq ($(wildcard src/),)
+# @todo Convert like erlydtl was.
+# @todo Give access to ALL_SRC_FILES.
+
ebin/$(PROJECT).app:: $(sort $(call core_find,src/,*.proto))
$(if $(strip $?),$(call compile_proto,$?))
-endif
# Copyright (c) 2013-2015, Loïc Hoguin <[email protected]eu>
# This file is part of erlang.mk and subject to the terms of the ISC License.
@@ -4807,8 +4802,11 @@ app-build: ebin/$(PROJECT).app
# Source files.
-ERL_FILES = $(sort $(call core_find,src/,*.erl))
-CORE_FILES = $(sort $(call core_find,src/,*.core))
+# @todo have "all test/ files" also.
+ALL_SRC_FILES := $(sort $(call core_find,src/,*))
+
+ERL_FILES := $(filter %.erl,$(ALL_SRC_FILES))
+CORE_FILES := $(filter %.core,$(ALL_SRC_FILES))
# ASN.1 files.
@@ -4841,11 +4839,11 @@ endif
# Leex and Yecc files.
-XRL_FILES = $(sort $(call core_find,src/,*.xrl))
+XRL_FILES := $(filter %.xrl,$(ALL_SRC_FILES))
XRL_ERL_FILES = $(addprefix src/,$(patsubst %.xrl,%.erl,$(notdir $(XRL_FILES))))
ERL_FILES += $(XRL_ERL_FILES)
-YRL_FILES = $(sort $(call core_find,src/,*.yrl))
+YRL_FILES := $(filter %.yrl,$(ALL_SRC_FILES))
YRL_ERL_FILES = $(addprefix src/,$(patsubst %.yrl,%.erl,$(notdir $(YRL_FILES))))
ERL_FILES += $(YRL_ERL_FILES)
@@ -4932,6 +4930,7 @@ define makedep.erl
endef
ifeq ($(if $(NO_MAKEDEP),$(wildcard $(PROJECT).d),),)
+# @todo Not src/*.hrl?
$(PROJECT).d:: $(ERL_FILES) $(call core_find,include/,*.hrl)
$(makedep_verbose) $(call erlang,$(call makedep.erl,[email protected]))
endif
@@ -5852,6 +5851,12 @@ endif
.PHONY: ci ci-prepare ci-setup distclean-kerl
+CI_OTP ?=
+
+ifeq ($(strip $(CI_OTP)),)
+ci::
+else
+
ifndef KERL
KERL := $(shell which kerl 2>/dev/null)
@@ -5870,11 +5875,7 @@ KERL_MAKEFLAGS ?=
OTP_GIT ?= https://github.com/erlang/otp
CI_INSTALL_DIR ?= $(HOME)/erlang
-CI_OTP ?=
-ifeq ($(strip $(CI_OTP)),)
-ci::
-else
ci:: $(addprefix ci-,$(CI_OTP))
ci-prepare: $(addprefix $(CI_INSTALL_DIR)/,$(CI_OTP))
@@ -5933,11 +5934,13 @@ endif
# Configuration.
CT_OPTS ?=
+
ifneq ($(wildcard $(TEST_DIR)),)
- CT_SUITES ?= $(sort $(subst _SUITE.erl,,$(notdir $(call core_find,$(TEST_DIR)/,*_SUITE.erl))))
-else
- CT_SUITES ?=
+ifndef CT_SUITES
+CT_SUITES := $(sort $(subst _SUITE.erl,,$(notdir $(call core_find,$(TEST_DIR)/,*_SUITE.erl))))
endif
+endif
+CT_SUITES ?=
# Core targets.
@@ -5967,7 +5970,7 @@ ct: $(if $(IS_APP),,apps-ct)
else
ct: test-build $(if $(IS_APP),,apps-ct)
$(verbose) mkdir -p $(CURDIR)/logs/
- $(gen_verbose) $(CT_RUN) -suite $(addsuffix _SUITE,$(CT_SUITES)) $(CT_OPTS)
+ $(gen_verbose) $(CT_RUN) -sname ct_$(PROJECT) -suite $(addsuffix _SUITE,$(CT_SUITES)) $(CT_OPTS)
endif
ifneq ($(ALL_APPS_DIRS),)
@@ -5989,7 +5992,7 @@ endif
define ct_suite_target
ct-$(1): test-build
$(verbose) mkdir -p $(CURDIR)/logs/
- $(gen_verbose) $(CT_RUN) -suite $(addsuffix _SUITE,$(1)) $(CT_EXTRA) $(CT_OPTS)
+ $(gen_verbose) $(CT_RUN) -sname ct_$(PROJECT) -suite $(addsuffix _SUITE,$(1)) $(CT_EXTRA) $(CT_OPTS)
endef
$(foreach test,$(CT_SUITES),$(eval $(call ct_suite_target,$(test))))
@@ -6196,8 +6199,9 @@ eunit: test-build
$(gen_verbose) $(call erlang,$(call eunit.erl,fun $(t)/0),$(EUNIT_ERL_OPTS))
endif
else
-EUNIT_EBIN_MODS = $(notdir $(basename $(call core_find,ebin/,*.beam)))
-EUNIT_TEST_MODS = $(notdir $(basename $(call core_find,$(TEST_DIR)/,*.beam)))
+EUNIT_EBIN_MODS = $(notdir $(basename $(ERL_FILES) $(BEAM_FILES)))
+EUNIT_TEST_MODS = $(notdir $(basename $(call core_find,$(TEST_DIR)/,*.erl)))
+
EUNIT_MODS = $(foreach mod,$(EUNIT_EBIN_MODS) $(filter-out \
$(patsubst %,%_tests,$(EUNIT_EBIN_MODS)),$(EUNIT_TEST_MODS)),'$(mod)')
@@ -6314,6 +6318,7 @@ shell: build-shell-deps
# Copyright (c) 2015, Loïc Hoguin <[email protected]>
# This file is part of erlang.mk and subject to the terms of the ISC License.
+# @todo BUILD_DEPS too?
ifeq ($(filter triq,$(DEPS) $(TEST_DEPS)),triq)
.PHONY: triq
diff --git a/examples/ssl_hello_world/src/ssl_hello_world_app.erl b/examples/ssl_hello_world/src/ssl_hello_world_app.erl
index 3e53818..0338baf 100644
--- a/examples/ssl_hello_world/src/ssl_hello_world_app.erl
+++ b/examples/ssl_hello_world/src/ssl_hello_world_app.erl
@@ -17,12 +17,12 @@ start(_Type, _Args) ->
]}
]),
PrivDir = code:priv_dir(ssl_hello_world),
- {ok, _} = cowboy:start_https(https, 100, [
+ {ok, _} = cowboy:start_tls(https, 100, [
{port, 8443},
{cacertfile, PrivDir ++ "/ssl/cowboy-ca.crt"},
{certfile, PrivDir ++ "/ssl/server.crt"},
{keyfile, PrivDir ++ "/ssl/server.key"}
- ], [{env, [{dispatch, Dispatch}]}]),
+ ], #{env, [{dispatch, Dispatch}]}),
ssl_hello_world_sup:start_link().
stop(_State) ->
diff --git a/src/cowboy.erl b/src/cowboy.erl
index 1f0e2a9..67afd7b 100644
--- a/src/cowboy.erl
+++ b/src/cowboy.erl
@@ -14,9 +14,7 @@
-module(cowboy).
--export([start_http/4]).
--export([start_https/4]).
--export([start_spdy/4]).
+-export([start_clear/4]).
-export([start_tls/4]).
-export([stop_listener/1]).
-export([set_env/3]).
@@ -42,43 +40,27 @@
fun((http_status(), http_headers(), iodata(), Req) -> Req).
-export_type([onresponse_fun/0]).
--spec start_http(ranch:ref(), non_neg_integer(), ranch_tcp:opts(),
+-spec start_clear(ranch:ref(), non_neg_integer(), ranch_tcp:opts(),
cowboy_protocol:opts()) -> {ok, pid()} | {error, any()}.
-start_http(Ref, NbAcceptors, TransOpts, ProtoOpts)
+start_clear(Ref, NbAcceptors, TransOpts0, ProtoOpts)
when is_integer(NbAcceptors), NbAcceptors > 0 ->
- ranch:start_listener(Ref, NbAcceptors,
- ranch_tcp, TransOpts, cowboy_protocol, ProtoOpts).
-
--spec start_https(ranch:ref(), non_neg_integer(), ranch_ssl:opts(),
- cowboy_protocol:opts()) -> {ok, pid()} | {error, any()}.
-start_https(Ref, NbAcceptors, TransOpts, ProtoOpts)
- when is_integer(NbAcceptors), NbAcceptors > 0 ->
- ranch:start_listener(Ref, NbAcceptors,
- ranch_ssl, TransOpts, cowboy_protocol, ProtoOpts).
-
--spec start_spdy(ranch:ref(), non_neg_integer(), ranch_ssl:opts(),
- cowboy_spdy:opts()) -> {ok, pid()} | {error, any()}.
-start_spdy(Ref, NbAcceptors, TransOpts, ProtoOpts)
- when is_integer(NbAcceptors), NbAcceptors > 0 ->
- TransOpts2 = [
- {connection_type, supervisor},
- {next_protocols_advertised,
- [<<"spdy/3">>, <<"http/1.1">>, <<"http/1.0">>]}
- |TransOpts],
- ranch:start_listener(Ref, NbAcceptors,
- ranch_ssl, TransOpts2, cowboy_spdy, ProtoOpts).
+ TransOpts = TransOpts0,%[connection_type(ProtoOpts)|TransOpts0],
+ ranch:start_listener(Ref, NbAcceptors, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts).
-spec start_tls(ranch:ref(), non_neg_integer(), ranch_ssl:opts(), opts()) -> {ok, pid()} | {error, any()}.
start_tls(Ref, NbAcceptors, TransOpts0, ProtoOpts)
when is_integer(NbAcceptors), NbAcceptors > 0 ->
- {_, Type} = maps:get(stream_handler, ProtoOpts, {cowboy_stream_h, supervisor}),
TransOpts = [
- {connection_type, Type},
+ connection_type(ProtoOpts),
{next_protocols_advertised, [<<"h2">>, <<"spdy/3">>, <<"http/1.1">>]},
{alpn_preferred_protocols, [<<"h2">>, <<"spdy/3">>, <<"http/1.1">>]}
|TransOpts0],
- ranch:start_listener(Ref, NbAcceptors,
- ranch_ssl, TransOpts, cowboy_tls, ProtoOpts).
+ ranch:start_listener(Ref, NbAcceptors, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts).
+
+-spec connection_type(opts()) -> {connection_type, worker | supervisor}.
+connection_type(ProtoOpts) ->
+ {_, Type} = maps:get(stream_handler, ProtoOpts, {cowboy_stream_h, supervisor}),
+ {connection_type, Type}.
-spec stop_listener(ranch:ref()) -> ok | {error, not_found}.
stop_listener(Ref) ->
diff --git a/src/cowboy_clear.erl b/src/cowboy_clear.erl
new file mode 100644
index 0000000..cc6078e
--- /dev/null
+++ b/src/cowboy_clear.erl
@@ -0,0 +1,37 @@
+%% Copyright (c) 2016, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(cowboy_clear).
+-behavior(ranch_protocol).
+
+-export([start_link/4]).
+-export([init/5]).
+
+-spec start_link(ranch:ref(), inet:socket(), module(), cowboy:opts()) -> {ok, pid()}.
+start_link(Ref, Socket, Transport, Opts) ->
+ Pid = proc_lib:spawn_link(?MODULE, init, [self(), Ref, Socket, Transport, Opts]),
+ {ok, Pid}.
+
+-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts()) -> ok.
+init(Parent, Ref, Socket, Transport, Opts) ->
+ ok = ranch:accept_ack(Ref),
+ init(Parent, Ref, Socket, Transport, Opts, cowboy_http).
+
+init(Parent, Ref, Socket, Transport, Opts, Protocol) ->
+ {Handler, Type} = maps:get(stream_handler, Opts, {cowboy_stream_h, supervisor}),
+ _ = case Type of
+ worker -> ok;
+ supervisor -> process_flag(trap_exit, true)
+ end,
+ Protocol:init(Parent, Ref, Socket, Transport, Opts, Handler).
diff --git a/src/cowboy_handler.erl b/src/cowboy_handler.erl
index 165888e..7791fe2 100644
--- a/src/cowboy_handler.erl
+++ b/src/cowboy_handler.erl
@@ -35,10 +35,8 @@
-spec execute(Req, Env) -> {ok, Req, Env}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
-execute(Req, Env) ->
- {_, Handler} = lists:keyfind(handler, 1, Env),
- {_, HandlerOpts} = lists:keyfind(handler_opts, 1, Env),
- try Handler:init(Req, HandlerOpts) of
+execute(Req, Env=#{handler := Handler, handler_opts := HandlerOpts}) ->
+ case Handler:init(Req, HandlerOpts) of
{ok, Req2, State} ->
Result = terminate(normal, Req2, State, Handler),
{ok, Req2, [{result, Result}|Env]};
@@ -50,37 +48,13 @@ execute(Req, Env) ->
Mod:upgrade(Req2, Env, Handler, State, Timeout, run);
{Mod, Req2, State, Timeout, hibernate} ->
Mod:upgrade(Req2, Env, Handler, State, Timeout, hibernate)
- catch Class:Reason ->
- Stacktrace = erlang:get_stacktrace(),
- cowboy_req:maybe_reply(Stacktrace, Req),
- terminate({crash, Class, Reason}, Req, HandlerOpts, Handler),
- exit({cowboy_handler, [
- {class, Class},
- {reason, Reason},
- {mfa, {Handler, init, 2}},
- {stacktrace, Stacktrace},
- {req, cowboy_req:to_list(Req)},
- {opts, HandlerOpts}
- ]})
end.
-spec terminate(any(), Req, any(), module()) -> ok when Req::cowboy_req:req().
terminate(Reason, Req, State, Handler) ->
case erlang:function_exported(Handler, terminate, 3) of
true ->
- try
- Handler:terminate(Reason, cowboy_req:lock(Req), State)
- catch Class:Reason2 ->
- exit({cowboy_handler, [
- {class, Class},
- {reason, Reason2},
- {mfa, {Handler, terminate, 3}},
- {stacktrace, erlang:get_stacktrace()},
- {req, cowboy_req:to_list(Req)},
- {state, State},
- {terminate_reason, Reason}
- ]})
- end;
+ Handler:terminate(Reason, cowboy_req:lock(Req), State);
false ->
ok
end.
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
new file mode 100644
index 0000000..3ec3c17
--- /dev/null
+++ b/src/cowboy_http.erl
@@ -0,0 +1,973 @@
+%% Copyright (c) 2016, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(cowboy_http).
+
+-export([init/6]).
+
+-export([system_continue/3]).
+-export([system_terminate/4]).
+-export([system_code_change/4]).
+
+%% @todo map
+-type opts() :: [{compress, boolean()}
+ | {env, cowboy_middleware:env()}
+ | {max_empty_lines, non_neg_integer()}
+ | {max_header_name_length, non_neg_integer()}
+ | {max_header_value_length, non_neg_integer()}
+ | {max_headers, non_neg_integer()}
+ | {max_keepalive, non_neg_integer()}
+ | {max_request_line_length, non_neg_integer()}
+ | {middlewares, [module()]}
+ | {onresponse, cowboy:onresponse_fun()}
+ | {timeout, timeout()}].
+-export_type([opts/0]).
+
+-record(ps_request_line, {
+ empty_lines = 0 :: non_neg_integer()
+}).
+
+-record(ps_header, {
+ method = undefined :: binary(),
+ path = undefined :: binary(),
+ qs = undefined :: binary(),
+ version = undefined :: cowboy:http_version(),
+ headers = undefined :: map() | undefined, %% @todo better type than map()
+ name = undefined :: binary()
+}).
+
+%% @todo We need a state where we wait for the stream process to ask for the body.
+%% OR DO WE
+
+%% In HTTP/2 we start receiving data before the body asks for it, even if optionally
+%% (and by default), so we need to be able to do the same for HTTP/1.1 too. This means
+%% that when we receive data (up to a certain limit, we read from the socket and decode.
+%% When we reach a limit, we stop reading from the socket momentarily until the stream
+%% process asks for more or the stream ends.
+
+%% This means that we need to keep a buffer in the stream handler (until the stream
+%% process asks for it). And that we need the body state to indicate how much we have
+%% left to read (and stop/start reading from the socket depending on value).
+
+-record(ps_body, {
+ %% @todo flow
+ transfer_decode_fun :: fun(), %% @todo better type
+ transfer_decode_state :: any() %% @todo better type
+}).
+
+-record(stream, {
+ %% Stream identifier.
+ id = undefined :: cowboy_stream:streamid(),
+
+ %% Stream handler state.
+ state = undefined :: any(),
+
+ %% Client HTTP version for this stream.
+ version = undefined :: cowboy:http_version(),
+
+ %% Commands queued.
+ queue = [] :: [] %% @todo better type
+}).
+
+-type stream() :: #stream{}.
+
+-record(state, {
+ parent :: pid(),
+ ref :: ranch:ref(),
+ socket :: inet:socket(),
+ transport :: module(),
+ opts = #{} :: map(),
+ handler :: module(),
+
+ timer = undefined :: undefined | reference(),
+
+ %% Identifier for the stream currently being read (or waiting to be received).
+ in_streamid = 1 :: pos_integer(),
+
+ %% Parsing state for the current stream or stream-to-be.
+ in_state = #ps_request_line{} :: #ps_request_line{} | #ps_header{} | #ps_body{},
+
+ %% Identifier for the stream currently being written.
+ %% Note that out_streamid =< in_streamid.
+ out_streamid = 1 :: pos_integer(),
+
+ %% Whether we finished writing data for the current stream.
+ out_state = wait :: wait | headers | chunked,
+
+ %% The connection will be closed after this stream.
+ last_streamid = undefined :: pos_integer(),
+
+ %% Currently active HTTP/1.1 streams. Streams may be initiated either
+ %% by the client or by the server through PUSH_PROMISE frames.
+ streams = [] :: [stream()],
+
+ %% Children which are in the process of shutting down.
+ children = [] :: [{pid(), cowboy_stream:streamid(), timeout()}]
+
+ %% @todo Automatic compression. (compress option?)
+ %% @todo onresponse? Equivalent using streams.
+}).
+
+-include_lib("cowlib/include/cow_inline.hrl").
+-include_lib("cowlib/include/cow_parse.hrl").
+
+-spec init(pid(), ranch:ref(), inet:socket(), module(), cowboy:opts(), module()) -> ok.
+init(Parent, Ref, Socket, Transport, Opts, Handler) ->
+ LastStreamID = maps:get(max_keepalive, Opts, 100),
+ before_loop(set_request_timeout(#state{
+ parent=Parent, ref=Ref, socket=Socket,
+ transport=Transport, opts=Opts, handler=Handler,
+ last_streamid=LastStreamID}), <<>>).
+
+%% @todo Send a response depending on in_state and whether one was already sent.
+
+%% @todo
+%% Timeouts:
+%% - waiting for new request (if no stream is currently running)
+%% -> request_timeout: for whole request/headers, set at init/when we set ps_request_line{} state
+%% - waiting for body (if a stream requested the body to be read)
+%% -> read_body_timeout: amount of time we wait without receiving any data when reading the body
+%% - if we skip the body, skip only for a specific duration
+%% -> skip_body_timeout: also have a skip_body_length
+%% - none if we have a stream running and it didn't request the body to be read
+%% - global
+%% -> inactivity_timeout: max time to wait without anything happening before giving up
+
+before_loop(State=#state{socket=Socket, transport=Transport}, Buffer) ->
+ %% @todo disable this when we get to the body, until the stream asks for it?
+ %% Perhaps have a threshold for how much we're willing to read before waiting.
+ Transport:setopts(Socket, [{active, once}]),
+ loop(State, Buffer).
+
+loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
+ handler=_Handler, timer=TimerRef, children=Children}, Buffer) ->
+ {OK, Closed, Error} = Transport:messages(),
+ receive
+ %% Socket messages.
+ {OK, Socket, Data} ->
+ parse(<< Buffer/binary, Data/binary >>, State);
+ {Closed, Socket} ->
+ terminate(State, {socket_error, closed, 'The socket has been closed.'});
+ {Error, Socket, Reason} ->
+ terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
+ %% Timeouts.
+ {timeout, TimerRef, Reason} ->
+ timeout(State, Reason);
+ {timeout, _, _} ->
+ loop(State, Buffer);
+ %% System messages.
+ {'EXIT', Parent, Reason} ->
+ exit(Reason);
+ {system, From, Request} ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
+ %% Messages pertaining to a stream.
+ {{Pid, StreamID}, Msg} when Pid =:= self() ->
+ loop(info(State, StreamID, Msg), Buffer);
+ %% Exit signal from children.
+ Msg = {'EXIT', Pid, _} ->
+ loop(down(State, Pid, Msg), Buffer);
+ %% Calls from supervisor module.
+ {'$gen_call', {From, Tag}, which_children} ->
+ Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _, _} <- Children],
+ From ! {Tag, Workers},
+ loop(State, Buffer);
+ {'$gen_call', {From, Tag}, count_children} ->
+ NbChildren = length(Children),
+ Counts = [{specs, 1}, {active, NbChildren},
+ {supervisors, 0}, {workers, NbChildren}],
+ From ! {Tag, Counts},
+ loop(State, Buffer);
+ {'$gen_call', {From, Tag}, _} ->
+ From ! {Tag, {error, ?MODULE}},
+ loop(State, Buffer);
+ %% Unknown messages.
+ Msg ->
+ error_logger:error_msg("Received stray message ~p.", [Msg]),
+ loop(State, Buffer)
+ %% @todo Configurable timeout. This should be a global inactivity timeout
+ %% that triggers when really nothing happens (ie something went really wrong).
+ after 300000 ->
+ terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
+ end.
+
+set_request_timeout(State0=#state{timer=TimerRef0, opts=Opts}) ->
+ State = cancel_request_timeout(State0),
+ Timeout = maps:get(request_timeout, Opts, 5000),
+ TimerRef = erlang:start_timer(Timeout, self(), request_timeout),
+ State#state{timer=TimerRef}.
+
+cancel_request_timeout(State=#state{timer=TimerRef, opts=Opts}) ->
+ ok = case TimerRef of
+ undefined -> ok;
+ _ -> erlang:cancel_timer(TimerRef, [{async, true}, {info, false}])
+ end,
+ State#state{timer=undefined}.
+
+%% @todo Honestly it would be much better if we didn't enable pipelining yet.
+timeout(State=#state{in_state=#ps_request_line{}}, request_timeout) ->
+ %% @todo If other streams are running, just set the connection to be closed
+ %% and stop trying to read from the socket?
+ terminate(State, {connection_error, timeout, 'No request-line received before timeout.'});
+timeout(State=#state{socket=Socket, transport=Transport, in_state=#ps_header{}}, request_timeout) ->
+ %% @todo If other streams are running, maybe wait for their reply before sending 408?
+ %% -> Definitely. Either way, stop reading from the socket and make that stream the last.
+ Transport:send(Socket, cow_http:response(408, 'HTTP/1.1', [])),
+ terminate(State, {connection_error, timeout, 'Request headers not received before timeout.'}).
+
+%% Request-line.
+parse(<<>>, State) ->
+ before_loop(State, <<>>);
+parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
+ after_parse(parse_request(Buffer, State, EmptyLines));
+parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
+ after_parse(parse_header(Buffer,
+ State#state{in_state=PS#ps_header{headers=undefined}},
+ Headers));
+parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=Name}}) ->
+ after_parse(parse_hd_before_value(Buffer,
+ State#state{in_state=PS#ps_header{headers=undefined, name=undefined}},
+ Headers, Name));
+parse(Buffer, State=#state{in_state=#ps_body{}}) ->
+ %% @todo We do not want to get the body automatically if the request doesn't ask for it.
+ %% We may want to get bodies that are below a threshold without waiting, and buffer them
+ %% until the request asks, though.
+
+ %% @todo Transfer-decoding must be done here.
+ after_parse(parse_body(Buffer, State)).
+%% @todo Don't parse if body is finished but request isn't. Let's not parallelize for now.
+
+after_parse({request, Req=#{streamid := StreamID, headers := Headers, version := Version},
+ State0=#state{handler=Handler, opts=Opts, streams=Streams0}, Buffer}) ->
+ %% @todo Opts at the end. Maybe pass the same Opts we got?
+ try Handler:init(StreamID, Req, Opts) of
+ {Commands, StreamState} ->
+ Streams = [#stream{id=StreamID, state=StreamState, version=Version}|Streams0],
+ State = case maybe_req_close(State0, Headers, Version) of
+ close -> State0#state{streams=Streams, last_streamid=StreamID};
+ keepalive -> State0#state{streams=Streams}
+ end,
+ parse(Buffer, commands(State, StreamID, Commands))
+ catch Class:Reason ->
+ error_logger:error_msg("Exception occurred in ~s:init(~p, ~p, ~p) "
+ "with reason ~p:~p.",
+ [Handler, StreamID, Req, Opts, Class, Reason]),
+ ok
+ %% @todo Status code.
+% stream_reset(State, StreamID, {internal_error, {Class, Reason},
+% 'Exception occurred in StreamHandler:init/10 call.'}) %% @todo Check final arity.
+ end;
+%% Streams are sequential so the body is always about the last stream created
+%% unless that stream has terminated.
+after_parse({data, StreamID, IsFin, Data, State=#state{handler=Handler,
+ streams=Streams0=[Stream=#stream{id=StreamID, state=StreamState0}|_]}, Buffer}) ->
+ try Handler:data(StreamID, IsFin, Data, StreamState0) of
+ {Commands, StreamState} ->
+ Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
+ Stream#stream{state=StreamState}),
+ parse(Buffer, commands(State#state{streams=Streams}, StreamID, Commands))
+ catch Class:Reason ->
+ error_logger:error_msg("Exception occurred in ~s:data(~p, ~p, ~p, ~p) with reason ~p:~p.",
+ [Handler, StreamID, IsFin, Data, StreamState0, Class, Reason]),
+ ok
+ %% @todo
+% stream_reset(State, StreamID, {internal_error, {Class, Reason},
+% 'Exception occurred in StreamHandler:data/4 call.'})
+ end;
+%% No corresponding stream, skip.
+after_parse({data, _, _, _, State, Buffer}) ->
+ before_loop(State, Buffer);
+after_parse({more, State, Buffer}) ->
+ before_loop(State, Buffer).
+
+%% Request-line.
+
+-spec parse_request(binary(), #state{}, non_neg_integer()) -> ok.
+%% Empty lines must be using \r\n.
+parse_request(<< $\n, _/bits >>, State, _) ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+parse_request(<< $\s, _/bits >>, State, _) ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+%% We limit the length of the Request-line to MaxLength to avoid endlessly
+%% reading from the socket and eventually crashing.
+parse_request(Buffer, State=#state{opts=Opts}, EmptyLines) ->
+ MaxLength = maps:get(max_request_line_length, Opts, 8000),
+ MaxEmptyLines = maps:get(max_empty_lines, Opts, 5),
+ case match_eol(Buffer, 0) of
+ nomatch when byte_size(Buffer) > MaxLength ->
+ error_terminate(414, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ nomatch ->
+ {more, State#state{in_state=#ps_request_line{empty_lines=EmptyLines}}, Buffer};
+ 1 when EmptyLines =:= MaxEmptyLines ->
+ error_terminate(400, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ 1 ->
+ << _:16, Rest/bits >> = Buffer,
+ parse_request(Rest, State, EmptyLines + 1);
+ _ ->
+ case Buffer of
+ %% @todo * is only for server-wide OPTIONS request (RFC7230 5.3.4); tests
+ << "OPTIONS * ", Rest/bits >> ->
+ parse_version(Rest, State, <<"OPTIONS">>, <<"*">>, <<>>);
+% << "CONNECT ", Rest/bits >> ->
+% parse_authority( %% @todo
+ _ ->
+ parse_method(Buffer, State, <<>>,
+ maps:get(max_method_length, Opts, 32))
+ end
+ end.
+
+match_eol(<< $\n, _/bits >>, N) ->
+ N;
+match_eol(<< _, Rest/bits >>, N) ->
+ match_eol(Rest, N + 1);
+match_eol(_, _) ->
+ nomatch.
+
+parse_method(_, State, _, 0) ->
+ error_terminate(501, State, {connection_error, limit_reached,
+ 'The method name is longer than configuration allows. (RFC7230 3.1.1)'});
+parse_method(<< C, Rest/bits >>, State, SoFar, Remaining) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $\s -> parse_uri(Rest, State, SoFar);
+ _ when ?IS_TOKEN(C) -> parse_method(Rest, State, << SoFar/binary, C >>, Remaining - 1);
+ _ -> error_terminate(400, State, {connection_error, protocol_error,
+ 'The method name must contain only valid token characters. (RFC7230 3.1.1)'})
+ end.
+
+parse_uri(<< H, T, T, P, "://", Rest/bits >>, State, Method)
+ when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
+ P =:= $p orelse P =:= $P ->
+ parse_uri_skip_host(Rest, State, Method);
+parse_uri(<< H, T, T, P, S, "://", Rest/bits >>, State, Method)
+ when H =:= $h orelse H =:= $H, T =:= $t orelse T =:= $T;
+ P =:= $p orelse P =:= $P; S =:= $s orelse S =:= $S ->
+ parse_uri_skip_host(Rest, State, Method);
+parse_uri(<< $/, Rest/bits >>, State, Method) ->
+ parse_uri_path(Rest, State, Method, << $/ >>);
+parse_uri(_, State, _) ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'Invalid request-line or request-target. (RFC7230 3.1.1, RFC7230 5.3)'}).
+
+parse_uri_skip_host(<< C, Rest/bits >>, State, Method) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $/ -> parse_uri_path(Rest, State, Method, <<"/">>);
+ $\s -> parse_version(Rest, State, Method, <<"/">>, <<>>);
+ $? -> parse_uri_query(Rest, State, Method, <<"/">>, <<>>);
+ $# -> skip_uri_fragment(Rest, State, Method, <<"/">>, <<>>);
+ _ -> parse_uri_skip_host(Rest, State, Method)
+ end.
+
+parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $\s -> parse_version(Rest, State, Method, SoFar, <<>>);
+ $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>);
+ $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>);
+ _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>)
+ end.
+
+parse_uri_query(<< C, Rest/bits >>, State, M, P, SoFar) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $\s -> parse_version(Rest, State, M, P, SoFar);
+ $# -> skip_uri_fragment(Rest, State, M, P, SoFar);
+ _ -> parse_uri_query(Rest, State, M, P, << SoFar/binary, C >>)
+ end.
+
+skip_uri_fragment(<< C, Rest/bits >>, State, M, P, Q) ->
+ case C of
+ $\r -> error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+ $\s -> parse_version(Rest, State, M, P, Q);
+ _ -> skip_uri_fragment(Rest, State, M, P, Q)
+ end.
+
+%% @todo Calls to parse_header should update the state.
+parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, State, M, P, Q) ->
+ parse_headers(Rest, State, M, P, Q, 'HTTP/1.1');
+parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, State, M, P, Q) ->
+ parse_headers(Rest, State, M, P, Q, 'HTTP/1.0');
+parse_version(<< "HTTP/1.", _, C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'Whitespace is not allowed after the HTTP version. (RFC7230 3.1.1)'});
+parse_version(<< C, _/bits >>, State, _, _, _) when C =:= $\s; C =:= $\t ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'The separator between request target and version must be a single SP.'});
+parse_version(_, State, _, _, _) ->
+ error_terminate(505, State, {connection_error, protocol_error,
+ ''}). %% @todo
+
+parse_headers(Rest, State, M, P, Q, V) ->
+ %% @todo Figure out the parse states.
+ parse_header(Rest, State#state{in_state=#ps_header{
+ method=M, path=P, qs=Q, version=V}}, #{}).
+
+%% Headers.
+
+%% We need two or more bytes in the buffer to continue.
+parse_header(Rest, State=#state{in_state=PS}, Headers) when byte_size(Rest) < 2 ->
+ {more, State#state{in_state=PS#ps_header{headers=Headers}}, Rest};
+parse_header(<< $\r, $\n, Rest/bits >>, S, Headers) ->
+ request(Rest, S, Headers);
+parse_header(Buffer, State=#state{opts=Opts, in_state=PS}, Headers) ->
+ MaxLength = maps:get(max_header_name_length, Opts, 64),
+ MaxHeaders = maps:get(max_headers, Opts, 100),
+ case match_colon(Buffer, 0) of
+ nomatch when byte_size(Buffer) > MaxLength ->
+ error_terminate(400, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ nomatch when length(Headers) >= MaxHeaders ->
+ error_terminate(400, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ nomatch ->
+ {more, State#state{in_state=PS#ps_header{headers=Headers}}, Buffer};
+ _ ->
+ parse_hd_name(Buffer, State, Headers, <<>>)
+ end.
+
+match_colon(<< $:, _/bits >>, N) ->
+ N;
+match_colon(<< _, Rest/bits >>, N) ->
+ match_colon(Rest, N + 1);
+match_colon(_, _) ->
+ nomatch.
+
+parse_hd_name(<< $:, Rest/bits >>, State, H, SoFar) ->
+ parse_hd_before_value(Rest, State, H, SoFar);
+parse_hd_name(<< C, _/bits >>, State, _, <<>>) when ?IS_WS(C) ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ ''}); %% @todo
+parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) when ?IS_WS(C) ->
+ parse_hd_name_ws(Rest, State, H, SoFar);
+parse_hd_name(<< C, Rest/bits >>, State, H, SoFar) ->
+ ?LOWER(parse_hd_name, Rest, State, H, SoFar).
+
+parse_hd_name_ws(<< C, Rest/bits >>, S, H, Name) ->
+ case C of
+ $\s -> parse_hd_name_ws(Rest, S, H, Name);
+ $\t -> parse_hd_name_ws(Rest, S, H, Name);
+ $: -> parse_hd_before_value(Rest, S, H, Name)
+ end.
+
+parse_hd_before_value(<< $\s, Rest/bits >>, S, H, N) ->
+ parse_hd_before_value(Rest, S, H, N);
+parse_hd_before_value(<< $\t, Rest/bits >>, S, H, N) ->
+ parse_hd_before_value(Rest, S, H, N);
+parse_hd_before_value(Buffer, State=#state{opts=Opts, in_state=PS}, H, N) ->
+ MaxLength = maps:get(max_header_value_length, Opts, 4096),
+ case match_eol(Buffer, 0) of
+ nomatch when byte_size(Buffer) > MaxLength ->
+ error_terminate(400, State, {connection_error, limit_reached,
+ ''}); %% @todo
+ nomatch ->
+ {more, State#state{in_state=PS#ps_header{headers=H, name=N}}, Buffer};
+ _ ->
+ parse_hd_value(Buffer, State, H, N, <<>>)
+ end.
+
+parse_hd_value(<< $\r, $\n, Rest/bits >>, S, Headers, Name, SoFar) ->
+ %% @todo What to do about duplicate header names.
+ parse_header(Rest, S, Headers#{Name => clean_value_ws_end(SoFar, byte_size(SoFar) - 1)});
+parse_hd_value(<< C, Rest/bits >>, S, H, N, SoFar) ->
+ parse_hd_value(Rest, S, H, N, << SoFar/binary, C >>).
+
+clean_value_ws_end(_, -1) ->
+ <<>>;
+clean_value_ws_end(Value, N) ->
+ case binary:at(Value, N) of
+ $\s -> clean_value_ws_end(Value, N - 1);
+ $\t -> clean_value_ws_end(Value, N - 1);
+ _ ->
+ S = N + 1,
+ << Value2:S/binary, _/bits >> = Value,
+ Value2
+ end.
+
+-ifdef(TEST).
+clean_value_ws_end_test_() ->
+ Tests = [
+ {<<>>, <<>>},
+ {<<" ">>, <<>>},
+ {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
+ "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>,
+ <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
+ "text/html;level=2;q=0.4, */*;q=0.5">>}
+ ],
+ [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].
+
+horse_clean_value_ws_end() ->
+ horse:repeat(200000,
+ clean_value_ws_end(
+ <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
+ "text/html;level=2;q=0.4, */*;q=0.5 ">>,
+ byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
+ "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
+ ).
+-endif.
+
+request(Buffer, State=#state{transport=Transport, in_streamid=StreamID,
+ in_state=#ps_header{version=Version}}, Headers) ->
+ case maps:get(<<"host">>, Headers, undefined) of
+ undefined when Version =:= 'HTTP/1.1' ->
+ %% @todo Might want to not close the connection on this and next one.
+ error_terminate(400, State, {stream_error, StreamID, protocol_error,
+ ''}); %% @todo
+ undefined ->
+ request(Buffer, State, Headers, <<>>, default_port(Transport:secure()));
+ RawHost ->
+ try parse_host(RawHost, false, <<>>) of
+ {Host, undefined} ->
+ request(Buffer, State, Headers, Host, default_port(Transport:secure()));
+ {Host, Port} ->
+ request(Buffer, State, Headers, Host, Port)
+ catch _:_ ->
+ error_terminate(400, State, {stream_error, StreamID, protocol_error,
+ ''}) %% @todo
+ end
+ end.
+
+-spec default_port(boolean()) -> 80 | 443.
+default_port(true) -> 443;
+default_port(_) -> 80.
+
+%% @todo Yeah probably just call the cowlib function.
+%% Same code as cow_http:parse_fullhost/1, but inline because we
+%% really want this to go fast.
+parse_host(<< $[, Rest/bits >>, false, <<>>) ->
+ parse_host(Rest, true, << $[ >>);
+parse_host(<<>>, false, Acc) ->
+ {Acc, undefined};
+parse_host(<< $:, Rest/bits >>, false, Acc) ->
+ {Acc, list_to_integer(binary_to_list(Rest))};
+parse_host(<< $], Rest/bits >>, true, Acc) ->
+ parse_host(Rest, false, << Acc/binary, $] >>);
+parse_host(<< C, Rest/bits >>, E, Acc) ->
+ ?LOWER(parse_host, Rest, E, Acc).
+
+%% End of request parsing.
+
+%% @todo We used to get the peername here, bad idea, should
+%% get it at the very start of the connection, or the first
+%% time requested if we go the route of handler sending a
+%% message to get it (we probably shouldn't).
+request(Buffer, State0=#state{ref=Ref, transport=Transport, in_streamid=StreamID,
+ in_state=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
+ Headers, Host, Port) ->
+ Scheme = case Transport:secure() of
+ true -> <<"https">>;
+ false -> <<"http">>
+ end,
+ {HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers of
+ #{<<"content-length">> := <<"0">>} ->
+ {false, 0, undefined, undefined};
+ #{<<"content-length">> := BinLength} ->
+ Length = try
+ cow_http_hd:parse_content_length(BinLength)
+ catch _:_ ->
+ error_terminate(400, State0, {stream_error, StreamID, protocol_error,
+ ''}) %% @todo
+ %% @todo Err should terminate here...
+ end,
+ {true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
+ %% @todo Better handling of transfer decoding.
+ #{<<"transfer-encoding">> := <<"chunked">>} ->
+ {true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
+ _ ->
+ {false, 0, undefined, undefined}
+ end,
+ Req = #{
+ ref => Ref,
+ pid => self(),
+ streamid => StreamID,
+
+ %% @todo peer
+ %% @todo sockname
+ %% @todo ssl client cert?
+
+ method => Method,
+ scheme => Scheme,
+ host => Host,
+ %% host_info (cowboy_router)
+ port => Port,
+ path => Path,
+ %% path_info (cowboy_router)
+ %% bindings (cowboy_router)
+ qs => Qs,
+ version => Version,
+ %% We are transparently taking care of transfer-encodings so
+ %% the user code has no need to know about it.
+ headers => maps:remove(<<"transfer-encoding">>, Headers),
+
+ has_body => HasBody,
+ body_length => BodyLength
+ %% @todo multipart? keep state separate
+
+ %% meta values (cowboy_websocket, cowboy_rest)
+ },
+ State = case HasBody of
+ true ->
+ cancel_request_timeout(State0#state{in_state=#ps_body{
+ %% @todo Don't need length anymore?
+ transfer_decode_fun = TDecodeFun,
+ transfer_decode_state = TDecodeState
+ }});
+ false ->
+ set_request_timeout(State0#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}})
+ end,
+ {request, Req, State, Buffer}.
+
+%% Request body parsing.
+
+parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
+ PS=#ps_body{transfer_decode_fun=TDecode, transfer_decode_state=TState0}}) ->
+ %% @todo Proper trailers.
+ case TDecode(Buffer, TState0) of
+ more ->
+ %% @todo Asks for 0 or more bytes.
+ {more, State, Buffer};
+ {more, Data, TState} ->
+ %% @todo Asks for 0 or more bytes.
+ {data, StreamID, nofin, Data, State#state{in_state=
+ PS#ps_body{transfer_decode_state=TState}}, <<>>};
+ {more, Data, _Length, TState} when is_integer(_Length) ->
+ %% @todo Asks for Length more bytes.
+ {data, StreamID, nofin, Data, State#state{in_state=
+ PS#ps_body{transfer_decode_state=TState}}, <<>>};
+ {more, Data, Rest, TState} ->
+ %% @todo Asks for 0 or more bytes.
+ {data, StreamID, nofin, Data, State#state{in_state=
+ PS#ps_body{transfer_decode_state=TState}}, Rest};
+ {done, TotalLength, Rest} ->
+ {data, StreamID, {fin, TotalLength}, <<>>, set_request_timeout(
+ State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest};
+ {done, Data, TotalLength, Rest} ->
+ {data, StreamID, {fin, TotalLength}, Data, set_request_timeout(
+ State#state{in_streamid=StreamID + 1, in_state=#ps_request_line{}}), Rest}
+ end.
+
+%% Message handling.
+
+down(State=#state{children=Children0}, Pid, Msg) ->
+ case lists:keytake(Pid, 1, Children0) of
+ {value, {_, undefined, _}, Children} ->
+ State#state{children=Children};
+ {value, {_, StreamID, _}, Children} ->
+ info(State#state{children=Children}, StreamID, Msg);
+ false ->
+ error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.", [Msg, Pid]),
+ State
+ end.
+
+info(State=#state{handler=Handler, streams=Streams0}, StreamID, Msg) ->
+ case lists:keyfind(StreamID, #stream.id, Streams0) of
+ Stream = #stream{state=StreamState0} ->
+ try Handler:info(StreamID, Msg, StreamState0) of
+ {Commands, StreamState} ->
+ Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
+ Stream#stream{state=StreamState}),
+ commands(State#state{streams=Streams}, StreamID, Commands)
+ catch Class:Reason ->
+ error_logger:error_msg("Exception occurred in ~s:info(~p, ~p, ~p) with reason ~p:~p.",
+ [Handler, StreamID, Msg, StreamState0, Class, Reason]),
+ ok
+%% @todo
+% stream_reset(State, StreamID, {internal_error, {Class, Reason},
+% 'Exception occurred in StreamHandler:info/3 call.'})
+ end;
+ false ->
+ error_logger:error_msg("Received message ~p for unknown stream ~p.", [Msg, StreamID]),
+ State
+ end.
+
+%% @todo commands/3
+%% @todo stream_reset
+
+
+
+
+%% Commands.
+
+commands(State, _, []) ->
+ State;
+%% Supervise a child process.
+commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) ->
+ commands(State#state{children=[{Pid, StreamID, Shutdown}|Children]}, StreamID, Tail);
+%% Error handling.
+commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) ->
+ commands(stream_reset(State, StreamID, Error), StreamID, Tail);
+%% Commands for a stream currently inactive.
+commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Commands)
+ when Current =/= StreamID ->
+
+ %% @todo We still want to handle some commands...
+
+ Stream = #stream{queue=Queue} = lists:keyfind(StreamID, #stream.id, Streams0),
+ Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
+ Stream#stream{queue=Queue ++ Commands}),
+ State#state{streams=Streams};
+%% Read the request body.
+commands(State, StreamID, [{flow, _Length}|Tail]) ->
+ %% @todo We only read from socket if buffer is empty, otherwise
+ %% we decode the buffer.
+
+ %% @todo Set the body reading length to min(Length, BodyLength)
+
+ commands(State, StreamID, Tail);
+%% @todo Probably a good idea to have an atomic response send (single send call for resp+body).
+%% Send a full response.
+%%
+%% @todo Kill the stream if it sent a response when one has already been sent.
+%% @todo Keep IsFin in the state.
+%% @todo Same two things above apply to DATA, possibly promise too.
+commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
+ [{response, StatusCode, Headers0, Body}|Tail]) ->
+ %% @todo I'm pretty sure the last stream in the list is the one we want
+ %% considering all others are queued.
+ #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
+ {State, Headers} = connection(State0, Headers0, StreamID, Version),
+ %% @todo Ensure content-length is set.
+ Response = cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers)),
+ case Body of
+ {sendfile, O, B, P} ->
+ Transport:send(Socket, Response),
+ commands(State#state{out_state=done}, StreamID, [{sendfile, fin, O, B, P}|Tail]);
+ _ ->
+ Transport:send(Socket, [Response, Body]),
+ %% @todo If max number of requests, close connection.
+ %% @todo If IsFin, maybe skip body of current request.
+ maybe_terminate(State#state{out_state=done}, StreamID, Tail, fin)
+ end;
+%% Send response headers and initiate chunked encoding.
+commands(State0=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
+ [{headers, StatusCode, Headers0}|Tail]) ->
+ %% @todo Same as above.
+ #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
+ {State1, Headers1} = case Version of
+ 'HTTP/1.1' ->
+ {State0, Headers0#{<<"transfer-encoding">> => <<"chunked">>}};
+ %% Close the connection after streaming the data to HTTP/1.0 client.
+ %% @todo I'm guessing we need to differentiate responses with a content-length and others.
+ 'HTTP/1.0' ->
+ {State0#state{last_streamid=StreamID}, Headers0}
+ end,
+ {State, Headers} = connection(State1, Headers1, StreamID, Version),
+ Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(Headers))),
+ commands(State#state{out_state=chunked}, StreamID, Tail);
+%% Send a response body chunk.
+%%
+%% @todo WINDOW_UPDATE stuff require us to buffer some data.
+commands(State=#state{socket=Socket, transport=Transport, streams=Streams}, StreamID,
+ [{data, IsFin, Data}|Tail]) ->
+ %% @todo Same as above.
+ Headers1 = case lists:keyfind(StreamID, #stream.id, Streams) of
+ #stream{version='HTTP/1.1'} ->
+ Size = iolist_size(Data),
+ Transport:send(Socket, [integer_to_list(Size, 16), <<"\r\n">>, Data, <<"\r\n">>]);
+ #stream{version='HTTP/1.0'} ->
+ Transport:send(Socket, Data)
+ end,
+ maybe_terminate(State, StreamID, Tail, IsFin);
+%% Send a file.
+commands(State=#state{socket=Socket, transport=Transport}, StreamID,
+ [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) ->
+ Transport:sendfile(Socket, Path, Offset, Bytes),
+ maybe_terminate(State, StreamID, Tail, IsFin);
+%% Protocol takeover.
+commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
+ opts=Opts, children=Children}, StreamID,
+ [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
+ %% @todo This should be the last stream running otherwise we need to wait before switching.
+ %% @todo If there's streams opened after this one, fail instead of 101.
+ State = cancel_request_timeout(State0),
+ %% @todo When we actually do the upgrade, we only have the one stream left, plus
+ %% possibly some processes terminating. We need a smart strategy for handling the
+ %% children shutdown. We can start with brutal_kill and discarding the EXIT messages
+ %% received before switching to Websocket. Something better would be to let the
+ %% stream processes finish but that implies the Websocket module to know about
+ %% them and filter the messages. For now, kill them all and discard all messages
+ %% in the mailbox.
+ _ = [exit(Pid, kill) || {Pid, _, _} <- Children],
+ flush(),
+ %% Everything good, upgrade!
+ _ = commands(State, StreamID, [{response, 101, Headers, <<>>}]),
+ %% @todo This is no good because commands return a state normally and here it doesn't
+ %% we need to let this module go entirely. Perhaps it should be handled directly in
+ %% cowboy_clear/cowboy_tls? Perhaps not. We do want that Buffer.
+ Protocol:takeover(Parent, Ref, Socket, Transport, Opts, <<>>, InitialState);
+%% Stream shutdown.
+commands(State, StreamID, [stop|Tail]) ->
+ %% @todo Do we want to run the commands after a stop?
+% commands(stream_terminate(State, StreamID, stop), StreamID, Tail).
+ maybe_terminate(State, StreamID, Tail, fin).
+
+flush() ->
+ receive _ -> flush() after 0 -> ok end.
+
+maybe_terminate(State, StreamID, Tail, nofin) ->
+ commands(State, StreamID, Tail);
+%% @todo In these cases I'm not sure if we should continue processing commands.
+maybe_terminate(State=#state{last_streamid=StreamID}, StreamID, _Tail, fin) ->
+ terminate(stream_terminate(State, StreamID, normal), normal); %% @todo Reason ok?
+maybe_terminate(State, StreamID, _Tail, fin) ->
+ stream_terminate(State, StreamID, normal).
+
+stream_reset(State=#state{socket=Socket, transport=Transport}, StreamID,
+ StreamError={internal_error, _, _}) ->
+ %% @todo headers
+ %% @todo Don't send this if there are no streams left.
+ Transport:send(Socket, cow_http:response(500, 'HTTP/1.1', [
+ {<<"content-length">>, <<"0">>}
+ ])),
+ %% @todo update IsFin local
+ stream_terminate(State#state{out_state=done}, StreamID, StreamError).
+
+stream_terminate(State=#state{socket=Socket, transport=Transport, handler=Handler,
+ out_streamid=OutStreamID, out_state=OutState,
+ streams=Streams0, children=Children0}, StreamID, Reason) ->
+ {value, #stream{state=StreamState, version=Version}, Streams}
+ = lists:keytake(StreamID, #stream.id, Streams0),
+ _ = case OutState of
+ wait ->
+ Transport:send(Socket, cow_http:response(204, 'HTTP/1.1', []));
+ chunked when Version =:= 'HTTP/1.1' ->
+ Transport:send(Socket, <<"0\r\n\r\n">>);
+ _ -> %% done or Version =:= 'HTTP/1.0'
+ ok
+ end,
+
+ stream_call_terminate(StreamID, Reason, Handler, StreamState),
+%% @todo initiate children shutdown
+% Children = stream_terminate_children(Children0, StreamID, []),
+ Children = [case C of
+ {Pid, StreamID, Shutdown} -> {Pid, undefined, Shutdown};
+ _ -> C
+ end || C <- Children0],
+
+ %% @todo Skip the body, if any, or drop the connection if too large.
+
+ %% @todo Only do this if Current =:= StreamID.
+ NextOutStreamID = OutStreamID + 1,
+ case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
+ false ->
+ %% @todo This is clearly wrong, if the stream is gone we need to check if
+ %% there used to be such a stream, and if there was to send an error.
+ State#state{out_streamid=NextOutStreamID, out_state=wait, streams=Streams, children=Children};
+ #stream{queue=Commands} ->
+ %% @todo Remove queue from the stream.
+ commands(State#state{out_streamid=NextOutStreamID, out_state=wait,
+ streams=Streams, children=Children}, NextOutStreamID, Commands)
+ end.
+
+%% @todo Taken directly from _http2
+stream_call_terminate(StreamID, Reason, Handler, StreamState) ->
+ try
+ Handler:terminate(StreamID, Reason, StreamState),
+ ok
+ catch Class:Reason ->
+ error_logger:error_msg("Exception occurred in ~s:terminate(~p, ~p, ~p) with reason ~p:~p.",
+ [Handler, StreamID, Reason, StreamState, Class, Reason])
+ end.
+
+%stream_terminate_children([], _, Acc) ->
+% Acc;
+%stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) ->
+% exit(Pid, kill),
+% stream_terminate_children(Tail, StreamID, Acc);
+%stream_terminate_children([Child|Tail], StreamID, Acc) ->
+% stream_terminate_children(Tail, StreamID, [Child|Acc]).
+
+
+%% @todo max_reqs also
+maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') ->
+ Conns = cow_http_hd:parse_connection(Conn),
+ case lists:member(<<"keep-alive">>, Conns) of
+ true -> keepalive;
+ false -> close
+ end;
+maybe_req_close(_, _, 'HTTP/1.0') ->
+ close;
+maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
+ case connection_hd_is_close(Conn) of
+ true -> close;
+ false -> keepalive
+ end;
+maybe_req_close(_State, _, _) ->
+ keepalive.
+
+connection(State=#state{last_streamid=StreamID}, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
+ case connection_hd_is_close(Conn) of
+ true -> {State, Headers};
+ %% @todo Here we need to remove keep-alive and add close, not just add close.
+ false -> {State, Headers#{<<"connection">> => [<<"close, ">>, Conn]}}
+ end;
+connection(State=#state{last_streamid=StreamID}, Headers, StreamID, _) ->
+ {State, Headers#{<<"connection">> => <<"close">>}};
+connection(State, Headers=#{<<"connection">> := Conn}, StreamID, _) ->
+ case connection_hd_is_close(Conn) of
+ true -> {State#state{last_streamid=StreamID}, Headers};
+ %% @todo Here we need to set keep-alive only if it wasn't set before.
+ false -> {State, Headers}
+ end;
+connection(State, Headers, _, 'HTTP/1.0') ->
+ {State, Headers#{<<"connection">> => <<"keep-alive">>}};
+connection(State, Headers, _, _) ->
+ {State, Headers}.
+
+connection_hd_is_close(Conn) ->
+ Conns = cow_http_hd:parse_connection(iolist_to_binary(Conn)),
+ lists:member(<<"close">>, Conns).
+
+error_terminate(StatusCode, State=#state{socket=Socket, transport=Transport}, Reason) ->
+ Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', [
+ {<<"content-length">>, <<"0">>}
+ ])),
+ terminate(State, Reason).
+
+terminate(_State, _Reason) ->
+ exit(normal). %% @todo
+
+
+
+
+
+
+
+
+
+
+
+
+
+%% System callbacks.
+
+-spec system_continue(_, _, #state{}) -> ok.
+system_continue(_, _, {State, Buffer}) ->
+ loop(State, Buffer).
+
+-spec system_terminate(any(), _, _, _) -> no_return().
+system_terminate(Reason, _, _, _) ->
+ exit(Reason).
+
+-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
+system_code_change(Misc, _, _, _) ->
+ {ok, Misc}.
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 6018461..42d8ba5 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -86,8 +86,7 @@ init(Parent, Ref, Socket, Transport, Opts, Handler) ->
before_loop(State, Buffer) ->
loop(State, Buffer).
-loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
- handler=Handler, children=Children}, Buffer) ->
+loop(State=#state{parent=Parent, socket=Socket, transport=Transport, children=Children}, Buffer) ->
Transport:setopts(Socket, [{active, once}]),
{OK, Closed, Error} = Transport:messages(),
receive
@@ -104,7 +103,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
%% Messages pertaining to a stream.
- {{Handler, StreamID}, Msg} ->
+ {{Pid, StreamID}, Msg} when Pid =:= self() ->
loop(info(State, StreamID, Msg), Buffer);
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
@@ -328,8 +327,8 @@ commands(State0=#state{socket=Socket, transport=Transport, server_streamid=Promi
commands(State, StreamID, [{flow, _Size}|Tail]) ->
commands(State, StreamID, Tail);
%% Supervise a child process.
-commands(State=#state{children=Children}, StreamID, [{spawn, Pid}|Tail]) ->
- commands(State#state{children=[{Pid, StreamID}|Children]}, StreamID, Tail);
+commands(State=#state{children=Children}, StreamID, [{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown
+ commands(State#state{children=[{Pid, StreamID}|Children]}, StreamID, Tail);
%% Upgrade to a new protocol.
%%
%% @todo Implementation.
@@ -357,7 +356,7 @@ terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason, Ha
%% Stream functions.
-stream_init(State0=#state{socket=Socket, transport=Transport, handler=Handler,
+stream_init(State0=#state{ref=Ref, socket=Socket, transport=Transport, handler=Handler, opts=Opts,
streams=Streams0, decode_state=DecodeState0}, StreamID, IsFin, HeaderBlock) ->
%% @todo Add clause for CONNECT requests (no scheme/path).
try headers_decode(HeaderBlock, DecodeState0) of
@@ -365,10 +364,46 @@ stream_init(State0=#state{socket=Socket, transport=Transport, handler=Handler,
<<":method">> := Method,
<<":scheme">> := Scheme,
<<":authority">> := Authority,
- <<":path">> := Path}, DecodeState} ->
+ <<":path">> := PathWithQs}, DecodeState} ->
State = State0#state{decode_state=DecodeState},
Headers = maps:without([<<":method">>, <<":scheme">>, <<":authority">>, <<":path">>], Headers0),
- try Handler:init(StreamID, IsFin, Method, Scheme, Authority, Path, Headers) of
+ %% @todo We need to parse the port out of :authority.
+ %% @todo We need to parse the query string out of :path.
+ %% @todo We need to give a way to get the socket infos.
+
+ Host = Authority, %% @todo
+ Port = todo, %% @todo
+ Path = PathWithQs, %% @todo
+ Qs = todo, %% @todo
+
+ Req = #{
+ ref => Ref,
+ pid => self(),
+ streamid => StreamID,
+
+ %% @todo peer
+ %% @todo sockname
+ %% @todo ssl client cert?
+
+ method => Method,
+ scheme => Scheme,
+ host => Host,
+ %% host_info (cowboy_router)
+ port => Port,
+ path => Path,
+ %% path_info (cowboy_router)
+ %% bindings (cowboy_router)
+ qs => Qs,
+ version => 'HTTP/2',
+ headers => Headers,
+
+ has_body => IsFin =:= nofin
+ %% @todo multipart? keep state separate
+
+ %% meta values (cowboy_websocket, cowboy_rest)
+ },
+
+ try Handler:init(StreamID, Req, Opts) of
{Commands, StreamState} ->
Streams = [#stream{id=StreamID, state=StreamState}|Streams0],
commands(State#state{streams=Streams}, StreamID, Commands)
@@ -377,7 +412,7 @@ stream_init(State0=#state{socket=Socket, transport=Transport, handler=Handler,
"with reason ~p:~p.",
[Handler, StreamID, IsFin, Method, Scheme, Authority, Path, Headers, Class, Reason]),
stream_reset(State, StreamID, {internal_error, {Class, Reason},
- 'Exception occurred in StreamHandler:init/7 call.'})
+ 'Exception occurred in StreamHandler:init/7 call.'}) %% @todo Check final arity.
end;
{_, DecodeState} ->
Transport:send(Socket, cow_http2:rst_stream(StreamID, protocol_error)),
diff --git a/src/cowboy_loop.erl b/src/cowboy_loop.erl
index 695439a..e162417 100644
--- a/src/cowboy_loop.erl
+++ b/src/cowboy_loop.erl
@@ -60,15 +60,21 @@ upgrade(Req, Env, Handler, HandlerState, Timeout, run) ->
State2 = timeout(State),
after_call(Req, State2, Handler, HandlerState);
upgrade(Req, Env, Handler, HandlerState, Timeout, hibernate) ->
+
+% dbg:start(),
+% dbg:tracer(),
+% dbg:tpl(?MODULE, []),
+% dbg:tpl(long_polling_h, []),
+% dbg:tpl(loop_handler_body_h, []),
+% dbg:tpl(cowboy_req, []),
+% dbg:p(all, c),
+
State = #state{env=Env, max_buffer=get_max_buffer(Env), hibernate=true, timeout=Timeout},
State2 = timeout(State),
after_call(Req, State2, Handler, HandlerState).
-get_max_buffer(Env) ->
- case lists:keyfind(loop_max_buffer, 1, Env) of
- false -> 5000;
- {_, MaxBuffer} -> MaxBuffer
- end.
+get_max_buffer(#{loop_max_buffer := MaxBuffer}) -> MaxBuffer;
+get_max_buffer(_) -> 5000.
%% Update the state if the response was sent in the callback.
after_call(Req, State=#state{resp_sent=false}, Handler,
@@ -83,12 +89,21 @@ after_call(Req, State, Handler, HandlerState) ->
before_loop(Req, State, Handler, HandlerState).
before_loop(Req, State=#state{hibernate=true}, Handler, HandlerState) ->
- [Socket, Transport] = cowboy_req:get([socket, transport], Req),
- Transport:setopts(Socket, [{active, once}]),
+
+ %% @todo Yeah we can't get the socket anymore.
+ %% Everything changes since we are a separate process now.
+ %% Proper flow control at the connection level should be implemented
+ %% instead of what we have here.
+
+% [Socket, Transport] = cowboy_req:get([socket, transport], Req),
+% Transport:setopts(Socket, [{active, once}]),
{suspend, ?MODULE, loop, [Req, State#state{hibernate=false}, Handler, HandlerState]};
before_loop(Req, State, Handler, HandlerState) ->
- [Socket, Transport] = cowboy_req:get([socket, transport], Req),
- Transport:setopts(Socket, [{active, once}]),
+
+ %% Same here.
+
+% [Socket, Transport] = cowboy_req:get([socket, transport], Req),
+% Transport:setopts(Socket, [{active, once}]),
loop(Req, State, Handler, HandlerState).
%% Almost the same code can be found in cowboy_websocket.
@@ -109,26 +124,26 @@ timeout(State=#state{timeout=Timeout,
loop(Req, State=#state{buffer_size=NbBytes,
max_buffer=Threshold, timeout_ref=TRef,
resp_sent=RespSent}, Handler, HandlerState) ->
- [Socket, Transport] = cowboy_req:get([socket, transport], Req),
- {OK, Closed, Error} = Transport:messages(),
+% [Socket, Transport] = cowboy_req:get([socket, transport], Req),
+% {OK, Closed, Error} = Transport:messages(),
receive
- {OK, Socket, Data} ->
- NbBytes2 = NbBytes + byte_size(Data),
- if NbBytes2 > Threshold ->
- _ = if RespSent -> ok; true ->
- cowboy_req:reply(500, Req)
- end,
- cowboy_handler:terminate({error, overflow}, Req, HandlerState, Handler),
- exit(normal);
- true ->
- Req2 = cowboy_req:append_buffer(Data, Req),
- State2 = timeout(State#state{buffer_size=NbBytes2}),
- before_loop(Req2, State2, Handler, HandlerState)
- end;
- {Closed, Socket} ->
- terminate(Req, State, Handler, HandlerState, {error, closed});
- {Error, Socket, Reason} ->
- terminate(Req, State, Handler, HandlerState, {error, Reason});
+% {OK, Socket, Data} ->
+% NbBytes2 = NbBytes + byte_size(Data),
+% if NbBytes2 > Threshold ->
+% _ = if RespSent -> ok; true ->
+% cowboy_req:reply(500, Req)
+% end,
+% cowboy_handler:terminate({error, overflow}, Req, HandlerState, Handler),
+% exit(normal);
+% true ->
+% Req2 = cowboy_req:append_buffer(Data, Req),
+% State2 = timeout(State#state{buffer_size=NbBytes2}),
+% before_loop(Req2, State2, Handler, HandlerState)
+% end;
+% {Closed, Socket} ->
+% terminate(Req, State, Handler, HandlerState, {error, closed});
+% {Error, Socket, Reason} ->
+% terminate(Req, State, Handler, HandlerState, {error, Reason});
{timeout, TRef, ?MODULE} ->
after_loop(Req, State, Handler, HandlerState, timeout);
{timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) ->
@@ -139,13 +154,13 @@ loop(Req, State=#state{buffer_size=NbBytes,
%% data received after that and put it into the buffer.
%% We do not check the size here, if data keeps coming
%% we'll error out on the next packet received.
- Transport:setopts(Socket, [{active, false}]),
- Req2 = receive {OK, Socket, Data} ->
- cowboy_req:append_buffer(Data, Req)
- after 0 ->
- Req
- end,
- call(Req2, State, Handler, HandlerState, Message)
+% Transport:setopts(Socket, [{active, false}]),
+% Req2 = receive {OK, Socket, Data} ->
+% cowboy_req:append_buffer(Data, Req)
+% after 0 ->
+% Req
+% end,
+ call(Req, State, Handler, HandlerState, Message)
end.
call(Req, State=#state{resp_sent=RespSent},
@@ -168,7 +183,7 @@ call(Req, State=#state{resp_sent=RespSent},
{reason, Reason},
{mfa, {Handler, info, 3}},
{stacktrace, Stacktrace},
- {req, cowboy_req:to_list(Req)},
+ {req, Req},
{state, HandlerState}
]})
end.
diff --git a/src/cowboy_protocol.erl b/src/cowboy_protocol.erl
deleted file mode 100644
index 90128c3..0000000
--- a/src/cowboy_protocol.erl
+++ /dev/null
@@ -1,534 +0,0 @@
-%% Copyright (c) 2011-2014, Loïc Hoguin <[email protected]>
-%% Copyright (c) 2011, Anthony Ramine <[email protected]>
-%%
-%% Permission to use, copy, modify, and/or distribute this software for any
-%% purpose with or without fee is hereby granted, provided that the above
-%% copyright notice and this permission notice appear in all copies.
-%%
-%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
-%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
-%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
-%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
-%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
-%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
-%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-
--module(cowboy_protocol).
-
-%% API.
--export([start_link/4]).
-
-%% Internal.
--export([init/4]).
--export([parse_request/3]).
--export([resume/6]).
-
--type opts() :: [{compress, boolean()}
- | {env, cowboy_middleware:env()}
- | {max_empty_lines, non_neg_integer()}
- | {max_header_name_length, non_neg_integer()}
- | {max_header_value_length, non_neg_integer()}
- | {max_headers, non_neg_integer()}
- | {max_keepalive, non_neg_integer()}
- | {max_request_line_length, non_neg_integer()}
- | {middlewares, [module()]}
- | {onresponse, cowboy:onresponse_fun()}
- | {timeout, timeout()}].
--export_type([opts/0]).
-
--record(state, {
- socket :: inet:socket(),
- transport :: module(),
- middlewares :: [module()],
- compress :: boolean(),
- env :: cowboy_middleware:env(),
- onresponse = undefined :: undefined | cowboy:onresponse_fun(),
- max_empty_lines :: non_neg_integer(),
- req_keepalive = 1 :: non_neg_integer(),
- max_keepalive :: non_neg_integer(),
- max_request_line_length :: non_neg_integer(),
- max_header_name_length :: non_neg_integer(),
- max_header_value_length :: non_neg_integer(),
- max_headers :: non_neg_integer(),
- timeout :: timeout(),
- until :: non_neg_integer() | infinity
-}).
-
--include_lib("cowlib/include/cow_inline.hrl").
--include_lib("cowlib/include/cow_parse.hrl").
-
-%% API.
-
--spec start_link(ranch:ref(), inet:socket(), module(), opts()) -> {ok, pid()}.
-start_link(Ref, Socket, Transport, Opts) ->
- Pid = spawn_link(?MODULE, init, [Ref, Socket, Transport, Opts]),
- {ok, Pid}.
-
-%% Internal.
-
-%% Faster alternative to proplists:get_value/3.
-get_value(Key, Opts, Default) ->
- case lists:keyfind(Key, 1, Opts) of
- {_, Value} -> Value;
- _ -> Default
- end.
-
--spec init(ranch:ref(), inet:socket(), module(), opts()) -> ok.
-init(Ref, Socket, Transport, Opts) ->
- ok = ranch:accept_ack(Ref),
- Timeout = get_value(timeout, Opts, 5000),
- Until = until(Timeout),
- case recv(Socket, Transport, Until) of
- {ok, Data} ->
- OnFirstRequest = get_value(onfirstrequest, Opts, undefined),
- case OnFirstRequest of
- undefined -> ok;
- _ -> OnFirstRequest(Ref, Socket, Transport, Opts)
- end,
- Compress = get_value(compress, Opts, false),
- MaxEmptyLines = get_value(max_empty_lines, Opts, 5),
- MaxHeaderNameLength = get_value(max_header_name_length, Opts, 64),
- MaxHeaderValueLength = get_value(max_header_value_length, Opts, 4096),
- MaxHeaders = get_value(max_headers, Opts, 100),
- MaxKeepalive = get_value(max_keepalive, Opts, 100),
- MaxRequestLineLength = get_value(max_request_line_length, Opts, 4096),
- Middlewares = get_value(middlewares, Opts, [cowboy_router, cowboy_handler]),
- Env = [{listener, Ref}|get_value(env, Opts, [])],
- OnResponse = get_value(onresponse, Opts, undefined),
- parse_request(Data, #state{socket=Socket, transport=Transport,
- middlewares=Middlewares, compress=Compress, env=Env,
- max_empty_lines=MaxEmptyLines, max_keepalive=MaxKeepalive,
- max_request_line_length=MaxRequestLineLength,
- max_header_name_length=MaxHeaderNameLength,
- max_header_value_length=MaxHeaderValueLength, max_headers=MaxHeaders,
- onresponse=OnResponse, timeout=Timeout, until=Until}, 0);
- {error, _} ->
- terminate(#state{socket=Socket, transport=Transport}) %% @todo ridiculous
- end.
-
--spec until(timeout()) -> non_neg_integer() | infinity.
-until(infinity) ->
- infinity;
-until(Timeout) ->
- erlang:monotonic_time(milli_seconds) + Timeout.
-
-%% Request parsing.
-%%
-%% The next set of functions is the request parsing code. All of it
-%% runs using a single binary match context. This optimization ends
-%% right after the header parsing is finished and the code becomes
-%% more interesting past that point.
-
--spec recv(inet:socket(), module(), non_neg_integer() | infinity)
- -> {ok, binary()} | {error, closed | timeout | atom()}.
-recv(Socket, Transport, infinity) ->
- Transport:recv(Socket, 0, infinity);
-recv(Socket, Transport, Until) ->
- Timeout = Until - erlang:monotonic_time(milli_seconds),
- if Timeout < 0 ->
- {error, timeout};
- true ->
- Transport:recv(Socket, 0, Timeout)
- end.
-
--spec wait_request(binary(), #state{}, non_neg_integer()) -> ok.
-wait_request(Buffer, State=#state{socket=Socket, transport=Transport,
- until=Until}, ReqEmpty) ->
- case recv(Socket, Transport, Until) of
- {ok, Data} ->
- parse_request(<< Buffer/binary, Data/binary >>, State, ReqEmpty);
- {error, _} ->
- terminate(State)
- end.
-
--spec parse_request(binary(), #state{}, non_neg_integer()) -> ok.
-%% Empty lines must be using \r\n.
-parse_request(<< $\n, _/bits >>, State, _) ->
- error_terminate(400, State);
-parse_request(<< $\s, _/bits >>, State, _) ->
- error_terminate(400, State);
-%% We limit the length of the Request-line to MaxLength to avoid endlessly
-%% reading from the socket and eventually crashing.
-parse_request(Buffer, State=#state{max_request_line_length=MaxLength,
- max_empty_lines=MaxEmpty}, ReqEmpty) ->
- case match_eol(Buffer, 0) of
- nomatch when byte_size(Buffer) > MaxLength ->
- error_terminate(414, State);
- nomatch ->
- wait_request(Buffer, State, ReqEmpty);
- 1 when ReqEmpty =:= MaxEmpty ->
- error_terminate(400, State);
- 1 ->
- << _:16, Rest/bits >> = Buffer,
- parse_request(Rest, State, ReqEmpty + 1);
- _ ->
- parse_method(Buffer, State, <<>>)
- end.
-
-match_eol(<< $\n, _/bits >>, N) ->
- N;
-match_eol(<< _, Rest/bits >>, N) ->
- match_eol(Rest, N + 1);
-match_eol(_, _) ->
- nomatch.
-
-parse_method(<< C, Rest/bits >>, State, SoFar) ->
- case C of
- $\r -> error_terminate(400, State);
- $\s -> parse_uri(Rest, State, SoFar);
- _ -> parse_method(Rest, State, << SoFar/binary, C >>)
- end.
-
-parse_uri(<< $\r, _/bits >>, State, _) ->
- error_terminate(400, State);
-parse_uri(<< $\s, _/bits >>, State, _) ->
- error_terminate(400, State);
-parse_uri(<< "* ", Rest/bits >>, State, Method) ->
- parse_version(Rest, State, Method, <<"*">>, <<>>);
-parse_uri(<< "http://", Rest/bits >>, State, Method) ->
- parse_uri_skip_host(Rest, State, Method);
-parse_uri(<< "https://", Rest/bits >>, State, Method) ->
- parse_uri_skip_host(Rest, State, Method);
-parse_uri(<< "HTTP://", Rest/bits >>, State, Method) ->
- parse_uri_skip_host(Rest, State, Method);
-parse_uri(<< "HTTPS://", Rest/bits >>, State, Method) ->
- parse_uri_skip_host(Rest, State, Method);
-parse_uri(Buffer, State, Method) ->
- parse_uri_path(Buffer, State, Method, <<>>).
-
-parse_uri_skip_host(<< C, Rest/bits >>, State, Method) ->
- case C of
- $\r -> error_terminate(400, State);
- $/ -> parse_uri_path(Rest, State, Method, <<"/">>);
- $\s -> parse_version(Rest, State, Method, <<"/">>, <<>>);
- $? -> parse_uri_query(Rest, State, Method, <<"/">>, <<>>);
- $# -> skip_uri_fragment(Rest, State, Method, <<"/">>, <<>>);
- _ -> parse_uri_skip_host(Rest, State, Method)
- end.
-
-parse_uri_path(<< C, Rest/bits >>, State, Method, SoFar) ->
- case C of
- $\r -> error_terminate(400, State);
- $\s -> parse_version(Rest, State, Method, SoFar, <<>>);
- $? -> parse_uri_query(Rest, State, Method, SoFar, <<>>);
- $# -> skip_uri_fragment(Rest, State, Method, SoFar, <<>>);
- _ -> parse_uri_path(Rest, State, Method, << SoFar/binary, C >>)
- end.
-
-parse_uri_query(<< C, Rest/bits >>, S, M, P, SoFar) ->
- case C of
- $\r -> error_terminate(400, S);
- $\s -> parse_version(Rest, S, M, P, SoFar);
- $# -> skip_uri_fragment(Rest, S, M, P, SoFar);
- _ -> parse_uri_query(Rest, S, M, P, << SoFar/binary, C >>)
- end.
-
-skip_uri_fragment(<< C, Rest/bits >>, S, M, P, Q) ->
- case C of
- $\r -> error_terminate(400, S);
- $\s -> parse_version(Rest, S, M, P, Q);
- _ -> skip_uri_fragment(Rest, S, M, P, Q)
- end.
-
-parse_version(<< "HTTP/1.1\r\n", Rest/bits >>, S, M, P, Q) ->
- parse_header(Rest, S, M, P, Q, 'HTTP/1.1', []);
-parse_version(<< "HTTP/1.0\r\n", Rest/bits >>, S, M, P, Q) ->
- parse_header(Rest, S, M, P, Q, 'HTTP/1.0', []);
-parse_version(_, State, _, _, _) ->
- error_terminate(505, State).
-
-%% Stop receiving data if we have more than allowed number of headers.
-wait_header(_, State=#state{max_headers=MaxHeaders}, _, _, _, _, Headers)
- when length(Headers) >= MaxHeaders ->
- error_terminate(400, State);
-wait_header(Buffer, State=#state{socket=Socket, transport=Transport,
- until=Until}, M, P, Q, V, H) ->
- case recv(Socket, Transport, Until) of
- {ok, Data} ->
- parse_header(<< Buffer/binary, Data/binary >>,
- State, M, P, Q, V, H);
- {error, timeout} ->
- error_terminate(408, State);
- {error, _} ->
- terminate(State)
- end.
-
-parse_header(<< $\r, $\n, Rest/bits >>, S, M, P, Q, V, Headers) ->
- request(Rest, S, M, P, Q, V, lists:reverse(Headers));
-parse_header(Buffer, State=#state{max_header_name_length=MaxLength},
- M, P, Q, V, H) ->
- case match_colon(Buffer, 0) of
- nomatch when byte_size(Buffer) > MaxLength ->
- error_terminate(400, State);
- nomatch ->
- wait_header(Buffer, State, M, P, Q, V, H);
- _ ->
- parse_hd_name(Buffer, State, M, P, Q, V, H, <<>>)
- end.
-
-match_colon(<< $:, _/bits >>, N) ->
- N;
-match_colon(<< _, Rest/bits >>, N) ->
- match_colon(Rest, N + 1);
-match_colon(_, _) ->
- nomatch.
-
-parse_hd_name(<< $:, Rest/bits >>, S, M, P, Q, V, H, SoFar) ->
- parse_hd_before_value(Rest, S, M, P, Q, V, H, SoFar);
-parse_hd_name(<< C, Rest/bits >>, S, M, P, Q, V, H, SoFar) when ?IS_WS(C) ->
- parse_hd_name_ws(Rest, S, M, P, Q, V, H, SoFar);
-parse_hd_name(<< C, Rest/bits >>, S, M, P, Q, V, H, SoFar) ->
- ?LOWER(parse_hd_name, Rest, S, M, P, Q, V, H, SoFar).
-
-parse_hd_name_ws(<< C, Rest/bits >>, S, M, P, Q, V, H, Name) ->
- case C of
- $\s -> parse_hd_name_ws(Rest, S, M, P, Q, V, H, Name);
- $\t -> parse_hd_name_ws(Rest, S, M, P, Q, V, H, Name);
- $: -> parse_hd_before_value(Rest, S, M, P, Q, V, H, Name)
- end.
-
-wait_hd_before_value(Buffer, State=#state{
- socket=Socket, transport=Transport, until=Until},
- M, P, Q, V, H, N) ->
- case recv(Socket, Transport, Until) of
- {ok, Data} ->
- parse_hd_before_value(<< Buffer/binary, Data/binary >>,
- State, M, P, Q, V, H, N);
- {error, timeout} ->
- error_terminate(408, State);
- {error, _} ->
- terminate(State)
- end.
-
-parse_hd_before_value(<< $\s, Rest/bits >>, S, M, P, Q, V, H, N) ->
- parse_hd_before_value(Rest, S, M, P, Q, V, H, N);
-parse_hd_before_value(<< $\t, Rest/bits >>, S, M, P, Q, V, H, N) ->
- parse_hd_before_value(Rest, S, M, P, Q, V, H, N);
-parse_hd_before_value(Buffer, State=#state{
- max_header_value_length=MaxLength}, M, P, Q, V, H, N) ->
- case match_eol(Buffer, 0) of
- nomatch when byte_size(Buffer) > MaxLength ->
- error_terminate(400, State);
- nomatch ->
- wait_hd_before_value(Buffer, State, M, P, Q, V, H, N);
- _ ->
- parse_hd_value(Buffer, State, M, P, Q, V, H, N, <<>>)
- end.
-
-%% We completely ignore the first argument which is always
-%% the empty binary. We keep it there because we don't want
-%% to change the other arguments' position and trigger costy
-%% operations for no reasons.
-wait_hd_value(_, State=#state{
- socket=Socket, transport=Transport, until=Until},
- M, P, Q, V, H, N, SoFar) ->
- case recv(Socket, Transport, Until) of
- {ok, Data} ->
- parse_hd_value(Data, State, M, P, Q, V, H, N, SoFar);
- {error, timeout} ->
- error_terminate(408, State);
- {error, _} ->
- terminate(State)
- end.
-
-%% Pushing back as much as we could the retrieval of new data
-%% to check for multilines allows us to avoid a few tests in
-%% the critical path, but forces us to have a special function.
-wait_hd_value_nl(_, State=#state{
- socket=Socket, transport=Transport, until=Until},
- M, P, Q, V, Headers, Name, SoFar) ->
- case recv(Socket, Transport, Until) of
- {ok, << C, Data/bits >>} when C =:= $\s; C =:= $\t ->
- parse_hd_value(Data, State, M, P, Q, V, Headers, Name, SoFar);
- {ok, Data} ->
- parse_header(Data, State, M, P, Q, V, [{Name, SoFar}|Headers]);
- {error, timeout} ->
- error_terminate(408, State);
- {error, _} ->
- terminate(State)
- end.
-
-parse_hd_value(<< $\r, Rest/bits >>, S, M, P, Q, V, Headers, Name, SoFar) ->
- case Rest of
- << $\n >> ->
- wait_hd_value_nl(<<>>, S, M, P, Q, V, Headers, Name, SoFar);
- << $\n, C, Rest2/bits >> when C =:= $\s; C =:= $\t ->
- parse_hd_value(Rest2, S, M, P, Q, V, Headers, Name,
- << SoFar/binary, C >>);
- << $\n, Rest2/bits >> ->
- parse_header(Rest2, S, M, P, Q, V, [{Name, clean_value_ws_end(SoFar, byte_size(SoFar) - 1)}|Headers])
- end;
-parse_hd_value(<< C, Rest/bits >>, S, M, P, Q, V, H, N, SoFar) ->
- parse_hd_value(Rest, S, M, P, Q, V, H, N, << SoFar/binary, C >>);
-parse_hd_value(<<>>, State=#state{max_header_value_length=MaxLength},
- _, _, _, _, _, _, SoFar) when byte_size(SoFar) > MaxLength ->
- error_terminate(400, State);
-parse_hd_value(<<>>, S, M, P, Q, V, H, N, SoFar) ->
- wait_hd_value(<<>>, S, M, P, Q, V, H, N, SoFar).
-
-clean_value_ws_end(_, -1) ->
- <<>>;
-clean_value_ws_end(Value, N) ->
- case binary:at(Value, N) of
- $\s -> clean_value_ws_end(Value, N - 1);
- $\t -> clean_value_ws_end(Value, N - 1);
- _ ->
- S = N + 1,
- << Value2:S/binary, _/bits >> = Value,
- Value2
- end.
-
--ifdef(TEST).
-clean_value_ws_end_test_() ->
- Tests = [
- {<<>>, <<>>},
- {<<" ">>, <<>>},
- {<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
- "text/html;level=2;q=0.4, */*;q=0.5 \t \t ">>,
- <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
- "text/html;level=2;q=0.4, */*;q=0.5">>}
- ],
- [{V, fun() -> R = clean_value_ws_end(V, byte_size(V) - 1) end} || {V, R} <- Tests].
--endif.
-
--ifdef(PERF).
-horse_clean_value_ws_end() ->
- horse:repeat(200000,
- clean_value_ws_end(
- <<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
- "text/html;level=2;q=0.4, */*;q=0.5 ">>,
- byte_size(<<"text/*;q=0.3, text/html;q=0.7, text/html;level=1, "
- "text/html;level=2;q=0.4, */*;q=0.5 ">>) - 1)
- ).
--endif.
-
-request(B, State=#state{transport=Transport}, M, P, Q, Version, Headers) ->
- case lists:keyfind(<<"host">>, 1, Headers) of
- false when Version =:= 'HTTP/1.1' ->
- error_terminate(400, State);
- false ->
- request(B, State, M, P, Q, Version, Headers,
- <<>>, default_port(Transport:name()));
- {_, RawHost} ->
- try parse_host(RawHost, false, <<>>) of
- {Host, undefined} ->
- request(B, State, M, P, Q, Version, Headers,
- Host, default_port(Transport:name()));
- {Host, Port} ->
- request(B, State, M, P, Q, Version, Headers,
- Host, Port)
- catch _:_ ->
- error_terminate(400, State)
- end
- end.
-
--spec default_port(atom()) -> 80 | 443.
-default_port(ssl) -> 443;
-default_port(_) -> 80.
-
-%% Same code as cow_http:parse_fullhost/1, but inline because we
-%% really want this to go fast.
-parse_host(<< $[, Rest/bits >>, false, <<>>) ->
- parse_host(Rest, true, << $[ >>);
-parse_host(<<>>, false, Acc) ->
- {Acc, undefined};
-parse_host(<< $:, Rest/bits >>, false, Acc) ->
- {Acc, list_to_integer(binary_to_list(Rest))};
-parse_host(<< $], Rest/bits >>, true, Acc) ->
- parse_host(Rest, false, << Acc/binary, $] >>);
-parse_host(<< C, Rest/bits >>, E, Acc) ->
- ?LOWER(parse_host, Rest, E, Acc).
-
-%% End of request parsing.
-%%
-%% We create the Req object and start handling the request.
-
-request(Buffer, State=#state{socket=Socket, transport=Transport,
- req_keepalive=ReqKeepalive, max_keepalive=MaxKeepalive,
- compress=Compress, onresponse=OnResponse},
- Method, Path, Query, Version, Headers, Host, Port) ->
- case Transport:peername(Socket) of
- {ok, Peer} ->
- Req = cowboy_req:new(Socket, Transport, Peer, Method, Path,
- Query, Version, Headers, Host, Port, Buffer,
- ReqKeepalive < MaxKeepalive, Compress, OnResponse),
- execute(Req, State);
- {error, _} ->
- %% Couldn't read the peer address; connection is gone.
- terminate(State)
- end.
-
--spec execute(cowboy_req:req(), #state{}) -> ok.
-execute(Req, State=#state{middlewares=Middlewares, env=Env}) ->
- execute(Req, State, Env, Middlewares).
-
--spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()])
- -> ok.
-execute(Req, State, Env, []) ->
- next_request(Req, State, get_value(result, Env, ok));
-execute(Req, State, Env, [Middleware|Tail]) ->
- case Middleware:execute(Req, Env) of
- {ok, Req2, Env2} ->
- execute(Req2, State, Env2, Tail);
- {suspend, Module, Function, Args} ->
- erlang:hibernate(?MODULE, resume,
- [State, Env, Tail, Module, Function, Args]);
- {stop, Req2} ->
- next_request(Req2, State, ok)
- end.
-
--spec resume(#state{}, cowboy_middleware:env(), [module()],
- module(), module(), [any()]) -> ok.
-resume(State, Env, Tail, Module, Function, Args) ->
- case apply(Module, Function, Args) of
- {ok, Req2, Env2} ->
- execute(Req2, State, Env2, Tail);
- {suspend, Module2, Function2, Args2} ->
- erlang:hibernate(?MODULE, resume,
- [State, Env, Tail, Module2, Function2, Args2]);
- {stop, Req2} ->
- next_request(Req2, State, ok)
- end.
-
--spec next_request(cowboy_req:req(), #state{}, any()) -> ok.
-next_request(Req, State=#state{req_keepalive=Keepalive, timeout=Timeout},
- HandlerRes) ->
- cowboy_req:ensure_response(Req, 204),
- %% If we are going to close the connection,
- %% we do not want to attempt to skip the body.
- case cowboy_req:get(connection, Req) of
- close ->
- terminate(State);
- _ ->
- %% Skip the body if it is reasonably sized. Close otherwise.
- Buffer = case cowboy_req:body(Req) of
- {ok, _, Req2} -> cowboy_req:get(buffer, Req2);
- _ -> close
- end,
- %% Flush the resp_sent message before moving on.
- if HandlerRes =:= ok, Buffer =/= close ->
- receive {cowboy_req, resp_sent} -> ok after 0 -> ok end,
- ?MODULE:parse_request(Buffer,
- State#state{req_keepalive=Keepalive + 1,
- until=until(Timeout)}, 0);
- true ->
- terminate(State)
- end
- end.
-
--spec error_terminate(cowboy:http_status(), #state{}) -> ok.
-error_terminate(Status, State=#state{socket=Socket, transport=Transport,
- compress=Compress, onresponse=OnResponse}) ->
- error_terminate(Status, cowboy_req:new(Socket, Transport,
- undefined, <<"GET">>, <<>>, <<>>, 'HTTP/1.1', [], <<>>,
- undefined, <<>>, false, Compress, OnResponse), State).
-
--spec error_terminate(cowboy:http_status(), cowboy_req:req(), #state{}) -> ok.
-error_terminate(Status, Req, State) ->
- _ = cowboy_req:reply(Status, Req),
- terminate(State).
-
--spec terminate(#state{}) -> ok.
-terminate(#state{socket=Socket, transport=Transport}) ->
- Transport:close(Socket),
- ok.
diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl
index 8f0a04b..16b1fd1 100644
--- a/src/cowboy_req.erl
+++ b/src/cowboy_req.erl
@@ -47,6 +47,9 @@
%% Request body API.
-export([has_body/1]).
-export([body_length/1]).
+-export([read_body/1]).
+-export([read_body/2]).
+
-export([body/1]).
-export([body/2]).
-export([body_qs/1]).
@@ -70,10 +73,12 @@
-export([reply/2]).
-export([reply/3]).
-export([reply/4]).
+
+-export([send_body/3]).
+
-export([chunked_reply/2]).
-export([chunked_reply/3]).
-export([chunk/2]).
--export([upgrade_reply/3]).
-export([continue/1]).
-export([maybe_reply/2]).
-export([ensure_response/2]).
@@ -93,12 +98,12 @@
-type transfer_decode_fun() :: fun((binary(), any())
-> cow_http_te:decode_ret()).
--type body_opts() :: [{continue, boolean()}
+-type body_opts() :: [{continue, boolean()} %% doesn't apply
| {length, non_neg_integer()}
- | {read_length, non_neg_integer()}
- | {read_timeout, timeout()}
- | {transfer_decode, transfer_decode_fun(), any()}
- | {content_decode, content_decode_fun()}].
+ | {read_length, non_neg_integer()} %% to be added back later as optimization
+ | {read_timeout, timeout()} %% same
+ | {transfer_decode, transfer_decode_fun(), any()} %% doesn't apply
+ | {content_decode, content_decode_fun()}]. %% does apply
-export_type([body_opts/0]).
-type resp_body_fun() :: fun((any(), module()) -> ok).
@@ -182,43 +187,43 @@ new(Socket, Transport, Peer, Method, Path, Query,
end.
-spec method(req()) -> binary().
-method(Req) ->
- Req#http_req.method.
+method(#{method := Method}) ->
+ Method.
-spec version(req()) -> cowboy:http_version().
-version(Req) ->
- Req#http_req.version.
+version(#{version := Version}) ->
+ Version.
-spec peer(req()) -> {inet:ip_address(), inet:port_number()} | undefined.
peer(Req) ->
Req#http_req.peer.
-spec host(req()) -> binary().
-host(Req) ->
- Req#http_req.host.
+host(#{host := Host}) ->
+ Host.
-spec host_info(req()) -> cowboy_router:tokens() | undefined.
-host_info(Req) ->
- Req#http_req.host_info.
+host_info(#{host_info := HostInfo}) ->
+ HostInfo.
-spec port(req()) -> inet:port_number().
-port(Req) ->
- Req#http_req.port.
+port(#{port := Port}) ->
+ Port.
-spec path(req()) -> binary().
-path(Req) ->
- Req#http_req.path.
+path(#{path := Path}) ->
+ Path.
-spec path_info(req()) -> cowboy_router:tokens() | undefined.
-path_info(Req) ->
- Req#http_req.path_info.
+path_info(#{path_info := PathInfo}) ->
+ PathInfo.
-spec qs(req()) -> binary().
-qs(Req) ->
- Req#http_req.qs.
+qs(#{qs := Qs}) ->
+ Qs.
-spec parse_qs(req()) -> [{binary(), binary() | true}].
-parse_qs(#http_req{qs=Qs}) ->
+parse_qs(#{qs := Qs}) ->
cow_qs:parse_qs(Qs).
-spec match_qs(cowboy:fields(), req()) -> map().
@@ -227,30 +232,24 @@ match_qs(Fields, Req) ->
%% The URL includes the scheme, host and port only.
-spec host_url(req()) -> undefined | binary().
-host_url(#http_req{port=undefined}) ->
+host_url(#{port := undefined}) ->
undefined;
-host_url(#http_req{transport=Transport, host=Host, port=Port}) ->
- TransportName = Transport:name(),
- Secure = case TransportName of
- ssl -> <<"s">>;
- _ -> <<>>
- end,
- PortBin = case {TransportName, Port} of
- {ssl, 443} -> <<>>;
- {tcp, 80} -> <<>>;
+host_url(#{scheme := Scheme, host := Host, port := Port}) ->
+ PortBin = case {Scheme, Port} of
+ {<<"https">>, 443} -> <<>>;
+ {<<"http">>, 80} -> <<>>;
_ -> << ":", (integer_to_binary(Port))/binary >>
end,
- << "http", Secure/binary, "://", Host/binary, PortBin/binary >>.
+ << Scheme/binary, "://", Host/binary, PortBin/binary >>.
%% The URL includes the scheme, host, port, path and query string.
-spec url(req()) -> undefined | binary().
-url(Req=#http_req{}) ->
- HostURL = host_url(Req),
- url(Req, HostURL).
+url(Req) ->
+ url(Req, host_url(Req)).
url(_, undefined) ->
undefined;
-url(#http_req{path=Path, qs=QS}, HostURL) ->
+url(#{path := Path, qs := QS}, HostURL) ->
QS2 = case QS of
<<>> -> <<>>;
_ -> << "?", QS/binary >>
@@ -262,30 +261,31 @@ binding(Name, Req) ->
binding(Name, Req, undefined).
-spec binding(atom(), req(), Default) -> any() | Default when Default::any().
-binding(Name, Req, Default) when is_atom(Name) ->
- case lists:keyfind(Name, 1, Req#http_req.bindings) of
+binding(Name, #{bindings := Bindings}, Default) when is_atom(Name) ->
+ case lists:keyfind(Name, 1, Bindings) of
{_, Value} -> Value;
false -> Default
- end.
+ end;
+binding(Name, _, Default) when is_atom(Name) ->
+ Default.
-spec bindings(req()) -> [{atom(), any()}].
-bindings(Req) ->
- Req#http_req.bindings.
+bindings(#{bindings := Bindings}) ->
+ Bindings;
+bindings(_) ->
+ [].
-spec header(binary(), req()) -> binary() | undefined.
header(Name, Req) ->
header(Name, Req, undefined).
-spec header(binary(), req(), Default) -> binary() | Default when Default::any().
-header(Name, Req, Default) ->
- case lists:keyfind(Name, 1, Req#http_req.headers) of
- {Name, Value} -> Value;
- false -> Default
- end.
+header(Name, #{headers := Headers}, Default) ->
+ maps:get(Name, Headers, Default).
-spec headers(req()) -> cowboy:http_headers().
-headers(Req) ->
- Req#http_req.headers.
+headers(#{headers := Headers}) ->
+ Headers.
-spec parse_header(binary(), Req) -> any() when Req::req().
parse_header(Name = <<"content-length">>, Req) ->
@@ -354,31 +354,52 @@ set_meta(Name, Value, Req=#http_req{meta=Meta}) ->
%% Request Body API.
-spec has_body(req()) -> boolean().
-has_body(Req) ->
- case lists:keyfind(<<"content-length">>, 1, Req#http_req.headers) of
- {_, <<"0">>} ->
- false;
- {_, _} ->
- true;
- _ ->
- lists:keymember(<<"transfer-encoding">>, 1, Req#http_req.headers)
- end.
+has_body(#{has_body := HasBody}) ->
+ HasBody.
%% The length may not be known if Transfer-Encoding is not identity,
%% and the body hasn't been read at the time of the call.
-spec body_length(req()) -> undefined | non_neg_integer().
-body_length(Req) ->
- case parse_header(<<"transfer-encoding">>, Req) of
- [<<"identity">>] ->
- parse_header(<<"content-length">>, Req);
- _ ->
- undefined
- end.
+body_length(#{body_length := Length}) ->
+ Length.
-spec body(Req) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req().
body(Req) ->
body(Req, []).
+-spec read_body(Req) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req().
+read_body(Req) ->
+ read_body(Req, []).
+
+-spec read_body(Req, body_opts()) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req().
+read_body(Req=#{pid := Pid, streamid := StreamID}, Opts) ->
+ %% @todo Opts should be a map
+ Length = case lists:keyfind(length, 1, Opts) of
+ false -> 8000000;
+ {_, ChunkLen0} -> ChunkLen0
+ end,
+ ReadTimeout = case lists:keyfind(read_timeout, 1, Opts) of
+ false -> 15000;
+ {_, ReadTimeout0} -> ReadTimeout0
+ end,
+ Ref = make_ref(),
+ Pid ! {{Pid, StreamID}, {read_body, Ref, Length}},
+% io:format("READ_BODY ~p ~p ~p ~p~n", [Pid, StreamID, Ref, Length]),
+ receive
+ {request_body, Ref, nofin, Body} ->
+ {more, Body, Req};
+ {request_body, Ref, {fin, BodyLength}, Body} ->
+ {ok, Body, set_body_length(Req, BodyLength)}
+ after ReadTimeout ->
+ exit(read_body_timeout)
+ end.
+
+set_body_length(Req=#{headers := Headers}, BodyLength) ->
+ Req#{
+ headers => Headers#{<<"content-length">> => integer_to_binary(BodyLength)},
+ body_length => BodyLength
+ }.
+
-spec body(Req, body_opts()) -> {ok, binary(), Req} | {more, binary(), Req} when Req::req().
body(Req=#http_req{body_state=waiting}, Opts) ->
%% Send a 100 continue if needed (enabled by default).
@@ -514,7 +535,7 @@ body_qs(Req) ->
-spec body_qs(Req, body_opts()) -> {ok, [{binary(), binary() | true}], Req}
| {badlength, Req} when Req::req().
body_qs(Req, Opts) ->
- case body(Req, Opts) of
+ case read_body(Req, Opts) of
{ok, Body, Req2} ->
{ok, cow_qs:parse_qs(Body), Req2};
{more, _, Req2} ->
@@ -535,13 +556,16 @@ part(Req) ->
-spec part(Req, body_opts())
-> {ok, cow_multipart:headers(), Req} | {done, Req}
when Req::req().
-part(Req=#http_req{multipart=undefined}, Opts) ->
- part(init_multipart(Req), Opts);
part(Req, Opts) ->
- {Data, Req2} = stream_multipart(Req, Opts),
- part(Data, Opts, Req2).
+ case maps:is_key(multipart, Req) of
+ true ->
+ {Data, Req2} = stream_multipart(Req, Opts),
+ part(Data, Opts, Req2);
+ false ->
+ part(init_multipart(Req), Opts)
+ end.
-part(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}) ->
+part(Buffer, Opts, Req=#{multipart := {Boundary, _}}) ->
case cow_multipart:parse_headers(Buffer, Boundary) of
more ->
{Data, Req2} = stream_multipart(Req, Opts),
@@ -550,10 +574,10 @@ part(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}) ->
{Data, Req2} = stream_multipart(Req, Opts),
part(<< Buffer2/binary, Data/binary >>, Opts, Req2);
{ok, Headers, Rest} ->
- {ok, Headers, Req#http_req{multipart={Boundary, Rest}}};
+ {ok, Headers, Req#{multipart => {Boundary, Rest}}};
%% Ignore epilogue.
{done, _} ->
- {done, Req#http_req{multipart=undefined}}
+ {done, Req#{multipart => done}}
end.
-spec part_body(Req)
@@ -565,19 +589,22 @@ part_body(Req) ->
-spec part_body(Req, body_opts())
-> {ok, binary(), Req} | {more, binary(), Req}
when Req::req().
-part_body(Req=#http_req{multipart=undefined}, Opts) ->
- part_body(init_multipart(Req), Opts);
part_body(Req, Opts) ->
- part_body(<<>>, Opts, Req, <<>>).
+ case maps:is_key(multipart, Req) of
+ true ->
+ part_body(<<>>, Opts, Req, <<>>);
+ false ->
+ part_body(init_multipart(Req), Opts)
+ end.
-part_body(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}, Acc) ->
+part_body(Buffer, Opts, Req=#{multipart := {Boundary, _}}, Acc) ->
ChunkLen = case lists:keyfind(length, 1, Opts) of
false -> 8000000;
{_, ChunkLen0} -> ChunkLen0
end,
case byte_size(Acc) > ChunkLen of
true ->
- {more, Acc, Req#http_req{multipart={Boundary, Buffer}}};
+ {more, Acc, Req#{multipart => {Boundary, Buffer}}};
false ->
{Data, Req2} = stream_multipart(Req, Opts),
case cow_multipart:parse_body(<< Buffer/binary, Data/binary >>, Boundary) of
@@ -591,21 +618,22 @@ part_body(Buffer, Opts, Req=#http_req{multipart={Boundary, _}}, Acc) ->
{ok, << Acc/binary, Body/binary >>, Req2};
{done, Body, Rest} ->
{ok, << Acc/binary, Body/binary >>,
- Req2#http_req{multipart={Boundary, Rest}}}
+ Req2#{multipart => {Boundary, Rest}}}
end
end.
init_multipart(Req) ->
{<<"multipart">>, _, Params} = parse_header(<<"content-type">>, Req),
{_, Boundary} = lists:keyfind(<<"boundary">>, 1, Params),
- Req#http_req{multipart={Boundary, <<>>}}.
+ Req#{multipart => {Boundary, <<>>}}.
-stream_multipart(Req=#http_req{body_state=BodyState, multipart={_, <<>>}}, Opts) ->
- true = BodyState =/= done,
- {_, Data, Req2} = body(Req, Opts),
+stream_multipart(Req=#{multipart := done}, _) ->
+ {<<>>, Req};
+stream_multipart(Req=#{multipart := {_, <<>>}}, Opts) ->
+ {_, Data, Req2} = read_body(Req, Opts),
{Data, Req2};
-stream_multipart(Req=#http_req{multipart={Boundary, Buffer}}, _) ->
- {Buffer, Req#http_req{multipart={Boundary, <<>>}}}.
+stream_multipart(Req=#{multipart := {Boundary, Buffer}}, _) ->
+ {Buffer, Req#{multipart => {Boundary, <<>>}}}.
%% Response API.
@@ -618,16 +646,22 @@ stream_multipart(Req=#http_req{multipart={Boundary, Buffer}}, _) ->
-> Req when Req::req().
set_resp_cookie(Name, Value, Opts, Req) ->
Cookie = cow_cookie:setcookie(Name, Value, Opts),
+ %% @todo Nah, keep separate.
set_resp_header(<<"set-cookie">>, Cookie, Req).
-spec set_resp_header(binary(), iodata(), Req)
-> Req when Req::req().
-set_resp_header(Name, Value, Req=#http_req{resp_headers=RespHeaders}) ->
- Req#http_req{resp_headers=[{Name, Value}|RespHeaders]}.
+set_resp_header(Name, Value, Req=#{resp_headers := RespHeaders}) ->
+ Req#{resp_headers => RespHeaders#{Name => Value}};
+set_resp_header(Name,Value, Req) ->
+ Req#{resp_headers => #{Name => Value}}.
+%% @todo {sendfile, Offset, Bytes, Path} tuple
-spec set_resp_body(iodata(), Req) -> Req when Req::req().
set_resp_body(Body, Req) ->
- Req#http_req{resp_body=Body}.
+ Req#{resp_body => Body}.
+%set_resp_body(Body, Req) ->
+% Req#http_req{resp_body=Body}.
-spec set_resp_body_fun(resp_body_fun(), Req) -> Req when Req::req().
set_resp_body_fun(StreamFun, Req) when is_function(StreamFun) ->
@@ -647,189 +681,217 @@ set_resp_body_fun(chunked, StreamFun, Req)
Req#http_req{resp_body={chunked, StreamFun}}.
-spec has_resp_header(binary(), req()) -> boolean().
-has_resp_header(Name, #http_req{resp_headers=RespHeaders}) ->
- lists:keymember(Name, 1, RespHeaders).
+has_resp_header(Name, #{resp_headers := RespHeaders}) ->
+ maps:is_key(Name, RespHeaders);
+has_resp_header(_, _) ->
+ false.
-spec has_resp_body(req()) -> boolean().
-has_resp_body(#http_req{resp_body=RespBody}) when is_function(RespBody) ->
- true;
-has_resp_body(#http_req{resp_body={chunked, _}}) ->
- true;
-has_resp_body(#http_req{resp_body={Length, _}}) ->
- Length > 0;
-has_resp_body(#http_req{resp_body=RespBody}) ->
- iolist_size(RespBody) > 0.
+has_resp_body(#{resp_body := {sendfile, Len, _}}) ->
+ Len > 0;
+has_resp_body(#{resp_body := RespBody}) ->
+ iolist_size(RespBody) > 0;
+has_resp_body(_) ->
+ false.
+
+%has_resp_body(#http_req{resp_body=RespBody}) when is_function(RespBody) ->
+% true;
+%has_resp_body(#http_req{resp_body={chunked, _}}) ->
+% true;
+%has_resp_body(#http_req{resp_body={Length, _}}) ->
+% Length > 0;
+%has_resp_body(#http_req{resp_body=RespBody}) ->
+% iolist_size(RespBody) > 0.
-spec delete_resp_header(binary(), Req)
-> Req when Req::req().
-delete_resp_header(Name, Req=#http_req{resp_headers=RespHeaders}) ->
- RespHeaders2 = lists:keydelete(Name, 1, RespHeaders),
- Req#http_req{resp_headers=RespHeaders2}.
+delete_resp_header(Name, Req=#{resp_headers := RespHeaders}) ->
+ Req#{resp_headers => maps:remove(Name, RespHeaders)}.
-spec reply(cowboy:http_status(), Req) -> Req when Req::req().
-reply(Status, Req=#http_req{resp_body=Body}) ->
- reply(Status, [], Body, Req).
+reply(Status, Req) ->
+ reply(Status, #{}, Req).
-spec reply(cowboy:http_status(), cowboy:http_headers(), Req)
-> Req when Req::req().
-reply(Status, Headers, Req=#http_req{resp_body=Body}) ->
- reply(Status, Headers, Body, Req).
+reply(Status, Headers, Req=#{resp_body := Body}) ->
+ reply(Status, Headers, Body, Req);
+reply(Status, Headers, Req) ->
+ reply(Status, Headers, <<>>, Req).
-spec reply(cowboy:http_status(), cowboy:http_headers(),
iodata() | resp_body_fun() | {non_neg_integer(), resp_body_fun()}
| {chunked, resp_chunked_fun()}, Req)
-> Req when Req::req().
-reply(Status, Headers, Body, Req=#http_req{
- socket=Socket, transport=Transport,
- version=Version, connection=Connection,
- method=Method, resp_compress=Compress,
- resp_state=RespState, resp_headers=RespHeaders})
- when RespState =:= waiting; RespState =:= waiting_stream ->
- HTTP11Headers = if
- Transport =/= cowboy_spdy, Version =:= 'HTTP/1.0', Connection =:= keepalive ->
- [{<<"connection">>, atom_to_connection(Connection)}];
- Transport =/= cowboy_spdy, Version =:= 'HTTP/1.1', Connection =:= close ->
- [{<<"connection">>, atom_to_connection(Connection)}];
- true ->
- []
- end,
- Req3 = case Body of
- BodyFun when is_function(BodyFun) ->
- %% We stream the response body until we close the connection.
- RespConn = close,
- {RespType, Req2} = if
- Transport =:= cowboy_spdy ->
- response(Status, Headers, RespHeaders, [
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- ], stream, Req);
- true ->
- response(Status, Headers, RespHeaders, [
- {<<"connection">>, <<"close">>},
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>},
- {<<"transfer-encoding">>, <<"identity">>}
- ], <<>>, Req)
- end,
- if RespType =/= hook, Method =/= <<"HEAD">> ->
- BodyFun(Socket, Transport);
- true -> ok
- end,
- Req2#http_req{connection=RespConn};
- {chunked, BodyFun} ->
- %% We stream the response body in chunks.
- {RespType, Req2} = chunked_response(Status, Headers, Req),
- if RespType =/= hook, Method =/= <<"HEAD">> ->
- ChunkFun = fun(IoData) -> chunk(IoData, Req2) end,
- BodyFun(ChunkFun),
- %% Send the last chunk if chunked encoding was used.
- if
- Version =:= 'HTTP/1.0'; RespState =:= waiting_stream ->
- Req2;
- true ->
- last_chunk(Req2)
- end;
- true -> Req2
- end;
- {ContentLength, BodyFun} ->
- %% We stream the response body for ContentLength bytes.
- RespConn = response_connection(Headers, Connection),
- {RespType, Req2} = response(Status, Headers, RespHeaders, [
- {<<"content-length">>, integer_to_list(ContentLength)},
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- |HTTP11Headers], stream, Req),
- if RespType =/= hook, Method =/= <<"HEAD">> ->
- BodyFun(Socket, Transport);
- true -> ok
- end,
- Req2#http_req{connection=RespConn};
- _ when Compress ->
- RespConn = response_connection(Headers, Connection),
- Req2 = reply_may_compress(Status, Headers, Body, Req,
- RespHeaders, HTTP11Headers, Method),
- Req2#http_req{connection=RespConn};
- _ ->
- RespConn = response_connection(Headers, Connection),
- Req2 = reply_no_compress(Status, Headers, Body, Req,
- RespHeaders, HTTP11Headers, Method, iolist_size(Body)),
- Req2#http_req{connection=RespConn}
- end,
- Req3#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}.
-
-reply_may_compress(Status, Headers, Body, Req,
- RespHeaders, HTTP11Headers, Method) ->
- BodySize = iolist_size(Body),
- try parse_header(<<"accept-encoding">>, Req) of
- Encodings ->
- CanGzip = (BodySize > 300)
- andalso (false =:= lists:keyfind(<<"content-encoding">>,
- 1, Headers))
- andalso (false =:= lists:keyfind(<<"content-encoding">>,
- 1, RespHeaders))
- andalso (false =:= lists:keyfind(<<"transfer-encoding">>,
- 1, Headers))
- andalso (false =:= lists:keyfind(<<"transfer-encoding">>,
- 1, RespHeaders))
- andalso (Encodings =/= undefined)
- andalso (false =/= lists:keyfind(<<"gzip">>, 1, Encodings)),
- case CanGzip of
- true ->
- GzBody = zlib:gzip(Body),
- {_, Req2} = response(Status, Headers, RespHeaders, [
- {<<"content-length">>, integer_to_list(byte_size(GzBody))},
- {<<"content-encoding">>, <<"gzip">>},
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- |HTTP11Headers],
- case Method of <<"HEAD">> -> <<>>; _ -> GzBody end,
- Req),
- Req2;
- false ->
- reply_no_compress(Status, Headers, Body, Req,
- RespHeaders, HTTP11Headers, Method, BodySize)
- end
- catch _:_ ->
- reply_no_compress(Status, Headers, Body, Req,
- RespHeaders, HTTP11Headers, Method, BodySize)
- end.
+reply(Status, Headers, Stream = {stream, undefined, _}, Req) ->
+ do_stream_reply(Status, Headers, Stream, Req);
+reply(Status, Headers, Stream = {stream, Len, _}, Req) ->
+ do_stream_reply(Status, Headers#{
+ <<"content-length">> => integer_to_binary(Len)
+ }, Stream, Req);
+reply(Status, Headers, SendFile = {sendfile, _, Len, _}, Req) ->
+ do_reply(Status, Headers#{
+ <<"content-length">> => integer_to_binary(Len)
+ }, SendFile, Req);
+reply(Status, Headers, Body, Req) ->
+ do_reply(Status, Headers#{
+ <<"content-length">> => integer_to_binary(iolist_size(Body))
+ }, Body, Req).
+
+do_stream_reply(Status, Headers, {stream, _, Fun}, Req=#{pid := Pid, streamid := StreamID}) ->
+ Pid ! {{Pid, StreamID}, {headers, Status, response_headers(Headers, Req)}},
+ Fun(),
+ ok.
+
+do_reply(Status, Headers, Body, Req=#{pid := Pid, streamid := StreamID}) ->
+ Pid ! {{Pid, StreamID}, {response, Status, response_headers(Headers, Req), Body}},
+ ok.
+
+send_body(Data, IsFin, #{pid := Pid, streamid := StreamID}) ->
+ Pid ! {{Pid, StreamID}, {data, IsFin, Data}},
+ ok.
-reply_no_compress(Status, Headers, Body, Req,
- RespHeaders, HTTP11Headers, Method, BodySize) ->
- {_, Req2} = response(Status, Headers, RespHeaders, [
- {<<"content-length">>, integer_to_list(BodySize)},
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- |HTTP11Headers],
- case Method of <<"HEAD">> -> <<>>; _ -> Body end,
- Req),
- Req2.
+response_headers(Headers, Req) ->
+ RespHeaders = maps:get(resp_headers, Req, #{}),
+ maps:merge(#{
+ <<"date">> => cowboy_clock:rfc1123(),
+ <<"server">> => <<"Cowboy">>
+ }, maps:merge(RespHeaders, Headers)).
+
+%reply(Status, Headers, Body, Req=#http_req{
+% socket=Socket, transport=Transport,
+% version=Version, connection=Connection,
+% method=Method, resp_compress=Compress,
+% resp_state=RespState, resp_headers=RespHeaders})
+% when RespState =:= waiting; RespState =:= waiting_stream ->
+% Req3 = case Body of
+% BodyFun when is_function(BodyFun) ->
+% %% We stream the response body until we close the connection.
+% RespConn = close,
+% {RespType, Req2} = if
+% Transport =:= cowboy_spdy ->
+% response(Status, Headers, RespHeaders, [
+% {<<"date">>, cowboy_clock:rfc1123()},
+% {<<"server">>, <<"Cowboy">>}
+% ], stream, Req);
+% true ->
+% response(Status, Headers, RespHeaders, [
+% {<<"connection">>, <<"close">>},
+% {<<"date">>, cowboy_clock:rfc1123()},
+% {<<"server">>, <<"Cowboy">>},
+% {<<"transfer-encoding">>, <<"identity">>}
+% ], <<>>, Req)
+% end,
+% if RespType =/= hook, Method =/= <<"HEAD">> ->
+% BodyFun(Socket, Transport);
+% true -> ok
+% end,
+% Req2#http_req{connection=RespConn};
+% {chunked, BodyFun} ->
+% %% We stream the response body in chunks.
+% {RespType, Req2} = chunked_response(Status, Headers, Req),
+% if RespType =/= hook, Method =/= <<"HEAD">> ->
+% ChunkFun = fun(IoData) -> chunk(IoData, Req2) end,
+% BodyFun(ChunkFun),
+% %% Send the last chunk if chunked encoding was used.
+% if
+% Version =:= 'HTTP/1.0'; RespState =:= waiting_stream ->
+% Req2;
+% true ->
+% last_chunk(Req2)
+% end;
+% true -> Req2
+% end;
+% {ContentLength, BodyFun} ->
+% %% We stream the response body for ContentLength bytes.
+% RespConn = response_connection(Headers, Connection),
+% {RespType, Req2} = response(Status, Headers, RespHeaders, [
+% {<<"content-length">>, integer_to_list(ContentLength)},
+% {<<"date">>, cowboy_clock:rfc1123()},
+% {<<"server">>, <<"Cowboy">>}
+% |HTTP11Headers], stream, Req),
+% if RespType =/= hook, Method =/= <<"HEAD">> ->
+% BodyFun(Socket, Transport);
+% true -> ok
+% end,
+% Req2#http_req{connection=RespConn};
+% _ when Compress ->
+% RespConn = response_connection(Headers, Connection),
+% Req2 = reply_may_compress(Status, Headers, Body, Req,
+% RespHeaders, HTTP11Headers, Method),
+% Req2#http_req{connection=RespConn};
+% _ ->
+% RespConn = response_connection(Headers, Connection),
+% Req2 = reply_no_compress(Status, Headers, Body, Req,
+% RespHeaders, HTTP11Headers, Method, iolist_size(Body)),
+% Req2#http_req{connection=RespConn}
+% end,
+% Req3#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}.
+
+%reply_may_compress(Status, Headers, Body, Req,
+% RespHeaders, HTTP11Headers, Method) ->
+% BodySize = iolist_size(Body),
+% try parse_header(<<"accept-encoding">>, Req) of
+% Encodings ->
+% CanGzip = (BodySize > 300)
+% andalso (false =:= lists:keyfind(<<"content-encoding">>,
+% 1, Headers))
+% andalso (false =:= lists:keyfind(<<"content-encoding">>,
+% 1, RespHeaders))
+% andalso (false =:= lists:keyfind(<<"transfer-encoding">>,
+% 1, Headers))
+% andalso (false =:= lists:keyfind(<<"transfer-encoding">>,
+% 1, RespHeaders))
+% andalso (Encodings =/= undefined)
+% andalso (false =/= lists:keyfind(<<"gzip">>, 1, Encodings)),
+% case CanGzip of
+% true ->
+% GzBody = zlib:gzip(Body),
+% {_, Req2} = response(Status, Headers, RespHeaders, [
+% {<<"content-length">>, integer_to_list(byte_size(GzBody))},
+% {<<"content-encoding">>, <<"gzip">>},
+% |HTTP11Headers],
+% case Method of <<"HEAD">> -> <<>>; _ -> GzBody end,
+% Req),
+% Req2;
+% false ->
+% reply_no_compress(Status, Headers, Body, Req,
+% RespHeaders, HTTP11Headers, Method, BodySize)
+% end
+% catch _:_ ->
+% reply_no_compress(Status, Headers, Body, Req,
+% RespHeaders, HTTP11Headers, Method, BodySize)
+% end.
+%
+%reply_no_compress(Status, Headers, Body, Req,
+% RespHeaders, HTTP11Headers, Method, BodySize) ->
+% {_, Req2} = response(Status, Headers, RespHeaders, [
+% {<<"content-length">>, integer_to_list(BodySize)},
+% |HTTP11Headers],
+% case Method of <<"HEAD">> -> <<>>; _ -> Body end,
+% Req),
+% Req2.
-spec chunked_reply(cowboy:http_status(), Req) -> Req when Req::req().
chunked_reply(Status, Req) ->
- chunked_reply(Status, [], Req).
+ chunked_reply(Status, #{}, Req).
-spec chunked_reply(cowboy:http_status(), cowboy:http_headers(), Req)
-> Req when Req::req().
-chunked_reply(Status, Headers, Req) ->
- {_, Req2} = chunked_response(Status, Headers, Req),
- Req2.
+chunked_reply(Status, Headers, Req=#{pid := Pid, streamid := StreamID}) ->
+ Pid ! {{Pid, StreamID}, {headers, Status, response_headers(Headers, Req)}},
+ Req. %% @todo return ok
+% ok.
-spec chunk(iodata(), req()) -> ok.
-chunk(_Data, #http_req{method= <<"HEAD">>}) ->
+chunk(_Data, #{method := <<"HEAD">>}) ->
ok;
-chunk(Data, #http_req{socket=Socket, transport=cowboy_spdy,
- resp_state=chunks}) ->
- cowboy_spdy:stream_data(Socket, Data);
-chunk(Data, #http_req{socket=Socket, transport=Transport,
- resp_state=stream}) ->
- ok = Transport:send(Socket, Data);
-chunk(Data, #http_req{socket=Socket, transport=Transport,
- resp_state=chunks}) ->
+chunk(Data, #{pid := Pid, streamid := StreamID}) ->
case iolist_size(Data) of
0 -> ok;
- Size -> Transport:send(Socket, [integer_to_list(Size, 16),
- <<"\r\n">>, Data, <<"\r\n">>])
+ _ ->
+ Pid ! {{Pid, StreamID}, {data, nofin, Data}},
+ ok
end.
%% If ever made public, need to send nothing if HEAD.
@@ -841,16 +903,6 @@ last_chunk(Req=#http_req{socket=Socket, transport=Transport}) ->
_ = Transport:send(Socket, <<"0\r\n\r\n">>),
Req#http_req{resp_state=done}.
--spec upgrade_reply(cowboy:http_status(), cowboy:http_headers(), Req)
- -> Req when Req::req().
-upgrade_reply(Status, Headers, Req=#http_req{transport=Transport,
- resp_state=waiting, resp_headers=RespHeaders})
- when Transport =/= cowboy_spdy ->
- {_, Req2} = response(Status, Headers, RespHeaders, [
- {<<"connection">>, <<"Upgrade">>}
- ], <<>>, Req),
- Req2#http_req{resp_state=done, resp_headers=[], resp_body= <<>>}.
-
-spec continue(req()) -> ok.
continue(#http_req{socket=Socket, transport=Transport,
version=Version}) ->
@@ -973,49 +1025,49 @@ to_list(Req) ->
%% Internal.
--spec chunked_response(cowboy:http_status(), cowboy:http_headers(), Req) ->
- {normal | hook, Req} when Req::req().
-chunked_response(Status, Headers, Req=#http_req{
- transport=cowboy_spdy, resp_state=waiting,
- resp_headers=RespHeaders}) ->
- {RespType, Req2} = response(Status, Headers, RespHeaders, [
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- ], stream, Req),
- {RespType, Req2#http_req{resp_state=chunks,
- resp_headers=[], resp_body= <<>>}};
-chunked_response(Status, Headers, Req=#http_req{
- version=Version, connection=Connection,
- resp_state=RespState, resp_headers=RespHeaders})
- when RespState =:= waiting; RespState =:= waiting_stream ->
- RespConn = response_connection(Headers, Connection),
- HTTP11Headers = if
- Version =:= 'HTTP/1.0', Connection =:= keepalive ->
- [{<<"connection">>, atom_to_connection(Connection)}];
- Version =:= 'HTTP/1.0' -> [];
- true ->
- MaybeTE = if
- RespState =:= waiting_stream -> [];
- true -> [{<<"transfer-encoding">>, <<"chunked">>}]
- end,
- if
- Connection =:= close ->
- [{<<"connection">>, atom_to_connection(Connection)}|MaybeTE];
- true ->
- MaybeTE
- end
- end,
- RespState2 = if
- Version =:= 'HTTP/1.1', RespState =:= 'waiting' -> chunks;
- true -> stream
- end,
- {RespType, Req2} = response(Status, Headers, RespHeaders, [
- {<<"date">>, cowboy_clock:rfc1123()},
- {<<"server">>, <<"Cowboy">>}
- |HTTP11Headers], <<>>, Req),
- {RespType, Req2#http_req{connection=RespConn, resp_state=RespState2,
- resp_headers=[], resp_body= <<>>}}.
-
+%-spec chunked_response(cowboy:http_status(), cowboy:http_headers(), Req) ->
+% {normal | hook, Req} when Req::req().
+%chunked_response(Status, Headers, Req=#http_req{
+% transport=cowboy_spdy, resp_state=waiting,
+% resp_headers=RespHeaders}) ->
+% {RespType, Req2} = response(Status, Headers, RespHeaders, [
+% {<<"date">>, cowboy_clock:rfc1123()},
+% {<<"server">>, <<"Cowboy">>}
+% ], stream, Req),
+% {RespType, Req2#http_req{resp_state=chunks,
+% resp_headers=[], resp_body= <<>>}};
+%chunked_response(Status, Headers, Req=#http_req{
+% version=Version, connection=Connection,
+% resp_state=RespState, resp_headers=RespHeaders})
+% when RespState =:= waiting; RespState =:= waiting_stream ->
+% RespConn = response_connection(Headers, Connection),
+% HTTP11Headers = if
+% Version =:= 'HTTP/1.0', Connection =:= keepalive ->
+% [{<<"connection">>, atom_to_connection(Connection)}];
+% Version =:= 'HTTP/1.0' -> [];
+% true ->
+% MaybeTE = if
+% RespState =:= waiting_stream -> [];
+% true -> [{<<"transfer-encoding">>, <<"chunked">>}]
+% end,
+% if
+% Connection =:= close ->
+% [{<<"connection">>, atom_to_connection(Connection)}|MaybeTE];
+% true ->
+% MaybeTE
+% end
+% end,
+% RespState2 = if
+% Version =:= 'HTTP/1.1', RespState =:= 'waiting' -> chunks;
+% true -> stream
+% end,
+% {RespType, Req2} = response(Status, Headers, RespHeaders, [
+% {<<"date">>, cowboy_clock:rfc1123()},
+% {<<"server">>, <<"Cowboy">>}
+% |HTTP11Headers], <<>>, Req),
+% {RespType, Req2#http_req{connection=RespConn, resp_state=RespState2,
+% resp_headers=[], resp_body= <<>>}}.
+%
-spec response(cowboy:http_status(), cowboy:http_headers(),
cowboy:http_headers(), cowboy:http_headers(), stream | iodata(), Req)
-> {normal | hook, Req} when Req::req().
@@ -1063,20 +1115,20 @@ response(Status, Headers, RespHeaders, DefaultHeaders, Body, Req=#http_req{
hook
end,
{ReplyType, Req2}.
-
--spec response_connection(cowboy:http_headers(), keepalive | close)
- -> keepalive | close.
-response_connection([], Connection) ->
- Connection;
-response_connection([{Name, Value}|Tail], Connection) ->
- case Name of
- <<"connection">> ->
- Tokens = cow_http_hd:parse_connection(Value),
- connection_to_atom(Tokens);
- _ ->
- response_connection(Tail, Connection)
- end.
-
+%
+%-spec response_connection(cowboy:http_headers(), keepalive | close)
+% -> keepalive | close.
+%response_connection([], Connection) ->
+% Connection;
+%response_connection([{Name, Value}|Tail], Connection) ->
+% case Name of
+% <<"connection">> ->
+% Tokens = cow_http_hd:parse_connection(Value),
+% connection_to_atom(Tokens);
+% _ ->
+% response_connection(Tail, Connection)
+% end.
+%
-spec response_merge_headers(cowboy:http_headers(), cowboy:http_headers(),
cowboy:http_headers()) -> cowboy:http_headers().
response_merge_headers(Headers, RespHeaders, DefaultHeaders) ->
@@ -1105,13 +1157,13 @@ merge_headers(Headers, [{Name, Value}|Tail]) ->
false -> [{Name, Value}|Headers]
end,
merge_headers(Headers2, Tail).
-
--spec atom_to_connection(keepalive) -> <<_:80>>;
- (close) -> <<_:40>>.
-atom_to_connection(keepalive) ->
- <<"keep-alive">>;
-atom_to_connection(close) ->
- <<"close">>.
+%
+%-spec atom_to_connection(keepalive) -> <<_:80>>;
+% (close) -> <<_:40>>.
+%atom_to_connection(keepalive) ->
+% <<"keep-alive">>;
+%atom_to_connection(close) ->
+% <<"close">>.
%% We don't match on "keep-alive" since it is the default value.
-spec connection_to_atom([binary()]) -> keepalive | close.
diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl
index 75e8e63..914b273 100644
--- a/src/cowboy_rest.erl
+++ b/src/cowboy_rest.erl
@@ -373,7 +373,7 @@ content_types_provided(Req, State) ->
try cowboy_req:parse_header(<<"accept">>, Req) of
undefined ->
languages_provided(
- cowboy_req:set_meta(media_type, {<<"text">>, <<"html">>, []}, Req),
+ Req#{media_type => {<<"text">>, <<"html">>, []}},
State2#state{content_type_a={{<<"text">>, <<"html">>, []}, to_html}});
Accept ->
choose_media_type(Req, State2, prioritize_accept(Accept))
@@ -392,7 +392,7 @@ content_types_provided(Req, State) ->
undefined ->
{PMT, _Fun} = HeadCTP = hd(CTP2),
languages_provided(
- cowboy_req:set_meta(media_type, PMT, Req2),
+ Req2#{media_type => PMT},
State2#state{content_type_a=HeadCTP});
Accept ->
choose_media_type(Req2, State2, prioritize_accept(Accept))
@@ -460,14 +460,14 @@ match_media_type_params(Req, State, _Accept,
[Provided = {{TP, STP, '*'}, _Fun}|_Tail],
{{_TA, _STA, Params_A}, _QA, _APA}) ->
PMT = {TP, STP, Params_A},
- languages_provided(cowboy_req:set_meta(media_type, PMT, Req),
+ languages_provided(Req#{media_type => PMT},
State#state{content_type_a=Provided});
match_media_type_params(Req, State, Accept,
[Provided = {PMT = {_TP, _STP, Params_P}, _Fun}|Tail],
MediaType = {{_TA, _STA, Params_A}, _QA, _APA}) ->
case lists:sort(Params_P) =:= lists:sort(Params_A) of
true ->
- languages_provided(cowboy_req:set_meta(media_type, PMT, Req),
+ languages_provided(Req#{media_type => PMT},
State#state{content_type_a=Provided});
false ->
match_media_type(Req, State, Accept, Tail, MediaType)
@@ -534,7 +534,7 @@ match_language(Req, State, Accept, [Provided|Tail],
set_language(Req, State=#state{language_a=Language}) ->
Req2 = cowboy_req:set_resp_header(<<"content-language">>, Language, Req),
- charsets_provided(cowboy_req:set_meta(language, Language, Req2), State).
+ charsets_provided(Req2#{language => Language}, State).
%% charsets_provided should return a list of binary values indicating
%% which charsets are accepted by the resource.
@@ -599,7 +599,7 @@ set_content_type(Req, State=#state{
Charset -> [ContentType, <<"; charset=">>, Charset]
end,
Req2 = cowboy_req:set_resp_header(<<"content-type">>, ContentType2, Req),
- encodings_provided(cowboy_req:set_meta(charset, Charset, Req2), State).
+ encodings_provided(Req2#{charset => Charset}, State).
set_content_type_build_params('*', []) ->
<<>>;
@@ -1012,7 +1012,7 @@ set_resp_body(Req, State=#state{content_type_a={_, Callback}}) ->
cowboy_req:set_resp_body_fun(Len, StreamFun, Req2);
{chunked, StreamFun} ->
cowboy_req:set_resp_body_fun(chunked, StreamFun, Req2);
- _Contents ->
+ _ ->
cowboy_req:set_resp_body(Body, Req2)
end,
multiple_choices(Req3, State2)
@@ -1152,7 +1152,7 @@ error_terminate(Req, #state{handler=Handler, handler_state=HandlerState},
{reason, Reason},
{mfa, {Handler, Callback, 2}},
{stacktrace, Stacktrace},
- {req, cowboy_req:to_list(Req)},
+ {req, Req},
{state, HandlerState}
]}).
diff --git a/src/cowboy_router.erl b/src/cowboy_router.erl
index b806da5..f07b307 100644
--- a/src/cowboy_router.erl
+++ b/src/cowboy_router.erl
@@ -159,14 +159,17 @@ compile_brackets_split(<< C, Rest/bits >>, Acc, N) ->
-spec execute(Req, Env)
-> {ok, Req, Env} | {stop, Req}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
-execute(Req, Env) ->
- {_, Dispatch} = lists:keyfind(dispatch, 1, Env),
- Host = cowboy_req:host(Req),
- Path = cowboy_req:path(Req),
+execute(Req=#{host := Host, path := Path}, Env=#{dispatch := Dispatch}) ->
case match(Dispatch, Host, Path) of
{ok, Handler, HandlerOpts, Bindings, HostInfo, PathInfo} ->
- Req2 = cowboy_req:set_bindings(HostInfo, PathInfo, Bindings, Req),
- {ok, Req2, [{handler, Handler}, {handler_opts, HandlerOpts}|Env]};
+ {ok, Req#{
+ host_info => HostInfo,
+ path_info => PathInfo,
+ bindings => Bindings
+ }, Env#{
+ handler => Handler,
+ handler_opts => HandlerOpts
+ }};
{error, notfound, host} ->
{stop, cowboy_req:reply(400, Req)};
{error, badrequest, path} ->
diff --git a/src/cowboy_static.erl b/src/cowboy_static.erl
index b2a8302..c771771 100644
--- a/src/cowboy_static.erl
+++ b/src/cowboy_static.erl
@@ -274,11 +274,4 @@ last_modified(Req, State={_, {ok, #file_info{mtime=Modified}}, _}) ->
-> {{stream, non_neg_integer(), fun()}, Req, State}
when State::state().
get_file(Req, State={Path, {ok, #file_info{size=Size}}, _}) ->
- Sendfile = fun (Socket, Transport) ->
- case Transport:sendfile(Socket, Path) of
- {ok, _} -> ok;
- {error, closed} -> ok;
- {error, etimedout} -> ok
- end
- end,
- {{stream, Size, Sendfile}, Req, State}.
+ {{sendfile, 0, Size, Path}, Req, State}.
diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl
new file mode 100644
index 0000000..f924e28
--- /dev/null
+++ b/src/cowboy_stream_h.erl
@@ -0,0 +1,128 @@
+%% Copyright (c) 2016, Loïc Hoguin <[email protected]>
+%%
+%% Permission to use, copy, modify, and/or distribute this software for any
+%% purpose with or without fee is hereby granted, provided that the above
+%% copyright notice and this permission notice appear in all copies.
+%%
+%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
+%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
+%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
+%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
+%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
+%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+-module(cowboy_stream_h).
+%% @todo -behaviour(cowboy_stream).
+
+%% @todo Maybe have a callback for the type of process this is, worker or supervisor.
+-export([init/3]).
+-export([data/4]).
+-export([info/3]).
+-export([terminate/3]).
+
+-export([execute/3]).
+-export([resume/5]).
+
+-record(state, {
+ pid = undefined :: pid(),
+ read_body_ref = undefined :: reference(),
+ read_body_length = 0 :: non_neg_integer(),
+ read_body_is_fin = nofin :: nofin | fin,
+ read_body_buffer = <<>> :: binary()
+}).
+
+%% @todo For shutting down children we need to have a timeout before we terminate
+%% the stream like supervisors do. So here just send a message to yourself first,
+%% and then decide what to do when receiving this message.
+
+%% @todo proper specs
+-spec init(_,_,_) -> _.
+init(_StreamID, Req, Opts) ->
+ Env = maps:get(env, Opts, #{}),
+ Middlewares = maps:get(middlewares, Opts, [cowboy_router, cowboy_handler]),
+ Shutdown = maps:get(shutdown, Opts, 5000),
+ Pid = proc_lib:spawn_link(?MODULE, execute, [Req, Env, Middlewares]),
+ {[{spawn, Pid, Shutdown}], #state{pid=Pid}}.
+
+%% If we receive data and stream is waiting for data:
+%% If we accumulated enough data or IsFin=fin, send it.
+%% If not, buffer it.
+%% If not, buffer it.
+
+%% @todo proper specs
+-spec data(_,_,_,_) -> _.
+data(_StreamID, IsFin, Data, State=#state{read_body_ref=undefined, read_body_buffer=Buffer}) ->
+ {[], State#state{read_body_is_fin=IsFin, read_body_buffer= << Buffer/binary, Data/binary >>}};
+data(_StreamID, nofin, Data, State=#state{read_body_length=Length, read_body_buffer=Buffer}) when byte_size(Data) + byte_size(Buffer) < Length ->
+ {[], State#state{read_body_buffer= << Buffer/binary, Data/binary >>}};
+data(_StreamID, IsFin, Data, State=#state{pid=Pid, read_body_ref=Ref, read_body_buffer=Buffer}) ->
+ Pid ! {request_body, Ref, IsFin, << Buffer/binary, Data/binary >>},
+ {[], State#state{read_body_ref=undefined, read_body_buffer= <<>>}}.
+
+%% @todo proper specs
+-spec info(_,_,_) -> _.
+info(_StreamID, {'EXIT', Pid, normal}, State=#state{pid=Pid}) ->
+ {[stop], State};
+info(_StreamID, Reason = {'EXIT', Pid, _}, State=#state{pid=Pid}) ->
+ {[{internal_error, Reason, 'Stream process crashed.'}], State};
+%% Request body, no body buffer but IsFin=fin.
+info(_StreamID, {read_body, Ref, _}, State=#state{pid=Pid, read_body_is_fin=fin, read_body_buffer= <<>>}) ->
+ Pid ! {request_body, Ref, fin, <<>>},
+ {[], State};
+%% Request body, body buffered large enough or complete.
+info(_StreamID, {read_body, Ref, Length}, State=#state{pid=Pid, read_body_is_fin=IsFin, read_body_buffer=Data})
+ when element(1, IsFin) =:= fin; byte_size(Data) >= Length ->
+ Pid ! {request_body, Ref, IsFin, Data},
+ {[], State#state{read_body_buffer= <<>>}};
+%% Request body, not enough to send yet.
+info(_StreamID, {read_body, Ref, Length}, State) ->
+ {[{flow, Length}], State#state{read_body_ref=Ref, read_body_length=Length}};
+%% Response.
+info(_StreamID, Response = {response, _, _, _}, State) ->
+ {[Response], State};
+info(_StreamID, Headers = {headers, _, _}, State) ->
+ {[Headers], State};
+info(_StreamID, Data = {data, _, _}, State) ->
+ {[Data], State};
+info(_StreamID, SwitchProtocol = {switch_protocol, _, _, _}, State) ->
+ {[SwitchProtocol], State};
+%% Stray message.
+info(_StreamID, _Msg, State) ->
+ %% @todo Cleanup if no reply was sent when stream ends.
+ {[], State}.
+
+%% @todo proper specs
+-spec terminate(_,_,_) -> _.
+terminate(_StreamID, _Reason, _State) ->
+ ok.
+
+%% Request process.
+
+%% @todo
+%-spec execute(cowboy_req:req(), #state{}, cowboy_middleware:env(), [module()])
+% -> ok.
+-spec execute(_, _, _) -> _.
+execute(_, _, []) ->
+ ok; %% @todo Maybe error reason should differ here and there.
+execute(Req, Env, [Middleware|Tail]) ->
+ case Middleware:execute(Req, Env) of
+ {ok, Req2, Env2} ->
+ execute(Req2, Env2, Tail);
+ {suspend, Module, Function, Args} ->
+ proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module, Function, Args]);
+ {stop, _Req2} ->
+ ok %% @todo Maybe error reason should differ here and there.
+ end.
+
+-spec resume(cowboy_middleware:env(), [module()],
+ module(), module(), [any()]) -> ok.
+resume(Env, Tail, Module, Function, Args) ->
+ case apply(Module, Function, Args) of
+ {ok, Req2, Env2} ->
+ execute(Req2, Env2, Tail);
+ {suspend, Module2, Function2, Args2} ->
+ proc_lib:hibernate(?MODULE, resume, [Env, Tail, Module2, Function2, Args2]);
+ {stop, _Req2} ->
+ ok %% @todo Maybe error reason should differ here and there.
+ end.
diff --git a/src/cowboy_tls.erl b/src/cowboy_tls.erl
index 745d502..0ccf733 100644
--- a/src/cowboy_tls.erl
+++ b/src/cowboy_tls.erl
@@ -13,6 +13,7 @@
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
-module(cowboy_tls).
+-behavior(ranch_protocol).
-export([start_link/4]).
-export([init/5]).
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl
index 3c34908..9a2862e 100644
--- a/src/cowboy_websocket.erl
+++ b/src/cowboy_websocket.erl
@@ -18,6 +18,7 @@
-behaviour(cowboy_sub_protocol).
-export([upgrade/6]).
+-export([takeover/7]).
-export([handler_loop/4]).
-type terminate_reason() :: normal | stop | timeout
@@ -50,7 +51,6 @@
-optional_callbacks([terminate/3]).
-record(state, {
- env :: cowboy_middleware:env(),
socket = undefined :: inet:socket(),
transport = undefined :: module(),
handler :: module(),
@@ -65,25 +65,22 @@
extensions = #{} :: map()
}).
+%% Stream process.
+
-spec upgrade(Req, Env, module(), any(), timeout(), run | hibernate)
- -> {ok, Req, Env} | {suspend, module(), atom(), [any()]}
+ -> {ok, Req, Env}
when Req::cowboy_req:req(), Env::cowboy_middleware:env().
upgrade(Req, Env, Handler, HandlerState, Timeout, Hibernate) ->
- {_, Ref} = lists:keyfind(listener, 1, Env),
- ranch:remove_connection(Ref),
- [Socket, Transport] = cowboy_req:get([socket, transport], Req),
- State = #state{env=Env, socket=Socket, transport=Transport, handler=Handler,
- timeout=Timeout, hibernate=Hibernate =:= hibernate},
+ State = #state{handler=Handler, timeout=Timeout,
+ hibernate=Hibernate =:= hibernate},
+ %% @todo We need to fail if HTTP/2.
try websocket_upgrade(State, Req) of
{ok, State2, Req2} ->
- websocket_handshake(State2, Req2, HandlerState)
+ websocket_handshake(State2, Req2, HandlerState, Env)
catch _:_ ->
- receive
- {cowboy_req, resp_sent} -> ok
- after 0 ->
- _ = cowboy_req:reply(400, Req),
- exit(normal)
- end
+ %% @todo Test that we can have 2 /ws 400 status code in a row on the same connection.
+ cowboy_req:reply(400, Req),
+ {ok, Req, Env}
end.
-spec websocket_upgrade(#state{}, Req)
@@ -99,14 +96,15 @@ websocket_upgrade(State, Req) ->
orelse (IntVersion =:= 13),
Key = cowboy_req:header(<<"sec-websocket-key">>, Req),
false = Key =:= undefined,
- websocket_extensions(State#state{key=Key},
- cowboy_req:set_meta(websocket_version, IntVersion, Req)).
+ websocket_extensions(State#state{key=Key}, Req#{websocket_version => IntVersion}).
-spec websocket_extensions(#state{}, Req)
-> {ok, #state{}, Req} when Req::cowboy_req:req().
websocket_extensions(State, Req) ->
- [Compress] = cowboy_req:get([resp_compress], Req),
- Req2 = cowboy_req:set_meta(websocket_compress, false, Req),
+ %% @todo Proper options for this.
+% [Compress] = cowboy_req:get([resp_compress], Req),
+ Compress = false,
+ Req2 = Req#{websocket_compress => false},
case {Compress, cowboy_req:parse_header(<<"sec-websocket-extensions">>, Req2)} of
{true, Extensions} when Extensions =/= undefined ->
websocket_extensions(State, Req2, Extensions, []);
@@ -123,7 +121,7 @@ websocket_extensions(State=#state{extensions=Extensions}, Req, [{<<"permessage-d
Opts = #{level => best_compression, mem_level => 8, strategy => default},
case cow_ws:negotiate_permessage_deflate(Params, Extensions, Opts) of
{ok, RespExt, Extensions2} ->
- Req2 = cowboy_req:set_meta(websocket_compress, true, Req),
+ Req2 = Req#{websocket_compress => true},
websocket_extensions(State#state{extensions=Extensions2},
Req2, Tail, [<<", ">>, RespExt|RespHeader]);
ignore ->
@@ -143,33 +141,46 @@ websocket_extensions(State=#state{extensions=Extensions}, Req, [{<<"x-webkit-def
websocket_extensions(State, Req, [_|Tail], RespHeader) ->
websocket_extensions(State, Req, Tail, RespHeader).
--spec websocket_handshake(#state{}, Req, any())
- -> {ok, Req, cowboy_middleware:env()}
- | {suspend, module(), atom(), [any()]}
- when Req::cowboy_req:req().
-websocket_handshake(State=#state{transport=Transport, key=Key}, Req, HandlerState) ->
+-spec websocket_handshake(#state{}, Req, any(), Env)
+ -> {ok, Req, Env}
+ when Req::cowboy_req:req(), Env::cowboy_middleware:env().
+websocket_handshake(State=#state{transport=Transport, key=Key},
+ Req=#{pid := Pid, streamid := StreamID}, HandlerState, Env) ->
Challenge = base64:encode(crypto:hash(sha,
<< Key/binary, "258EAFA5-E914-47DA-95CA-C5AB0DC85B11" >>)),
- Req2 = cowboy_req:upgrade_reply(101, [
- {<<"upgrade">>, <<"websocket">>},
- {<<"sec-websocket-accept">>, Challenge}
- ], Req),
- %% Flush the resp_sent message before moving on.
- receive {cowboy_req, resp_sent} -> ok after 0 -> ok end,
- State2 = handler_loop_timeout(State),
+ Headers = #{
+ %% @todo Hmm should those be here or in cowboy_http?
+ <<"connection">> => <<"Upgrade">>,
+ <<"upgrade">> => <<"websocket">>,
+ <<"sec-websocket-accept">> => Challenge
+ },
+ Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {Req, State, HandlerState}}},
+ {ok, Req, Env}.
+
+%% Connection process.
+
+takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Req0, State=#state{handler=Handler}, HandlerState0}) ->
+ ranch:remove_connection(Ref),
+ %% @todo Remove Req from Websocket callbacks.
+ %% @todo Allow sending a reply from websocket_init.
+ %% @todo Try/catch.
+ {ok, Req, HandlerState} = case erlang:function_exported(Handler, websocket_init, 2) of
+ true -> Handler:websocket_init(Req0, HandlerState0);
+ false -> {ok, Req0, HandlerState0}
+ end,
+ State2 = handler_loop_timeout(State#state{socket=Socket, transport=Transport}),
handler_before_loop(State2#state{key=undefined,
- messages=Transport:messages()}, Req2, HandlerState, <<>>).
+ messages=Transport:messages()}, Req, HandlerState, Buffer).
-spec handler_before_loop(#state{}, Req, any(), binary())
-> {ok, Req, cowboy_middleware:env()}
- | {suspend, module(), atom(), [any()]}
when Req::cowboy_req:req().
handler_before_loop(State=#state{
socket=Socket, transport=Transport, hibernate=true},
Req, HandlerState, SoFar) ->
Transport:setopts(Socket, [{active, once}]),
- {suspend, ?MODULE, handler_loop,
- [State#state{hibernate=false}, Req, HandlerState, SoFar]};
+ proc_lib:hibernate(?MODULE, handler_loop,
+ [State#state{hibernate=false}, Req, HandlerState, SoFar]);
handler_before_loop(State=#state{socket=Socket, transport=Transport},
Req, HandlerState, SoFar) ->
Transport:setopts(Socket, [{active, once}]),
@@ -186,7 +197,6 @@ handler_loop_timeout(State=#state{timeout=Timeout, timeout_ref=PrevRef}) ->
-spec handler_loop(#state{}, Req, any(), binary())
-> {ok, Req, cowboy_middleware:env()}
- | {suspend, module(), atom(), [any()]}
when Req::cowboy_req:req().
handler_loop(State=#state{socket=Socket, messages={OK, Closed, Error},
timeout_ref=TRef}, Req, HandlerState, SoFar) ->
@@ -210,7 +220,6 @@ handler_loop(State=#state{socket=Socket, messages={OK, Closed, Error},
-spec websocket_data(#state{}, Req, any(), binary())
-> {ok, Req, cowboy_middleware:env()}
- | {suspend, module(), atom(), [any()]}
when Req::cowboy_req:req().
websocket_data(State=#state{frag_state=FragState, extensions=Extensions}, Req, HandlerState, Data) ->
case cow_ws:parse_header(Data, Extensions, FragState) of
@@ -298,7 +307,6 @@ websocket_dispatch(State=#state{socket=Socket, transport=Transport, frag_state=F
-spec handler_call(#state{}, Req, any(), binary(), atom(), any(), fun())
-> {ok, Req, cowboy_middleware:env()}
- | {suspend, module(), atom(), [any()]}
when Req::cowboy_req:req().
handler_call(State=#state{handler=Handler}, Req, HandlerState,
RemainingData, Callback, Message, NextState) ->
@@ -358,7 +366,7 @@ handler_call(State=#state{handler=Handler}, Req, HandlerState,
{mfa, {Handler, Callback, 3}},
{stacktrace, erlang:get_stacktrace()},
{msg, Message},
- {req, cowboy_req:to_list(Req)},
+ {req, Req},
{state, HandlerState}
]})
end.
@@ -407,7 +415,7 @@ websocket_close(State=#state{socket=Socket, transport=Transport, extensions=Exte
-spec handler_terminate(#state{}, Req, any(), terminate_reason())
-> {ok, Req, cowboy_middleware:env()}
when Req::cowboy_req:req().
-handler_terminate(#state{env=Env, handler=Handler},
+handler_terminate(#state{handler=Handler},
Req, HandlerState, Reason) ->
cowboy_handler:terminate(Reason, Req, HandlerState, Handler),
- {ok, Req, [{result, closed}|Env]}.
+ exit(normal).
diff --git a/test/cowboy_test.erl b/test/cowboy_test.erl
index cced1f9..e3aeb97 100644
--- a/test/cowboy_test.erl
+++ b/test/cowboy_test.erl
@@ -20,21 +20,21 @@
%% Listeners initialization.
init_http(Ref, ProtoOpts, Config) ->
- {ok, _} = cowboy:start_http(Ref, 100, [{port, 0}], ProtoOpts),
+ {ok, _} = cowboy:start_clear(Ref, 100, [{port, 0}], ProtoOpts),
Port = ranch:get_port(Ref),
- [{type, tcp}, {port, Port}, {opts, []}|Config].
+ [{type, tcp}, {protocol, http}, {port, Port}, {opts, []}|Config].
init_https(Ref, ProtoOpts, Config) ->
Opts = ct_helper:get_certs_from_ets(),
- {ok, _} = cowboy:start_https(Ref, 100, Opts ++ [{port, 0}], ProtoOpts),
+ {ok, _} = cowboy:start_tls(Ref, 100, Opts ++ [{port, 0}], ProtoOpts),
Port = ranch:get_port(Ref),
- [{type, ssl}, {port, Port}, {opts, Opts}|Config].
+ [{type, ssl}, {protocol, http}, {port, Port}, {opts, Opts}|Config].
init_spdy(Ref, ProtoOpts, Config) ->
Opts = ct_helper:get_certs_from_ets(),
- {ok, _} = cowboy:start_spdy(Ref, 100, Opts ++ [{port, 0}], ProtoOpts),
+ {ok, _} = cowboy:start_tls(Ref, 100, Opts ++ [{port, 0}], ProtoOpts),
Port = ranch:get_port(Ref),
- [{type, ssl}, {port, Port}, {opts, Opts}|Config].
+ [{type, ssl}, {protocol, spdy}, {port, Port}, {opts, Opts}|Config].
%% Common group of listeners used by most suites.
@@ -59,32 +59,26 @@ common_groups(Tests) ->
].
init_common_groups(Name = http, Config, Mod) ->
- init_http(Name, [
- {env, [{dispatch, Mod:init_dispatch(Config)}]}
- ], Config);
+ init_http(Name, #{env => #{dispatch => Mod:init_dispatch(Config)}}, Config);
init_common_groups(Name = https, Config, Mod) ->
- init_https(Name, [
- {env, [{dispatch, Mod:init_dispatch(Config)}]}
- ], Config);
+ init_https(Name, #{env => #{dispatch => Mod:init_dispatch(Config)}}, Config);
init_common_groups(Name = spdy, Config, Mod) ->
- init_spdy(Name, [
- {env, [{dispatch, Mod:init_dispatch(Config)}]}
- ], Config);
+ init_https(Name, #{env => #{dispatch => Mod:init_dispatch(Config)}}, Config);
init_common_groups(Name = http_compress, Config, Mod) ->
- init_http(Name, [
- {env, [{dispatch, Mod:init_dispatch(Config)}]},
- {compress, true}
- ], Config);
+ init_http(Name, #{
+ env => #{dispatch => Mod:init_dispatch(Config)},
+ compress => true
+ }, Config);
init_common_groups(Name = https_compress, Config, Mod) ->
- init_https(Name, [
- {env, [{dispatch, Mod:init_dispatch(Config)}]},
- {compress, true}
- ], Config);
+ init_https(Name, #{
+ env => #{dispatch => Mod:init_dispatch(Config)},
+ compress => true
+ }, Config);
init_common_groups(Name = spdy_compress, Config, Mod) ->
- init_spdy(Name, [
- {env, [{dispatch, Mod:init_dispatch(Config)}]},
- {compress, true}
- ], Config).
+ init_spdy(Name, #{
+ env => #{dispatch => Mod:init_dispatch(Config)},
+ compress => true
+ }, Config).
%% Support functions for testing using Gun.
@@ -94,7 +88,8 @@ gun_open(Config) ->
gun_open(Config, Opts) ->
{ok, ConnPid} = gun:open("localhost", config(port, Config), Opts#{
retry => 0,
- transport => config(type, Config)
+ transport => config(type, Config),
+ protocols => [config(protocol, Config)]
}),
ConnPid.
diff --git a/test/handlers/asterisk_h.erl b/test/handlers/asterisk_h.erl
index b5ff32d..56b8bcb 100644
--- a/test/handlers/asterisk_h.erl
+++ b/test/handlers/asterisk_h.erl
@@ -13,4 +13,4 @@ echo(What, Req, Opts) ->
V when is_integer(V) -> integer_to_binary(V);
V -> V
end,
- {ok, cowboy_req:reply(200, [], Value, Req), Opts}.
+ {ok, cowboy_req:reply(200, #{}, Value, Req), Opts}.
diff --git a/test/handlers/echo_h.erl b/test/handlers/echo_h.erl
index 71408c3..6db43fe 100644
--- a/test/handlers/echo_h.erl
+++ b/test/handlers/echo_h.erl
@@ -13,4 +13,4 @@ echo(What, Req, Opts) ->
V when is_integer(V) -> integer_to_binary(V);
V -> V
end,
- {ok, cowboy_req:reply(200, [], Value, Req), Opts}.
+ {ok, cowboy_req:reply(200, #{}, Value, Req), Opts}.
diff --git a/test/handlers/hello_h.erl b/test/handlers/hello_h.erl
index 3be7b6d..5e3dcc8 100644
--- a/test/handlers/hello_h.erl
+++ b/test/handlers/hello_h.erl
@@ -5,4 +5,4 @@
-export([init/2]).
init(Req, Opts) ->
- {ok, cowboy_req:reply(200, [], <<"Hello world!">>, Req), Opts}.
+ {ok, cowboy_req:reply(200, #{}, <<"Hello world!">>, Req), Opts}.
diff --git a/test/handlers/loop_handler_body_h.erl b/test/handlers/loop_handler_body_h.erl
index 0d4fd4d..38ba2c0 100644
--- a/test/handlers/loop_handler_body_h.erl
+++ b/test/handlers/loop_handler_body_h.erl
@@ -14,9 +14,10 @@ init(Req, _) ->
{cowboy_loop, Req, undefined, 5000, hibernate}.
info(timeout, Req, State) ->
- {ok, Body, Req2} = cowboy_req:body(Req),
+ {ok, Body, Req2} = cowboy_req:read_body(Req),
100000 = byte_size(Body),
- {stop, cowboy_req:reply(200, Req2), State}.
+ cowboy_req:reply(200, Req2),
+ {stop, Req, State}.
terminate(stop, _, _) ->
ok.
diff --git a/test/http_SUITE.erl b/test/http_SUITE.erl
index d5ec909..1b39feb 100644
--- a/test/http_SUITE.erl
+++ b/test/http_SUITE.erl
@@ -45,7 +45,7 @@ groups() ->
parse_host, set_env_dispatch
],
[
- {http, [parallel], Tests},
+ {http, [], Tests}, %% @todo parallel
{https, [parallel], Tests},
{http_compress, [parallel], Tests},
{https_compress, [parallel], Tests},
@@ -73,33 +73,29 @@ end_per_suite(Config) ->
ct_helper:delete_static_dir(config(static_dir, Config)).
init_per_group(Name = http, Config) ->
- cowboy_test:init_http(Name, [
- {env, [{dispatch, init_dispatch(Config)}]}
- ], Config);
+ cowboy_test:init_http(Name, #{env => #{dispatch => init_dispatch(Config)}}, Config);
init_per_group(Name = https, Config) ->
- cowboy_test:init_https(Name, [
- {env, [{dispatch, init_dispatch(Config)}]}
- ], Config);
+ cowboy_test:init_https(Name, #{env => #{dispatch => init_dispatch(Config)}}, Config);
init_per_group(Name = http_compress, Config) ->
- cowboy_test:init_http(Name, [
- {env, [{dispatch, init_dispatch(Config)}]},
- {compress, true}
- ], Config);
+ cowboy_test:init_http(Name, #{
+ env => #{dispatch => init_dispatch(Config)},
+ compress => true
+ }, Config);
init_per_group(Name = https_compress, Config) ->
- cowboy_test:init_https(Name, [
- {env, [{dispatch, init_dispatch(Config)}]},
- {compress, true}
- ], Config);
+ cowboy_test:init_https(Name, #{
+ env => #{dispatch => init_dispatch(Config)},
+ compress => true
+ }, Config);
%% Most, if not all of these, should be in separate test suites.
init_per_group(onresponse, Config) ->
- {ok, _} = cowboy:start_http(onresponse, 100, [{port, 0}], [
+ {ok, _} = cowboy:start_clear(onresponse, 100, [{port, 0}], [
{env, [{dispatch, init_dispatch(Config)}]},
{onresponse, fun do_onresponse_hook/4}
]),
Port = ranch:get_port(onresponse),
[{type, tcp}, {port, Port}, {opts, []}|Config];
init_per_group(onresponse_capitalize, Config) ->
- {ok, _} = cowboy:start_http(onresponse_capitalize, 100, [{port, 0}], [
+ {ok, _} = cowboy:start_clear(onresponse_capitalize, 100, [{port, 0}], [
{env, [{dispatch, init_dispatch(Config)}]},
{onresponse, fun do_onresponse_capitalize_hook/4}
]),
@@ -111,13 +107,13 @@ init_per_group(parse_host, Config) ->
{"/req_attr", http_req_attr, []}
]}
]),
- {ok, _} = cowboy:start_http(parse_host, 100, [{port, 0}], [
+ {ok, _} = cowboy:start_clear(parse_host, 100, [{port, 0}], [
{env, [{dispatch, Dispatch}]}
]),
Port = ranch:get_port(parse_host),
[{type, tcp}, {port, Port}, {opts, []}|Config];
init_per_group(set_env, Config) ->
- {ok, _} = cowboy:start_http(set_env, 100, [{port, 0}], [
+ {ok, _} = cowboy:start_clear(set_env, 100, [{port, 0}], [
{env, [{dispatch, []}]}
]),
Port = ranch:get_port(set_env),
@@ -134,11 +130,11 @@ init_dispatch(Config) ->
{"/chunked_response", http_chunked, []},
{"/streamed_response", http_streamed, []},
{"/headers/dupe", http_handler,
- [{headers, [{<<"connection">>, <<"close">>}]}]},
+ [{headers, #{<<"connection">> => <<"close">>}}]},
{"/set_resp/header", http_set_resp,
- [{headers, [{<<"vary">>, <<"Accept">>}]}]},
+ [{headers, #{<<"vary">> => <<"Accept">>}}]},
{"/set_resp/overwrite", http_set_resp,
- [{headers, [{<<"server">>, <<"DesireDrive/1.0">>}]}]},
+ [{headers, #{<<"server">> => <<"DesireDrive/1.0">>}}]},
{"/set_resp/body", http_set_resp,
[{body, <<"A flameless dance does not equal a cycle">>}]},
{"/stream_body/set_resp", http_stream_body,
@@ -314,7 +310,7 @@ echo_body(Config) ->
%% Check if sending request whose size is bigger than 1000000 bytes causes 413
echo_body_max_length(Config) ->
ConnPid = gun_open(Config),
- Ref = gun:post(ConnPid, "/echo/body", [], << 0:2000000/unit:8 >>),
+ Ref = gun:post(ConnPid, "/echo/body", [], << 0:10000000/unit:8 >>),
{response, nofin, 413, _} = gun:await(ConnPid, Ref),
ok.
@@ -336,7 +332,7 @@ error_init_after_reply(Config) ->
ConnPid = gun_open(Config),
Ref = gun:get(ConnPid, "/handler_errors?case=init_after_reply"),
{response, nofin, 200, _} = gun:await(ConnPid, Ref),
- gun_down(ConnPid).
+ ok.
headers_dupe(Config) ->
ConnPid = gun_open(Config),
@@ -357,18 +353,18 @@ http10_chunkless(Config) ->
http10_hostless(Config) ->
Name = http10_hostless,
Port10 = config(port, Config) + 10,
- Transport = case config(type, Config) of
- tcp -> ranch_tcp;
- ssl -> ranch_ssl
+ {Transport, Protocol} = case config(type, Config) of
+ tcp -> {ranch_tcp, cowboy_clear};
+ ssl -> {ranch_ssl, cowboy_tls}
end,
ranch:start_listener(Name, 5, Transport,
config(opts, Config) ++ [{port, Port10}],
- cowboy_protocol, [
- {env, [{dispatch, cowboy_router:compile([
- {'_', [{"/http1.0/hostless", http_handler, []}]}])}]},
- {max_keepalive, 50},
- {timeout, 500}]
- ),
+ Protocol, #{
+ env =>#{dispatch => cowboy_router:compile([
+ {'_', [{"/http1.0/hostless", http_handler, []}]}])},
+ max_keepalive => 50,
+ timeout => 500
+ }),
200 = do_raw("GET /http1.0/hostless HTTP/1.0\r\n\r\n",
[{port, Port10}|Config]),
cowboy:stop_listener(http10_hostless).
@@ -380,9 +376,10 @@ http10_keepalive_default(Config) ->
case catch raw_recv_head(Client) of
{'EXIT', _} -> error(closed);
Data ->
- {'HTTP/1.0', 200, _, Rest} = cow_http:parse_status_line(Data),
+ %% Cowboy always advertises itself as HTTP/1.1.
+ {'HTTP/1.1', 200, _, Rest} = cow_http:parse_status_line(Data),
{Headers, _} = cow_http:parse_headers(Rest),
- false = lists:keymember(<<"connection">>, 1, Headers)
+ {_, <<"close">>} = lists:keyfind(<<"connection">>, 1, Headers)
end,
ok = raw_send(Client, Normal),
case catch raw_recv_head(Client) of
@@ -397,7 +394,8 @@ http10_keepalive_forced(Config) ->
case catch raw_recv_head(Client) of
{'EXIT', _} -> error(closed);
Data ->
- {'HTTP/1.0', 200, _, Rest} = cow_http:parse_status_line(Data),
+ %% Cowboy always advertises itself as HTTP/1.1.
+ {'HTTP/1.1', 200, _, Rest} = cow_http:parse_status_line(Data),
{Headers, _} = cow_http:parse_headers(Rest),
{_, <<"keep-alive">>} = lists:keyfind(<<"connection">>, 1, Headers)
end,
diff --git a/test/http_SUITE_data/http_body_qs.erl b/test/http_SUITE_data/http_body_qs.erl
index 945b7fd..e0673cf 100644
--- a/test/http_SUITE_data/http_body_qs.erl
+++ b/test/http_SUITE_data/http_body_qs.erl
@@ -17,16 +17,16 @@ maybe_echo(<<"POST">>, true, Req) ->
echo(proplists:get_value(<<"echo">>, PostVals), Req2)
end;
maybe_echo(<<"POST">>, false, Req) ->
- cowboy_req:reply(400, [], <<"Missing body.">>, Req);
+ cowboy_req:reply(400, #{}, <<"Missing body.">>, Req);
maybe_echo(_, _, Req) ->
%% Method not allowed.
cowboy_req:reply(405, Req).
echo(badlength, Req) ->
- cowboy_req:reply(413, [], <<"POST body bigger than 16000 bytes">>, Req);
+ cowboy_req:reply(413, #{}, <<"POST body bigger than 16000 bytes">>, Req);
echo(undefined, Req) ->
- cowboy_req:reply(400, [], <<"Missing echo parameter.">>, Req);
+ cowboy_req:reply(400, #{}, <<"Missing echo parameter.">>, Req);
echo(Echo, Req) ->
- cowboy_req:reply(200, [
- {<<"content-type">>, <<"text/plain; charset=utf-8">>}
- ], Echo, Req).
+ cowboy_req:reply(200, #{
+ <<"content-type">> => <<"text/plain; charset=utf-8">>
+ }, Echo, Req).
diff --git a/test/http_SUITE_data/http_echo_body.erl b/test/http_SUITE_data/http_echo_body.erl
index c76b9b3..a541a67 100644
--- a/test/http_SUITE_data/http_echo_body.erl
+++ b/test/http_SUITE_data/http_echo_body.erl
@@ -6,16 +6,16 @@
init(Req, Opts) ->
true = cowboy_req:has_body(Req),
- Req3 = case cowboy_req:body(Req, [{length, 1000000}]) of
+ Req3 = case cowboy_req:read_body(Req, [{length, 1000000}]) of
{ok, Body, Req2} -> handle_body(Req2, Body);
{more, _, Req2} -> handle_badlength(Req2)
end,
{ok, Req3, Opts}.
handle_badlength(Req) ->
- cowboy_req:reply(413, [], <<"Request entity too large">>, Req).
+ cowboy_req:reply(413, #{}, <<"Request entity too large">>, Req).
handle_body(Req, Body) ->
Size = cowboy_req:body_length(Req),
Size = byte_size(Body),
- cowboy_req:reply(200, [], Body, Req).
+ cowboy_req:reply(200, #{}, Body, Req).
diff --git a/test/http_SUITE_data/http_errors.erl b/test/http_SUITE_data/http_errors.erl
index ee5c2b2..14e3d09 100644
--- a/test/http_SUITE_data/http_errors.erl
+++ b/test/http_SUITE_data/http_errors.erl
@@ -13,5 +13,5 @@ case_init(<<"init_before_reply">> = Case, _Req) ->
error(Case);
case_init(<<"init_after_reply">> = Case, Req) ->
ct_helper_error_h:ignore(?MODULE, case_init, 2),
- _ = cowboy_req:reply(200, [], "http_handler_crashes", Req),
+ _ = cowboy_req:reply(200, #{}, "http_handler_crashes", Req),
error(Case).
diff --git a/test/http_SUITE_data/http_handler.erl b/test/http_SUITE_data/http_handler.erl
index 62e9193..eb31aa8 100644
--- a/test/http_SUITE_data/http_handler.erl
+++ b/test/http_SUITE_data/http_handler.erl
@@ -5,6 +5,6 @@
-export([init/2]).
init(Req, Opts) ->
- Headers = proplists:get_value(headers, Opts, []),
+ Headers = proplists:get_value(headers, Opts, #{}),
Body = proplists:get_value(body, Opts, "http_handler"),
{ok, cowboy_req:reply(200, Headers, Body, Req), Opts}.
diff --git a/test/http_SUITE_data/http_multipart.erl b/test/http_SUITE_data/http_multipart.erl
index 196cbce..212f569 100644
--- a/test/http_SUITE_data/http_multipart.erl
+++ b/test/http_SUITE_data/http_multipart.erl
@@ -6,7 +6,7 @@
init(Req, Opts) ->
{Result, Req2} = acc_multipart(Req, []),
- {ok, cowboy_req:reply(200, [], term_to_binary(Result), Req2), Opts}.
+ {ok, cowboy_req:reply(200, #{}, term_to_binary(Result), Req2), Opts}.
acc_multipart(Req, Acc) ->
case cowboy_req:part(Req) of
diff --git a/test/http_SUITE_data/http_req_attr.erl b/test/http_SUITE_data/http_req_attr.erl
index d8483a5..c6a940e 100644
--- a/test/http_SUITE_data/http_req_attr.erl
+++ b/test/http_SUITE_data/http_req_attr.erl
@@ -12,4 +12,4 @@ init(Req, Opts) ->
Host = cowboy_req:host(Req),
Port = cowboy_req:port(Req),
Value = [Host, "\n", integer_to_list(Port)],
- {ok, cowboy_req:reply(200, [], Value, Req), Opts}.
+ {ok, cowboy_req:reply(200, #{}, Value, Req), Opts}.
diff --git a/test/http_SUITE_data/http_set_resp.erl b/test/http_SUITE_data/http_set_resp.erl
index 2e7f835..6ac4c8e 100644
--- a/test/http_SUITE_data/http_set_resp.erl
+++ b/test/http_SUITE_data/http_set_resp.erl
@@ -5,11 +5,11 @@
-export([init/2]).
init(Req, Opts) ->
- Headers = proplists:get_value(headers, Opts, []),
+ Headers = proplists:get_value(headers, Opts, #{}),
Body = proplists:get_value(body, Opts, <<"http_handler_set_resp">>),
Req2 = lists:foldl(fun({Name, Value}, R) ->
cowboy_req:set_resp_header(Name, Value, R)
- end, Req, Headers),
+ end, Req, maps:to_list(Headers)),
Req3 = cowboy_req:set_resp_body(Body, Req2),
Req4 = cowboy_req:set_resp_header(<<"x-cowboy-test">>, <<"ok">>, Req3),
Req5 = cowboy_req:set_resp_cookie(<<"cake">>, <<"lie">>, [], Req4),
diff --git a/test/http_SUITE_data/http_stream_body.erl b/test/http_SUITE_data/http_stream_body.erl
index aea5300..ef10266 100644
--- a/test/http_SUITE_data/http_stream_body.erl
+++ b/test/http_SUITE_data/http_stream_body.erl
@@ -7,16 +7,22 @@
init(Req, Opts) ->
Body = proplists:get_value(body, Opts, "http_handler_stream_body"),
Reply = proplists:get_value(reply, Opts),
- SFun = fun(Socket, Transport) -> Transport:send(Socket, Body) end,
+ SFun = fun () ->
+ cowboy_req:send_body(Body, nofin, Req)
+ end,
Req2 = case Reply of
set_resp ->
SLen = iolist_size(Body),
- cowboy_req:set_resp_body_fun(SLen, SFun, Req);
+ cowboy_req:set_resp_body({stream, SLen, SFun}, Req);
+ %% @todo Hmm that one will be sent as chunked now.
+ %% We need an option to disable chunked.
set_resp_close ->
- cowboy_req:set_resp_body_fun(SFun, Req);
+ cowboy_req:set_resp_body({stream, undefined, SFun}, Req);
set_resp_chunked ->
%% Here Body should be a list of chunks, not a binary.
- SFun2 = fun(SendFun) -> lists:foreach(SendFun, Body) end,
- cowboy_req:set_resp_body_fun(chunked, SFun2, Req)
+ SFun2 = fun () ->
+ lists:foreach(fun (Data) -> cowboy_req:send_body(Data, nofin, Req) end, Body)
+ end,
+ cowboy_req:set_resp_body({stream, undefined, SFun2}, Req)
end,
{ok, cowboy_req:reply(200, Req2), Opts}.
diff --git a/test/http_SUITE_data/rest_param_all.erl b/test/http_SUITE_data/rest_param_all.erl
index 2b2ea23..d74df74 100644
--- a/test/http_SUITE_data/rest_param_all.erl
+++ b/test/http_SUITE_data/rest_param_all.erl
@@ -17,7 +17,7 @@ content_types_provided(Req, State) ->
{[{{<<"text">>, <<"plain">>, '*'}, get_text_plain}], Req, State}.
get_text_plain(Req, State) ->
- {_, _, Param} = cowboy_req:meta(media_type, Req, {{<<"text">>, <<"plain">>}, []}),
+ {_, _, Param} = maps:get(media_type, Req, {{<<"text">>, <<"plain">>}, []}),
Body = if
Param == '*' ->
<<"'*'">>;
diff --git a/test/http_SUITE_data/rest_patch_resource.erl b/test/http_SUITE_data/rest_patch_resource.erl
index c1244e7..341920d 100644
--- a/test/http_SUITE_data/rest_patch_resource.erl
+++ b/test/http_SUITE_data/rest_patch_resource.erl
@@ -28,7 +28,7 @@ content_types_accepted(Req, State) ->
end.
patch_text_plain(Req, State) ->
- case cowboy_req:body(Req) of
+ case cowboy_req:read_body(Req) of
{ok, <<"stop">>, Req0} ->
{stop, cowboy_req:reply(400, Req0), State};
{ok, <<"false">>, Req0} ->
diff --git a/test/rfc7230_SUITE.erl b/test/rfc7230_SUITE.erl
index 15d1fc5..480cc4e 100644
--- a/test/rfc7230_SUITE.erl
+++ b/test/rfc7230_SUITE.erl
@@ -23,12 +23,12 @@
all() -> [{group, http}].
-groups() -> [{http, [parallel], ct_helper:all(?MODULE)}].
+groups() -> [{http, [parallel], ct_helper:all(?MODULE)}]. %% @todo parallel
init_per_group(Name = http, Config) ->
- cowboy_test:init_http(Name = http, [
- {env, [{dispatch, cowboy_router:compile(init_routes(Config))}]}
- ], Config).
+ cowboy_test:init_http(Name = http, #{
+ env => #{dispatch => cowboy_router:compile(init_routes(Config))}
+ }, Config).
end_per_group(Name, _) ->
ok = cowboy:stop_listener(Name).
@@ -52,9 +52,7 @@ do_raw(Config, Data) ->
{Version, Code, Reason, Rest} = cow_http:parse_status_line(raw_recv_head(Client)),
{Headers, Rest2} = cow_http:parse_headers(Rest),
case lists:keyfind(<<"content-length">>, 1, Headers) of
- false ->
- #{client => Client, version => Version, code => Code, reason => Reason, headers => Headers, body => <<>>};
- {_, LengthBin} ->
+ {_, LengthBin} when LengthBin =/= <<"0">> ->
Length = binary_to_integer(LengthBin),
Body = if
byte_size(Rest2) =:= Length -> Rest2;
@@ -62,7 +60,9 @@ do_raw(Config, Data) ->
{ok, Body0} = raw_recv(Client, binary_to_integer(LengthBin) - byte_size(Rest2), 5000),
<< Rest2/bits, Body0/bits >>
end,
- #{client => Client, version => Version, code => Code, reason => Reason, headers => Headers, body => Body}
+ #{client => Client, version => Version, code => Code, reason => Reason, headers => Headers, body => Body};
+ _ ->
+ #{client => Client, version => Version, code => Code, reason => Reason, headers => Headers, body => <<>>}
end.
%% Listener.
@@ -90,8 +90,8 @@ accept_at_least_1_empty_line(Config) ->
reject_response(Config) ->
doc("When receiving a response instead of a request, identified by the "
"status-line which starts with the HTTP version, the server must "
- "reject the message with a 501 status code and close the connection. (RFC7230 3.1)"),
- #{code := 501, client := Client} = do_raw(Config,
+ "reject the message with a 400 status code and close the connection. (RFC7230 3.1)"),
+ #{code := 400, client := Client} = do_raw(Config,
"HTTP/1.1 200 OK\r\n"
"\r\n"),
{error, closed} = raw_recv(Client, 0, 1000).
@@ -138,14 +138,14 @@ timeout_before_request_line(Config) ->
"by the reception of a complete request-line."),
Client = raw_open(Config),
ok = raw_send(Client, "GET / HTTP/1.1\r"),
- {error, closed} = raw_recv(Client, 0, 1000).
+ {error, closed} = raw_recv(Client, 0, 6000).
timeout_after_request_line(Config) ->
doc("The time the request (request line and headers) takes to be "
"received by the server must be limited and subject to configuration. "
"A 408 status code must be sent if the request line was received."),
#{code := 408, client := Client} = do_raw(Config, "GET / HTTP/1.1\r\n"),
- {error, closed} = raw_recv(Client, 0, 1000).
+ {error, closed} = raw_recv(Client, 0, 6000).
%% @todo Add an HTTP/1.0 test suite.
%An HTTP/1.1 server must understand any valid HTTP/1.0 request,
diff --git a/test/ws_SUITE.erl b/test/ws_SUITE.erl
index 4c5bb1d..4eaf456 100644
--- a/test/ws_SUITE.erl
+++ b/test/ws_SUITE.erl
@@ -37,17 +37,17 @@ init_per_group(Name = autobahn, Config) ->
"http://autobahn.ws/testsuite/installation.html"),
{skip, "Autobahn Test Suite not installed."};
_ ->
- {ok, _} = cowboy:start_http(Name, 100, [{port, 33080}], [
- {env, [{dispatch, init_dispatch()}]},
- {compress, true}
- ]),
+ {ok, _} = cowboy:start_clear(Name, 100, [{port, 33080}], #{
+ env => #{dispatch => init_dispatch()},
+ compress => true %% @todo Use a separate option for HTTP and Websocket compression.
+ }),
Config
end;
init_per_group(Name = ws, Config) ->
- cowboy_test:init_http(Name, [
- {env, [{dispatch, init_dispatch()}]},
- {compress, true}
- ], Config).
+ cowboy_test:init_http(Name, #{
+ env => #{dispatch => init_dispatch()},
+ compress => true %% @todo Use a separate option for HTTP and Websocket compression.
+ }, Config).
end_per_group(Listener, _Config) ->
cowboy:stop_listener(Listener).
diff --git a/test/ws_SUITE_data/ws_echo_timer.erl b/test/ws_SUITE_data/ws_echo_timer.erl
index a26c332..9761e1f 100644
--- a/test/ws_SUITE_data/ws_echo_timer.erl
+++ b/test/ws_SUITE_data/ws_echo_timer.erl
@@ -3,13 +3,17 @@
-module(ws_echo_timer).
-export([init/2]).
+-export([websocket_init/2]).
-export([websocket_handle/3]).
-export([websocket_info/3]).
init(Req, _) ->
- erlang:start_timer(1000, self(), <<"websocket_init">>),
{cowboy_websocket, Req, undefined}.
+websocket_init(Req, State) ->
+ erlang:start_timer(1000, self(), <<"websocket_init">>),
+ {ok, Req, State}.
+
websocket_handle({text, Data}, Req, State) ->
{reply, {text, Data}, Req, State};
websocket_handle({binary, Data}, Req, State) ->
diff --git a/test/ws_SUITE_data/ws_send_many.erl b/test/ws_SUITE_data/ws_send_many.erl
index 6585ffa..9cee75e 100644
--- a/test/ws_SUITE_data/ws_send_many.erl
+++ b/test/ws_SUITE_data/ws_send_many.erl
@@ -3,13 +3,17 @@
-module(ws_send_many).
-export([init/2]).
+-export([websocket_init/2]).
-export([websocket_handle/3]).
-export([websocket_info/3]).
init(Req, Opts) ->
- erlang:send_after(10, self(), send_many),
{cowboy_websocket, Req, Opts}.
+websocket_init(Req, State) ->
+ erlang:send_after(10, self(), send_many),
+ {ok, Req, State}.
+
websocket_handle(_Frame, Req, State) ->
{ok, Req, State}.