aboutsummaryrefslogtreecommitdiffstats
path: root/src/ranch_acceptor.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/ranch_acceptor.erl')
-rw-r--r--src/ranch_acceptor.erl71
1 files changed, 45 insertions, 26 deletions
diff --git a/src/ranch_acceptor.erl b/src/ranch_acceptor.erl
index 692277b..e03cde9 100644
--- a/src/ranch_acceptor.erl
+++ b/src/ranch_acceptor.erl
@@ -19,43 +19,62 @@
-export([start_link/6]).
%% Internal.
--export([acceptor/7]).
+-export([init/7]).
+-export([loop/7]).
%% API.
-spec start_link(any(), inet:socket(), module(), module(), pid(), pid())
-> {ok, pid()}.
start_link(Ref, LSocket, Transport, Protocol, ListenerPid, ConnsSup) ->
+ {ok, MaxConns} = ranch_listener:get_max_connections(ListenerPid),
{ok, Opts} = ranch_listener:get_protocol_options(ListenerPid),
- Pid = spawn_link(?MODULE, acceptor,
- [LSocket, Transport, Protocol, Opts, 1, ListenerPid, ConnsSup]),
+ Pid = spawn_link(?MODULE, init,
+ [LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup]),
ok = ranch_server:add_acceptor(Ref, Pid),
{ok, Pid}.
%% Internal.
--spec acceptor(inet:socket(), module(), module(), any(),
- non_neg_integer(), pid(), pid()) -> no_return().
-acceptor(LSocket, Transport, Protocol, Opts, OptsVsn, ListenerPid, ConnsSup) ->
- Res = case Transport:accept(LSocket, 2000) of
- {ok, CSocket} ->
- {ok, Pid} = supervisor:start_child(ConnsSup,
+-spec init(inet:socket(), module(), module(),
+ non_neg_integer(), any(), pid(), pid()) -> no_return().
+init(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
+ async_accept(LSocket, Transport),
+ loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup).
+
+-spec loop(inet:socket(), module(), module(),
+ non_neg_integer(), any(), pid(), pid()) -> no_return().
+loop(LSocket, Transport, Protocol, MaxConns, Opts, ListenerPid, ConnsSup) ->
+ receive
+ {accept, CSocket} ->
+ {ok, ConnPid} = supervisor:start_child(ConnsSup,
[ListenerPid, CSocket, Transport, Protocol, Opts]),
- Transport:controlling_process(CSocket, Pid),
- ranch_listener:add_connection(ListenerPid,
- default, Pid, OptsVsn);
- {error, timeout} ->
- ranch_listener:check_upgrades(ListenerPid, OptsVsn);
- {error, _Reason} ->
- %% @todo Probably do something here. If the socket was closed,
- %% we may want to try and listen again on the port?
- ok
- end,
- case Res of
- ok ->
- ?MODULE:acceptor(LSocket, Transport, Protocol,
- Opts, OptsVsn, ListenerPid, ConnsSup);
- {upgrade, Opts2, OptsVsn2} ->
- ?MODULE:acceptor(LSocket, Transport, Protocol,
- Opts2, OptsVsn2, ListenerPid, ConnsSup)
+ Transport:controlling_process(CSocket, ConnPid),
+ ConnPid ! {shoot, ListenerPid},
+ NbConns = ranch_listener:add_connection(ListenerPid, ConnPid),
+ maybe_wait(ListenerPid, MaxConns, NbConns),
+ ?MODULE:init(LSocket, Transport, Protocol,
+ MaxConns, Opts, ListenerPid, ConnsSup);
+ {set_opts, Opts2} ->
+ ?MODULE:loop(LSocket, Transport, Protocol,
+ MaxConns, Opts2, ListenerPid, ConnsSup)
end.
+
+-spec maybe_wait(pid(), non_neg_integer(), non_neg_integer()) -> ok.
+maybe_wait(_, MaxConns, NbConns) when MaxConns > NbConns ->
+ ok;
+maybe_wait(ListenerPid, MaxConns, _) ->
+ erlang:yield(),
+ NbConns2 = ranch_server:count_connections(ListenerPid),
+ maybe_wait(ListenerPid, MaxConns, NbConns2).
+
+-spec async_accept(inet:socket(), module()) -> ok.
+async_accept(LSocket, Transport) ->
+ AcceptorPid = self(),
+ _ = spawn_link(fun() ->
+ %% @todo {error, closed} must be handled and other errors ignored.
+ {ok, CSocket} = Transport:accept(LSocket, infinity),
+ Transport:controlling_process(CSocket, AcceptorPid),
+ AcceptorPid ! {accept, CSocket}
+ end),
+ ok.