diff options
-rw-r--r-- | Makefile | 8 | ||||
-rw-r--r-- | src/cowboy.erl | 32 | ||||
-rw-r--r-- | src/cowboy_acceptor.erl | 38 | ||||
-rw-r--r-- | src/cowboy_acceptors_sup.erl | 3 | ||||
-rw-r--r-- | src/cowboy_http_rest.erl | 2 | ||||
-rw-r--r-- | src/cowboy_listener.erl | 172 | ||||
-rw-r--r-- | src/cowboy_listener_sup.erl | 3 | ||||
-rw-r--r-- | test/http_SUITE.erl | 17 | ||||
-rw-r--r-- | test/rest_nodelete_resource.erl | 17 |
9 files changed, 193 insertions, 99 deletions
@@ -24,18 +24,18 @@ eunit: @$(REBAR) eunit skip_deps=true ct: - @$(REBAR) ct skip_deps=true suites=http,ws + @$(REBAR) ct skip_deps=true suites=http,proper,ws intct: - @$(REBAR) ct skip_deps=true suites=http,ws,autobahn + @$(REBAR) ct skip_deps=true suites=http,proper,ws,autobahn build-plt: @$(DIALYZER) --build_plt --output_plt .cowboy_dialyzer.plt \ --apps kernel stdlib sasl inets crypto public_key ssl dialyze: - @$(DIALYZER) --src src --plt .cowboy_dialyzer.plt -Werror_handling \ - -Wrace_conditions -Wunmatched_returns # -Wunderspecs + @$(DIALYZER) --src src --plt .cowboy_dialyzer.plt --no_native \ + -Werror_handling -Wrace_conditions -Wunmatched_returns # -Wunderspecs docs: @$(REBAR) doc skip_deps=true diff --git a/src/cowboy.erl b/src/cowboy.erl index 6defeea..7963df2 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -15,7 +15,8 @@ %% @doc Cowboy API to start and stop listeners. -module(cowboy). --export([start_listener/6, stop_listener/1, child_spec/6, accept_ack/1]). +-export([start_listener/6, stop_listener/1, child_spec/6, accept_ack/1, + get_protocol_options/1, set_protocol_options/2]). %% @doc Start a listener for the given transport and protocol. %% @@ -83,3 +84,32 @@ child_spec(Ref, NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -spec accept_ack(pid()) -> ok. accept_ack(ListenerPid) -> receive {shoot, ListenerPid} -> ok end. + +%% @doc Return the current protocol options for the given listener. +-spec get_protocol_options(any()) -> any(). +get_protocol_options(Ref) -> + ListenerPid = ref_to_listener_pid(Ref), + {ok, ProtoOpts} = cowboy_listener:get_protocol_options(ListenerPid), + ProtoOpts. + +%% @doc Upgrade the protocol options for the given listener. +%% +%% The upgrade takes place at the acceptor level, meaning that only the +%% newly accepted connections receive the new protocol options. This has +%% no effect on the currently opened connections. +-spec set_protocol_options(any(), any()) -> ok. +set_protocol_options(Ref, ProtoOpts) -> + ListenerPid = ref_to_listener_pid(Ref), + ok = cowboy_listener:set_protocol_options(ListenerPid, ProtoOpts). + +%% Internal. + +-spec ref_to_listener_pid(any()) -> pid(). +ref_to_listener_pid(Ref) -> + Children = supervisor:which_children(cowboy_sup), + {_, ListenerSupPid, _, _} = lists:keyfind( + {cowboy_listener_sup, Ref}, 1, Children), + ListenerSupChildren = supervisor:which_children(ListenerSupPid), + {_, ListenerPid, _, _} = lists:keyfind( + cowboy_listener, 1, ListenerSupChildren), + ListenerPid. diff --git a/src/cowboy_acceptor.erl b/src/cowboy_acceptor.erl index 4cb9fa7..29f7c09 100644 --- a/src/cowboy_acceptor.erl +++ b/src/cowboy_acceptor.erl @@ -15,45 +15,43 @@ %% @private -module(cowboy_acceptor). --export([start_link/7]). %% API. +-export([start_link/6]). %% API. -export([acceptor/7]). %% Internal. %% API. -spec start_link(inet:socket(), module(), module(), any(), - non_neg_integer(), pid(), pid()) -> {ok, pid()}. + pid(), pid()) -> {ok, pid()}. start_link(LSocket, Transport, Protocol, Opts, - MaxConns, ListenerPid, ReqsSup) -> + ListenerPid, ReqsSup) -> Pid = spawn_link(?MODULE, acceptor, - [LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup]), + [LSocket, Transport, Protocol, Opts, 1, ListenerPid, ReqsSup]), {ok, Pid}. %% Internal. -spec acceptor(inet:socket(), module(), module(), any(), non_neg_integer(), pid(), pid()) -> no_return(). -acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) -> - case Transport:accept(LSocket, 2000) of +acceptor(LSocket, Transport, Protocol, Opts, OptsVsn, ListenerPid, ReqsSup) -> + Res = case Transport:accept(LSocket, 2000) of {ok, CSocket} -> {ok, Pid} = supervisor:start_child(ReqsSup, [ListenerPid, CSocket, Transport, Protocol, Opts]), Transport:controlling_process(CSocket, Pid), - {ok, NbConns} = cowboy_listener:add_connection(ListenerPid, - default, Pid), - Pid ! {shoot, ListenerPid}, - limit_reqs(ListenerPid, NbConns, MaxConns); + cowboy_listener:add_connection(ListenerPid, + default, Pid, OptsVsn); {error, timeout} -> - ignore; + ok; {error, _Reason} -> %% @todo Probably do something here. If the socket was closed, %% we may want to try and listen again on the port? - ignore + ok end, - ?MODULE:acceptor(LSocket, Transport, Protocol, Opts, - MaxConns, ListenerPid, ReqsSup). - --spec limit_reqs(pid(), non_neg_integer(), non_neg_integer()) -> ok. -limit_reqs(_ListenerPid, NbConns, MaxConns) when NbConns =< MaxConns -> - ok; -limit_reqs(ListenerPid, _NbConns, MaxConns) -> - cowboy_listener:wait(ListenerPid, default, MaxConns). + case Res of + ok -> + ?MODULE:acceptor(LSocket, Transport, Protocol, + Opts, OptsVsn, ListenerPid, ReqsSup); + {upgrade, Opts2, OptsVsn2} -> + ?MODULE:acceptor(LSocket, Transport, Protocol, + Opts2, OptsVsn2, ListenerPid, ReqsSup) + end. diff --git a/src/cowboy_acceptors_sup.erl b/src/cowboy_acceptors_sup.erl index 17849a6..625028c 100644 --- a/src/cowboy_acceptors_sup.erl +++ b/src/cowboy_acceptors_sup.erl @@ -34,10 +34,9 @@ start_link(NbAcceptors, Transport, TransOpts, init([NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts, ListenerPid, ReqsPid]) -> {ok, LSocket} = Transport:listen(TransOpts), - MaxConns = proplists:get_value(max_connections, TransOpts, 1024), Procs = [{{acceptor, self(), N}, {cowboy_acceptor, start_link, [ LSocket, Transport, Protocol, ProtoOpts, - MaxConns, ListenerPid, ReqsPid + ListenerPid, ReqsPid ]}, permanent, brutal_kill, worker, []} || N <- lists:seq(1, NbAcceptors)], {ok, {{one_for_one, 10, 10}, Procs}}. diff --git a/src/cowboy_http_rest.erl b/src/cowboy_http_rest.erl index e6cc6ff..392b172 100644 --- a/src/cowboy_http_rest.erl +++ b/src/cowboy_http_rest.erl @@ -648,7 +648,7 @@ method(Req, State) -> %% delete_resource/2 should start deleting the resource and return. delete_resource(Req, State) -> - expect(Req, State, delete_resource, true, fun delete_completed/2, 500). + expect(Req, State, delete_resource, false, 500, fun delete_completed/2). %% delete_completed/2 indicates whether the resource has been deleted yet. delete_completed(Req, State) -> diff --git a/src/cowboy_listener.erl b/src/cowboy_listener.erl index c19d079..b12e059 100644 --- a/src/cowboy_listener.erl +++ b/src/cowboy_listener.erl @@ -16,15 +16,21 @@ -module(cowboy_listener). -behaviour(gen_server). --export([start_link/0, stop/1, - add_connection/3, move_connection/3, remove_connection/2, wait/3]). %% API. +-export([start_link/2, stop/1, + add_connection/4, move_connection/3, remove_connection/2, + get_protocol_options/1, set_protocol_options/2]). %% API. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% gen_server. +-type pools() :: [{atom(), non_neg_integer()}]. + -record(state, { - req_pools = [] :: [{atom(), non_neg_integer()}], + req_pools = [] :: pools(), reqs_table :: ets:tid(), - queue = [] :: [{pid(), reference()}] + queue = undefined :: queue(), + max_conns = undefined :: non_neg_integer(), + proto_opts :: any(), + proto_opts_vsn = 1 :: non_neg_integer() }). %% API. @@ -36,9 +42,10 @@ %% Setting the process priority to high ensures the connection-related code %% will always be executed when a connection needs it, allowing Cowboy to %% scale far beyond what it would with a normal priority. --spec start_link() -> {ok, pid()}. -start_link() -> - gen_server:start_link(?MODULE, [], [{spawn_opt, [{priority, high}]}]). +-spec start_link(non_neg_integer(), any()) -> {ok, pid()}. +start_link(MaxConns, ProtoOpts) -> + gen_server:start_link(?MODULE, [MaxConns, ProtoOpts], + [{spawn_opt, [{priority, high}]}]). %% @private -spec stop(pid()) -> stopped. @@ -50,15 +57,22 @@ stop(ServerPid) -> %% Pools of connections are used to restrict the maximum number of connections %% depending on their type. By default, Cowboy add all connections to the %% pool <em>default</em>. It also checks for the maximum number of connections -%% in that pool before accepting again. +%% in that pool before accepting again. This function only returns when there +%% is free space in the pool. %% %% When a process managing a connection dies, the process is removed from the %% pool. If the socket has been sent to another process, it is up to the %% protocol code to inform the listener of the new <em>ConnPid</em> by removing %% the previous and adding the new one. --spec add_connection(pid(), atom(), pid()) -> {ok, non_neg_integer()}. -add_connection(ServerPid, Pool, ConnPid) -> - gen_server:call(ServerPid, {add_connection, Pool, ConnPid}). +%% +%% This function also returns whether the protocol options have been modified. +%% If so, then an {upgrade, ProtoOpts, OptsVsn} will be returned instead of +%% the atom 'ok'. The acceptor can then continue with the new protocol options. +-spec add_connection(pid(), atom(), pid(), non_neg_integer()) + -> ok | {upgrade, any(), non_neg_integer()}. +add_connection(ServerPid, Pool, ConnPid, OptsVsn) -> + gen_server:call(ServerPid, {add_connection, Pool, ConnPid, OptsVsn}, + infinity). %% @doc Move a connection from one pool to another. -spec move_connection(pid(), atom(), pid()) -> ok. @@ -70,47 +84,48 @@ move_connection(ServerPid, DestPool, ConnPid) -> remove_connection(ServerPid, ConnPid) -> gen_server:cast(ServerPid, {remove_connection, ConnPid}). -%% @doc Wait until the number of connections in the given pool gets below -%% the given threshold. -%% -%% This function will not return until the number of connections in the pool -%% gets below <em>MaxConns</em>. It makes use of <em>gen_server:reply/2</em> -%% to make the process wait for a reply indefinitely. --spec wait(pid(), atom(), non_neg_integer()) -> ok. -wait(ServerPid, Pool, MaxConns) -> - gen_server:call(ServerPid, {wait, Pool, MaxConns}, infinity). +%% @doc Return the current protocol options. +-spec get_protocol_options(pid()) -> {ok, any()}. +get_protocol_options(ServerPid) -> + gen_server:call(ServerPid, get_protocol_options). + +%% @doc Upgrade the protocol options. +-spec set_protocol_options(pid(), any()) -> ok. +set_protocol_options(ServerPid, ProtoOpts) -> + gen_server:call(ServerPid, {set_protocol_options, ProtoOpts}). %% gen_server. %% @private --spec init([]) -> {ok, #state{}}. -init([]) -> - ReqsTablePid = ets:new(requests_table, [set, private]), - {ok, #state{reqs_table=ReqsTablePid}}. +-spec init(list()) -> {ok, #state{}}. +init([MaxConns, ProtoOpts]) -> + ReqsTable = ets:new(requests_table, [set, private]), + Queue = queue:new(), + {ok, #state{reqs_table=ReqsTable, max_conns=MaxConns, + proto_opts=ProtoOpts, queue=Queue}}. %% @private -spec handle_call(_, _, State) -> {reply, ignored, State} | {stop, normal, stopped, State}. -handle_call({add_connection, Pool, ConnPid}, _From, State=#state{ - req_pools=Pools, reqs_table=ReqsTable}) -> - MonitorRef = erlang:monitor(process, ConnPid), - {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of - false -> - {1, [{Pool, 1}|Pools]}; - {Pool, NbConns} -> - NbConns2 = NbConns + 1, - {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} - end, - ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}), - {reply, {ok, NbConnsRet}, State#state{req_pools=Pools2}}; -handle_call({wait, Pool, MaxConns}, From, State=#state{ - req_pools=Pools, queue=Queue}) -> - case lists:keyfind(Pool, 1, Pools) of - {Pool, NbConns} when NbConns > MaxConns -> - {noreply, State#state{queue=[From|Queue]}}; - _Any -> - {reply, ok, State} +handle_call({add_connection, Pool, ConnPid, AccOptsVsn}, From, State=#state{ + req_pools=Pools, reqs_table=ReqsTable, + queue=Queue, max_conns=MaxConns, + proto_opts=ProtoOpts, proto_opts_vsn=LisOptsVsn}) -> + {NbConns, Pools2} = add_pid(ConnPid, Pool, Pools, ReqsTable), + State2 = State#state{req_pools=Pools2}, + if AccOptsVsn =/= LisOptsVsn -> + {reply, {ugprade, ProtoOpts, LisOptsVsn}, State2}; + NbConns > MaxConns -> + Queue2 = queue:in(From, Queue), + {noreply, State2#state{queue=Queue2}}; + true -> + {reply, ok, State2} end; +handle_call(get_protocol_options, _From, State=#state{proto_opts=ProtoOpts}) -> + {reply, {ok, ProtoOpts}, State}; +handle_call({set_protocol_options, ProtoOpts}, _From, + State=#state{proto_opts_vsn=OptsVsn}) -> + {reply, ok, State#state{proto_opts=ProtoOpts, proto_opts_vsn=OptsVsn + 1}}; handle_call(stop, _From, State) -> {stop, normal, stopped, State}; handle_call(_Request, _From, State) -> @@ -120,27 +135,21 @@ handle_call(_Request, _From, State) -> -spec handle_cast(_, State) -> {noreply, State}. handle_cast({move_connection, DestPool, ConnPid}, State=#state{ req_pools=Pools, reqs_table=ReqsTable}) -> - {MonitorRef, SrcPool} = ets:lookup_element(ReqsTable, ConnPid, 2), - ets:insert(ReqsTable, {ConnPid, {MonitorRef, DestPool}}), - {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools), - DestNbConns = case lists:keyfind(DestPool, 1, Pools) of - false -> 1; - {DestPool, NbConns} -> NbConns + 1 - end, - Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)), - Pools3 = [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2], - {noreply, State#state{req_pools=Pools3}}; -handle_cast({remove_connection, ConnPid}, State) -> - State2 = remove_pid(ConnPid, State), - {noreply, State2}; + Pools2 = move_pid(ConnPid, DestPool, Pools, ReqsTable), + {noreply, State#state{req_pools=Pools2}}; +handle_cast({remove_connection, ConnPid}, State=#state{ + req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) -> + {Pools2, Queue2} = remove_pid(ConnPid, Pools, ReqsTable, Queue), + {noreply, State#state{req_pools=Pools2, queue=Queue2}}; handle_cast(_Msg, State) -> {noreply, State}. %% @private -spec handle_info(_, State) -> {noreply, State}. -handle_info({'DOWN', _Ref, process, Pid, _Info}, State) -> - State2 = remove_pid(Pid, State), - {noreply, State2}; +handle_info({'DOWN', _Ref, process, Pid, _Info}, State=#state{ + req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) -> + {Pools2, Queue2} = remove_pid(Pid, Pools, ReqsTable, Queue), + {noreply, State#state{req_pools=Pools2, queue=Queue2}}; handle_info(_Info, State) -> {noreply, State}. @@ -157,18 +166,47 @@ code_change(_OldVsn, State, _Extra) -> %% Internal. %% @private --spec remove_pid(pid(), State) -> State. -remove_pid(Pid, State=#state{ - req_pools=Pools, reqs_table=ReqsTable, queue=Queue}) -> +-spec add_pid(pid(), atom(), pools(), ets:tid()) + -> {non_neg_integer(), pools()}. +add_pid(ConnPid, Pool, Pools, ReqsTable) -> + MonitorRef = erlang:monitor(process, ConnPid), + ConnPid ! {shoot, self()}, + {NbConnsRet, Pools2} = case lists:keyfind(Pool, 1, Pools) of + false -> + {1, [{Pool, 1}|Pools]}; + {Pool, NbConns} -> + NbConns2 = NbConns + 1, + {NbConns2, [{Pool, NbConns2}|lists:keydelete(Pool, 1, Pools)]} + end, + ets:insert(ReqsTable, {ConnPid, {MonitorRef, Pool}}), + {NbConnsRet, Pools2}. + +%% @private +-spec move_pid(pid(), atom(), pools(), ets:tid()) -> pools(). +move_pid(ConnPid, DestPool, Pools, ReqsTable) -> + {MonitorRef, SrcPool} = ets:lookup_element(ReqsTable, ConnPid, 2), + ets:insert(ReqsTable, {ConnPid, {MonitorRef, DestPool}}), + {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools), + DestNbConns = case lists:keyfind(DestPool, 1, Pools) of + false -> 1; + {DestPool, NbConns} -> NbConns + 1 + end, + Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)), + [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2]. + +%% @private +-spec remove_pid(pid(), pools(), ets:tid(), queue()) -> {pools(), queue()}. +remove_pid(Pid, Pools, ReqsTable, Queue) -> {MonitorRef, Pool} = ets:lookup_element(ReqsTable, Pid, 2), erlang:demonitor(MonitorRef, [flush]), {Pool, NbConns} = lists:keyfind(Pool, 1, Pools), Pools2 = [{Pool, NbConns - 1}|lists:keydelete(Pool, 1, Pools)], ets:delete(ReqsTable, Pid), - case Queue of - [] -> - State#state{req_pools=Pools2}; - [Client|Queue2] -> + case queue:len(Queue) of + 0 -> + {Pools2, Queue}; + _ -> + {{value, Client}, Queue2} = queue:out(Queue), gen_server:reply(Client, ok), - State#state{req_pools=Pools2, queue=Queue2} + {Pools2, Queue2} end. diff --git a/src/cowboy_listener_sup.erl b/src/cowboy_listener_sup.erl index aca2b0b..da6eca3 100644 --- a/src/cowboy_listener_sup.erl +++ b/src/cowboy_listener_sup.erl @@ -24,9 +24,10 @@ -spec start_link(non_neg_integer(), module(), any(), module(), any()) -> {ok, pid()}. start_link(NbAcceptors, Transport, TransOpts, Protocol, ProtoOpts) -> + MaxConns = proplists:get_value(max_connections, TransOpts, 1024), {ok, SupPid} = supervisor:start_link(?MODULE, []), {ok, ListenerPid} = supervisor:start_child(SupPid, - {cowboy_listener, {cowboy_listener, start_link, []}, + {cowboy_listener, {cowboy_listener, start_link, [MaxConns, ProtoOpts]}, permanent, 5000, worker, [cowboy_listener]}), {ok, ReqsPid} = supervisor:start_child(SupPid, {cowboy_requests_sup, {cowboy_requests_sup, start_link, []}, diff --git a/test/http_SUITE.erl b/test/http_SUITE.erl index bad91a8..862a110 100644 --- a/test/http_SUITE.erl +++ b/test/http_SUITE.erl @@ -29,7 +29,7 @@ file_200/1, file_403/1, dir_403/1, file_404/1, file_400/1]). %% http and https. -export([http_10_hostless/1]). %% misc. --export([rest_simple/1, rest_keepalive/1, rest_keepalive_post/1]). %% rest. +-export([rest_simple/1, rest_keepalive/1, rest_keepalive_post/1, rest_nodelete/1]). %% rest. %% ct. @@ -47,7 +47,7 @@ groups() -> static_function_etag, multipart] ++ BaseTests}, {https, [], BaseTests}, {misc, [], [http_10_hostless]}, - {rest, [], [rest_simple, rest_keepalive, rest_keepalive_post]}]. + {rest, [], [rest_simple, rest_keepalive, rest_keepalive_post, rest_nodelete]}]. init_per_suite(Config) -> application:start(inets), @@ -97,7 +97,8 @@ init_per_group(rest, Config) -> cowboy_http_protocol, [{dispatch, [{'_', [ {[<<"simple">>], rest_simple_resource, []}, {[<<"forbidden_post">>], rest_forbidden_resource, [true]}, - {[<<"simple_post">>], rest_forbidden_resource, [false]} + {[<<"simple_post">>], rest_forbidden_resource, [false]}, + {[<<"nodelete">>], rest_nodelete_resource, []} ]}]}]), [{port, Port}|Config]. @@ -611,3 +612,13 @@ rest_keepalive_post_loop(Socket, N, forbidden_post) -> {0, 12} = binary:match(Data, <<"HTTP/1.1 403">>), nomatch = binary:match(Data, <<"Connection: close">>), rest_keepalive_post_loop(Socket, N - 1, simple_post). + +rest_nodelete(Config) -> + {port, Port} = lists:keyfind(port, 1, Config), + {ok, Socket} = gen_tcp:connect("localhost", Port, + [binary, {active, false}, {packet, raw}]), + Request = "DELETE /nodelete HTTP/1.1\r\nHost: localhost\r\n\r\n", + ok = gen_tcp:send(Socket, Request), + {ok, Data} = gen_tcp:recv(Socket, 0, 6000), + {0, 12} = binary:match(Data, <<"HTTP/1.1 500">>), + ok = gen_tcp:close(Socket). diff --git a/test/rest_nodelete_resource.erl b/test/rest_nodelete_resource.erl new file mode 100644 index 0000000..e0ece5a --- /dev/null +++ b/test/rest_nodelete_resource.erl @@ -0,0 +1,17 @@ +-module(rest_nodelete_resource). +-export([init/3, allowed_methods/2, content_types_provided/2, + get_text_plain/2]). + +init(_Transport, _Req, _Opts) -> + {upgrade, protocol, cowboy_http_rest}. + +allowed_methods(Req, State) -> + {['GET', 'HEAD', 'DELETE'], Req, State}. + + +content_types_provided(Req, State) -> + {[{{<<"text">>, <<"plain">>, []}, get_text_plain}], Req, State}. + +get_text_plain(Req, State) -> + {<<"This is REST!">>, Req, State}. + |