From 43d14b52cd07dfd1121bbe6727a96dfd32304e47 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 10 Aug 2011 20:28:30 +0200 Subject: Give the ListenerPid to the protocol on startup Also sends a message 'shoot' that can be received by the protocol to make sure Cowboy has had enough time to fully initialize the socket. This message should be received before any socket-related operations are performed. WebSocket request connections are now moved from the pool 'default' to the pool 'websocket', meaning we can have a lot of running WebSockets despite having a low 'max_connections' setting. --- src/cowboy_acceptor.erl | 3 ++- src/cowboy_http_protocol.erl | 22 ++++++++++++---------- src/cowboy_http_websocket.erl | 7 ++++--- src/cowboy_listener.erl | 19 ++++++++++++++++++- src/cowboy_requests_sup.erl | 9 +++++---- 5 files changed, 41 insertions(+), 19 deletions(-) (limited to 'src') diff --git a/src/cowboy_acceptor.erl b/src/cowboy_acceptor.erl index be63cef..f2b603e 100644 --- a/src/cowboy_acceptor.erl +++ b/src/cowboy_acceptor.erl @@ -36,10 +36,11 @@ acceptor(LSocket, Transport, Protocol, Opts, MaxConns, ListenerPid, ReqsSup) -> case Transport:accept(LSocket, 2000) of {ok, CSocket} -> {ok, Pid} = supervisor:start_child(ReqsSup, - [CSocket, Transport, Protocol, Opts]), + [ListenerPid, CSocket, Transport, Protocol, Opts]), Transport:controlling_process(CSocket, Pid), {ok, NbConns} = cowboy_listener:add_connection(ListenerPid, default, Pid), + Pid ! shoot, limit_reqs(ListenerPid, NbConns, MaxConns); {error, timeout} -> ignore; diff --git a/src/cowboy_http_protocol.erl b/src/cowboy_http_protocol.erl index 421978b..a7da5bb 100644 --- a/src/cowboy_http_protocol.erl +++ b/src/cowboy_http_protocol.erl @@ -31,12 +31,13 @@ %% @see cowboy_http_handler -module(cowboy_http_protocol). --export([start_link/3]). %% API. --export([init/3, parse_request/1]). %% FSM. +-export([start_link/4]). %% API. +-export([init/4, parse_request/1]). %% FSM. -include("include/http.hrl"). -record(state, { + listener :: pid(), socket :: inet:socket(), transport :: module(), dispatch :: cowboy_dispatcher:dispatch_rules(), @@ -51,20 +52,21 @@ %% API. %% @doc Start an HTTP protocol process. --spec start_link(inet:socket(), module(), any()) -> {ok, pid()}. -start_link(Socket, Transport, Opts) -> - Pid = spawn_link(?MODULE, init, [Socket, Transport, Opts]), +-spec start_link(pid(), inet:socket(), module(), any()) -> {ok, pid()}. +start_link(ListenerPid, Socket, Transport, Opts) -> + Pid = spawn_link(?MODULE, init, [ListenerPid, Socket, Transport, Opts]), {ok, Pid}. %% FSM. %% @private --spec init(inet:socket(), module(), any()) -> ok. -init(Socket, Transport, Opts) -> +-spec init(pid(), inet:socket(), module(), any()) -> ok. +init(ListenerPid, Socket, Transport, Opts) -> Dispatch = proplists:get_value(dispatch, Opts, []), MaxEmptyLines = proplists:get_value(max_empty_lines, Opts, 5), Timeout = proplists:get_value(timeout, Opts, 5000), - wait_request(#state{socket=Socket, transport=Transport, + receive shoot -> ok end, + wait_request(#state{listener=ListenerPid, socket=Socket, transport=Transport, dispatch=Dispatch, max_empty_lines=MaxEmptyLines, timeout=Timeout}). %% @private @@ -189,14 +191,14 @@ dispatch(Req=#http_req{host=Host, path=Path}, end. -spec handler_init(#http_req{}, #state{}) -> ok. -handler_init(Req, State=#state{ +handler_init(Req, State=#state{listener=ListenerPid, transport=Transport, handler={Handler, Opts}}) -> try Handler:init({Transport:name(), http}, Req, Opts) of {ok, Req2, HandlerState} -> handler_loop(HandlerState, Req2, State); %% @todo {upgrade, transport, Module} {upgrade, protocol, Module} -> - Module:upgrade(Handler, Opts, Req) + Module:upgrade(ListenerPid, Handler, Opts, Req) catch Class:Reason -> error_terminate(500, State), error_logger:error_msg( diff --git a/src/cowboy_http_websocket.erl b/src/cowboy_http_websocket.erl index 66187f0..63113d6 100644 --- a/src/cowboy_http_websocket.erl +++ b/src/cowboy_http_websocket.erl @@ -23,7 +23,7 @@ %% -module(cowboy_http_websocket). --export([upgrade/3]). %% API. +-export([upgrade/4]). %% API. -export([handler_loop/4]). %% Internal. -include("include/http.hrl"). @@ -45,8 +45,9 @@ %% You do not need to call this function manually. To upgrade to the WebSocket %% protocol, you simply need to return {upgrade, protocol, {@module}} %% in your cowboy_http_handler:init/3 handler function. --spec upgrade(module(), any(), #http_req{}) -> ok. -upgrade(Handler, Opts, Req) -> +-spec upgrade(pid(), module(), any(), #http_req{}) -> ok. +upgrade(ListenerPid, Handler, Opts, Req) -> + cowboy_listener:move_connection(ListenerPid, websocket, self()), EOP = binary:compile_pattern(<< 255 >>), case catch websocket_upgrade(#state{handler=Handler, opts=Opts, eop=EOP}, Req) of {ok, State, Req2} -> handler_init(State, Req2); diff --git a/src/cowboy_listener.erl b/src/cowboy_listener.erl index 2f5ccc2..8b656ba 100644 --- a/src/cowboy_listener.erl +++ b/src/cowboy_listener.erl @@ -17,7 +17,7 @@ -behaviour(gen_server). -export([start_link/0, stop/1, - add_connection/3, remove_connection/2, wait/3]). %% API. + add_connection/3, move_connection/3, remove_connection/2, wait/3]). %% API. -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% gen_server. @@ -54,6 +54,11 @@ stop(ServerPid) -> add_connection(ServerPid, Pool, ConnPid) -> gen_server:call(ServerPid, {add_connection, Pool, ConnPid}). +%% @doc Move a connection from one pool to another. +-spec move_connection(pid(), atom(), pid()) -> ok. +move_connection(ServerPid, DestPool, ConnPid) -> + gen_server:cast(ServerPid, {move_connection, DestPool, ConnPid}). + %% @doc Remove the given connection from its pool. -spec remove_connection(pid(), pid()) -> ok. remove_connection(ServerPid, ConnPid) -> @@ -107,6 +112,18 @@ handle_call(_Request, _From, State) -> %% @private -spec handle_cast(_, State) -> {noreply, State}. +handle_cast({move_connection, DestPool, ConnPid}, State=#state{ + req_pools=Pools, reqs_table=ReqsTable}) -> + {MonitorRef, SrcPool} = ets:lookup_element(ReqsTable, ConnPid, 2), + ets:insert(ReqsTable, {ConnPid, {MonitorRef, DestPool}}), + {SrcPool, SrcNbConns} = lists:keyfind(SrcPool, 1, Pools), + DestNbConns = case lists:keyfind(DestPool, 1, Pools) of + false -> 1; + {DestPool, NbConns} -> NbConns + 1 + end, + Pools2 = lists:keydelete(SrcPool, 1, lists:keydelete(DestPool, 1, Pools)), + Pools3 = [{SrcPool, SrcNbConns - 1}, {DestPool, DestNbConns}|Pools2], + {noreply, State#state{req_pools=Pools3}}; handle_cast({remove_connection, ConnPid}, State) -> State2 = remove_pid(ConnPid, State), {noreply, State2}; diff --git a/src/cowboy_requests_sup.erl b/src/cowboy_requests_sup.erl index a50ee8a..87d5352 100644 --- a/src/cowboy_requests_sup.erl +++ b/src/cowboy_requests_sup.erl @@ -16,7 +16,7 @@ -module(cowboy_requests_sup). -behaviour(supervisor). --export([start_link/0, start_request/4]). %% API. +-export([start_link/0, start_request/5]). %% API. -export([init/1]). %% supervisor. %% API. @@ -25,9 +25,10 @@ start_link() -> supervisor:start_link(?MODULE, []). --spec start_request(inet:socket(), module(), module(), any()) -> {ok, pid()}. -start_request(Socket, Transport, Protocol, Opts) -> - Protocol:start_link(Socket, Transport, Opts). +-spec start_request(pid(), inet:socket(), module(), module(), any()) + -> {ok, pid()}. +start_request(ListenerPid, Socket, Transport, Protocol, Opts) -> + Protocol:start_link(ListenerPid, Socket, Transport, Opts). %% supervisor. -- cgit v1.2.3