diff options
Diffstat (limited to 'src/ranch_acceptor.erl')
-rw-r--r-- | src/ranch_acceptor.erl | 71 |
1 files changed, 45 insertions, 26 deletions
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl index 692277b..e03cde9 100644 --- a/src/ranch_acceptor.erl +++ b/src/ranch_acceptor.erl @@ -19,43 +19,62 @@ -export([start_link/6]). %% Internal. --export([acceptor/7]). +-export([init/7]). +-export([loop/7]). %% API. -spec start_link(any(), inet:socket(), module(), module(), pid(), pid()) -> {ok, pid()}. start_link(Ref, LSocket, Transport, Protocol, ListenerPid, ConnsSup) -> + {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid), {ok, Opts} = ranch_listener:get_protocol_options(ListenerPid), - Pid = spawn_link(?MODULE, acceptor, - [LSocket, Transport, Protocol, Opts, 1, ListenerPid, ConnsSup]), + Pid = spawn_link(?MODULE, init, + [LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup]), ok = ranch_server:add_acceptor(Ref, Pid), {ok, Pid}. %% Internal. --spec acceptor(inet:socket(), module(), module(), any(), - non_neg_integer(), pid(), pid()) -> no_return(). -acceptor(LSocket, Transport, Protocol, Opts, OptsVsn, ListenerPid, ConnsSup) -> - Res = case Transport:accept(LSocket, 2000) of - {ok, CSocket} -> - {ok, Pid} = supervisor:start_child(ConnsSup, +-spec init(inet:socket(), module(), module(), + non_neg_integer(), any(), pid(), pid()) -> no_return(). +init(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) -> + async_accept(LSocket, Transport), + loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup). + +-spec loop(inet:socket(), module(), module(), + non_neg_integer(), any(), pid(), pid()) -> no_return(). +loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) -> + receive + {accept, CSocket} -> + {ok, ConnPid} = supervisor:start_child(ConnsSup, [ListenerPid, CSocket, Transport, Protocol, Opts]), - Transport:controlling_process(CSocket, Pid), - ranch_listener:add_connection(ListenerPid, - default, Pid, OptsVsn); - {error, timeout} -> - ranch_listener:check_upgrades(ListenerPid, OptsVsn); - {error, _Reason} -> - %% @todo Probably do something here. If the socket was closed, - %% we may want to try and listen again on the port? - ok - end, - case Res of - ok -> - ?MODULE:acceptor(LSocket, Transport, Protocol, - Opts, OptsVsn, ListenerPid, ConnsSup); - {upgrade, Opts2, OptsVsn2} -> - ?MODULE:acceptor(LSocket, Transport, Protocol, - Opts2, OptsVsn2, ListenerPid, ConnsSup) + Transport:controlling_process(CSocket, ConnPid), + ConnPid ! {shoot, ListenerPid}, + NbConns = ranch_listener:add_connection(ListenerPid, ConnPid), + maybe_wait(ListenerPid, MaxConns, NbConns), + ?MODULE:init(LSocket, Transport, Protocol, + MaxConns, Opts, ListenerPid, ConnsSup); + {set_opts, Opts2} -> + ?MODULE:loop(LSocket, Transport, Protocol, + MaxConns, Opts2, ListenerPid, ConnsSup) end. + +-spec maybe_wait(pid(), non_neg_integer(), non_neg_integer()) -> ok. +maybe_wait(_, MaxConns, NbConns) when MaxConns > NbConns -> + ok; +maybe_wait(ListenerPid, MaxConns, _) -> + erlang:yield(), + NbConns2 = ranch_server:count_connections(ListenerPid), + maybe_wait(ListenerPid, MaxConns, NbConns2). + +-spec async_accept(inet:socket(), module()) -> ok. +async_accept(LSocket, Transport) -> + AcceptorPid = self(), + _ = spawn_link(fun() -> + %% @todo {error, closed} must be handled and other errors ignored. + {ok, CSocket} = Transport:accept(LSocket, infinity), + Transport:controlling_process(CSocket, AcceptorPid), + AcceptorPid ! {accept, CSocket} + end), + ok. |