aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-07-16 18:21:48 +0200
committerMicael Karlberg <[email protected]>2018-09-18 14:50:18 +0200
commit017565203f40860d24b80a54136a160aee460dbe (patch)
tree7dd8d8a9426cd918a1d41db7dcd99bcb7cc35a51
parent8de18e84deaed4c9e6e7242ae2550fc6618dc44d (diff)
downloadotp-017565203f40860d24b80a54136a160aee460dbe.tar.gz
otp-017565203f40860d24b80a54136a160aee460dbe.tar.bz2
otp-017565203f40860d24b80a54136a160aee460dbe.zip
[socket-nif] Add support for multiple acceptor processes
Its now possible to have multiple (simultaneous) acceptor processes for the same listening socket. OTP-14831
-rw-r--r--erts/emulator/nifs/common/socket_int.h2
-rw-r--r--erts/emulator/nifs/common/socket_nif.c310
-rw-r--r--erts/preloaded/ebin/socket.beambin46328 -> 46540 bytes
-rw-r--r--erts/preloaded/src/socket.erl7
-rw-r--r--lib/kernel/test/socket_server.erl313
5 files changed, 492 insertions, 140 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h
index aa10260134..67e4baba27 100644
--- a/erts/emulator/nifs/common/socket_int.h
+++ b/erts/emulator/nifs/common/socket_int.h
@@ -92,6 +92,7 @@ typedef unsigned int BOOLEAN_T;
#define BOOL2ATOM(__B__) ((__B__) ? esock_atom_true : esock_atom_false)
+#define B2S(__B__) ((__B__) ? "true" : "false")
/* Misc error strings */
#define ESOCK_STR_EAFNOSUPPORT "eafnosupport"
@@ -141,6 +142,7 @@ extern ERL_NIF_TERM esock_atom_eagain;
extern ERL_NIF_TERM esock_atom_einval;
+
/* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* Various wrapper macros for enif functions
*/
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 713153d7c5..224dcc9ff6 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -572,7 +572,7 @@ typedef struct {
/* +++ Accept stuff +++ */
ErlNifMutex* accMtx;
SocketRequestor currentAcceptor;
- SocketRequestor* currentAcceptorP; // NULL or points to currentReader
+ SocketRequestor* currentAcceptorP; // NULL or points to currentAcceptor
SocketRequestQueue acceptorsQ;
/* +++ Config & Misc stuff +++ */
@@ -1488,6 +1488,29 @@ static void inc_socket(int domain, int type, int protocol);
static void dec_socket(int domain, int type, int protocol);
+static BOOLEAN_T acceptor_search4pid(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid);
+static ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid pid,
+ ERL_NIF_TERM ref);
+static BOOLEAN_T acceptor_pop(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid,
+ ErlNifMonitor* mon,
+ ERL_NIF_TERM* ref);
+
+static BOOLEAN_T qsearch4pid(ErlNifEnv* env,
+ SocketRequestQueue* q,
+ ErlNifPid* pid);
+static void qpush(SocketRequestQueue* q,
+ SocketRequestQueueElement* e);
+static SocketRequestQueueElement* qpop(SocketRequestQueue* q);
+static BOOLEAN_T qunqueue(ErlNifEnv* env,
+ SocketRequestQueue* q,
+ const ErlNifPid* pid);
+
/*
#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE)
static size_t my_strnlen(const char *s, size_t maxlen);
@@ -2663,6 +2686,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
HANDLE accEvent;
ErlNifPid caller;
int save_errno;
+ ERL_NIF_TERM result;
SSDBG( descP, ("SOCKET", "naccept_accepting -> get caller\r\n") );
@@ -2682,11 +2706,18 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
* for the select message).
*/
+ SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") );
+
+ if (!acceptor_search4pid(env, descP, &caller))
+ result = acceptor_push(env, descP, caller, ref);
+ else
+ result = esock_make_error(env, esock_atom_eagain);
+
SSDBG( descP,
("SOCKET",
- "naccept_accepting -> not current acceptor: busy\r\n") );
+ "naccept_accepting -> queue (push) result: %T\r\n", result) );
- return esock_make_error(env, atom_exbusy);
+ return result;
}
n = sizeof(descP->remote);
@@ -2777,8 +2808,30 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
* And if so, pop it and copy the (waiting) acceptor, and then
* make a new select with that info).
*/
- descP->state = SOCKET_STATE_LISTENING;
+ if (acceptor_pop(env, descP,
+ &descP->currentAcceptor.pid,
+ &descP->currentAcceptor.mon,
+ &descP->currentAcceptor.ref)) {
+
+ /* There was another one */
+
+ SSDBG( descP, ("SOCKET", "naccept_accepting -> new (active) acceptor: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref) );
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref);
+ } else {
+ descP->currentAcceptorP = NULL;
+ descP->state = SOCKET_STATE_LISTENING;
+ }
+
return esock_make_ok2(env, accRef);
}
}
@@ -8238,6 +8291,186 @@ char* send_msg(ErlNifEnv* env,
/* ----------------------------------------------------------------------
+ * R e q u e s t Q u e u e F u n c t i o n s
+ * ----------------------------------------------------------------------
+ */
+
+/* *** acceptor search for pid ***
+ *
+ * Search for a pid in the acceptor queue.
+ */
+static
+BOOLEAN_T acceptor_search4pid(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid)
+{
+ return qsearch4pid(env, &descP->acceptorsQ, pid);
+}
+
+
+/* *** acceptor push ***
+ *
+ * Push an acceptor onto the acceptor queue.
+ * This happens when we already have atleast one current acceptor.
+ */
+static
+ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid pid,
+ ERL_NIF_TERM ref)
+{
+ SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement));
+ SocketRequestor* reqP = &e->data;
+
+ reqP->pid = pid;
+ reqP->ref = ref;
+
+ if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ FREE(reqP);
+ return esock_make_error(env, atom_exmon);
+ }
+
+ qpush(&descP->acceptorsQ, e);
+
+ // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN
+ return esock_make_error(env, esock_atom_eagain);
+}
+
+
+/* *** acceptor pop ***
+ *
+ * Pop an acceptor from the acceptor queue.
+ */
+static
+BOOLEAN_T acceptor_pop(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid,
+ ErlNifMonitor* mon,
+ ERL_NIF_TERM* ref)
+{
+ SocketRequestQueueElement* e = qpop(&descP->acceptorsQ);
+
+ if (e != NULL) {
+ *pid = e->data.pid;
+ *mon = e->data.mon;
+ *ref = e->data.ref;
+ FREE(e);
+ return TRUE;
+ } else {
+ /* (acceptors) Queue was empty */
+ // *pid = NULL; we have no null value for pids
+ // *mon = NULL; we have no null value for monitors
+ *ref = esock_atom_undefined; // Just in case
+ return FALSE;
+ }
+
+}
+
+
+static
+BOOLEAN_T qsearch4pid(ErlNifEnv* env,
+ SocketRequestQueue* q,
+ ErlNifPid* pid)
+{
+ SocketRequestQueueElement* tmp = q->first;
+
+ while (tmp != NULL) {
+ if (compare_pids(env, &tmp->data.pid, pid))
+ return TRUE;
+ else
+ tmp = tmp->nextP;
+ }
+
+ return FALSE;
+}
+
+
+static
+void qpush(SocketRequestQueue* q,
+ SocketRequestQueueElement* e)
+{
+ if (q->first != NULL) {
+ q->last->nextP = e;
+ q->last = e;
+ e->nextP = NULL;
+ } else {
+ q->first = e;
+ q->last = e;
+ e->nextP = NULL;
+ }
+}
+
+
+static
+SocketRequestQueueElement* qpop(SocketRequestQueue* q)
+{
+ SocketRequestQueueElement* e = q->first;
+
+ if (e != NULL) {
+ /* Atleast one element in the queue */
+ if (e == q->last) {
+ /* Only one element in the queue */
+ q->first = q->last = NULL;
+ } else {
+ /* More than one element in the queue */
+ q->first = e->nextP;
+ }
+ }
+
+ return e;
+}
+
+
+
+static
+BOOLEAN_T qunqueue(ErlNifEnv* env,
+ SocketRequestQueue* q,
+ const ErlNifPid* pid)
+{
+ SocketRequestQueueElement* e = q->first;
+ SocketRequestQueueElement* p = NULL;
+
+ /* Check if it was one of the waiting acceptor processes */
+ while (e != NULL) {
+ if (compare_pids(env, &e->data.pid, pid)) {
+
+ /* We have a match */
+
+ if (p != NULL) {
+ /* Not the first, but could be the last */
+ if (q->last == e) {
+ q->last = p;
+ p->nextP = NULL;
+ } else {
+ p->nextP = e->nextP;
+ }
+
+ } else {
+ /* The first and could also be the last */
+ if (q->last == e) {
+ q->last = NULL;
+ q->first = NULL;
+ } else {
+ q->first = e->nextP;
+ }
+ }
+
+ FREE(e);
+
+ return TRUE;
+ }
+
+ /* Try next */
+ p = e;
+ e = e->nextP;
+ }
+
+ return FALSE;
+}
+
+
+
+/* ----------------------------------------------------------------------
* C o u n t e r F u n c t i o n s
* ----------------------------------------------------------------------
*/
@@ -8502,13 +8735,70 @@ void socket_down(ErlNifEnv* env,
{
SocketDescriptor* descP = (SocketDescriptor*) obj;
- SSDBG( descP,
- ("SOCKET", "socket_down -> entry when"
- "\r\n sock: %d"
- "\r\n pid: %T"
- "\r\n mon: %T"
- "\r\n", descP->sock, *pid, *mon) );
+ SSDBG( descP, ("SOCKET", "socket_down -> entry with"
+ "\r\n sock: %d"
+ "\r\n pid: %T"
+ "\r\n", descP->sock, *pid) );
+
+ /* Eventually we should go through the other queues also,
+ * the process can be one of them...
+ */
+
+ /* Check first if its the current acceptor, and if not check the queue */
+ if (compare_pids(env, &descP->currentAcceptor.pid, pid)) {
+
+ SSDBG( descP, ("SOCKET",
+ "socket_down -> current acceptor - try pop the queue\r\n") );
+
+ if (acceptor_pop(env, descP,
+ &descP->currentAcceptor.pid,
+ &descP->currentAcceptor.mon,
+ &descP->currentAcceptor.ref)) {
+ int res;
+
+ /* There was another one, so we will still be in accepting state */
+
+ SSDBG( descP, ("SOCKET", "socket_down -> new (active) acceptor: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref) );
+
+ if ((res = enif_select(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP,
+ &descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref) < 0)) {
+
+ esock_warning_msg("Failed select (%d) for new acceptor "
+ "after current (%T) died\r\n",
+ res, *pid);
+
+ }
+
+ } else {
+
+ SSDBG( descP, ("SOCKET", "socket_down -> no active acceptor\r\n") );
+
+ descP->currentAcceptorP = NULL;
+ descP->state = SOCKET_STATE_LISTENING;
+ }
+
+ } else {
+
+ /* Maybe unqueue one of the waiting acceptors */
+
+ SSDBG( descP, ("SOCKET",
+ "socket_down -> "
+ "not current acceptor - maybe a waiting acceptor\r\n") );
+
+ qunqueue(env, &descP->acceptorsQ, pid);
+ }
+ SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") );
+
}
diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam
index 9baf6a422e..c24dc40e10 100644
--- a/erts/preloaded/ebin/socket.beam
+++ b/erts/preloaded/ebin/socket.beam
Binary files differ
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index fbfd1903a1..96dc89bd9e 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -984,6 +984,7 @@ do_accept(LSockRef, SI, Timeout) ->
Socket = #socket{info = SocketInfo,
ref = SockRef},
{ok, Socket};
+
{error, eagain} ->
NewTimeout = next_timeout(TS, Timeout),
receive
@@ -997,7 +998,11 @@ do_accept(LSockRef, SI, Timeout) ->
nif_cancel(LSockRef, accept, AccRef),
flush_select_msgs(LSockRef, AccRef),
{error, timeout}
- end
+ end;
+
+ {error, _} = ERROR ->
+ nif_cancel(LSockRef, accept, AccRef), % Just to be on the safe side...
+ ERROR
end.
diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl
index d9bbf00e85..8a77b9b3c9 100644
--- a/lib/kernel/test/socket_server.erl
+++ b/lib/kernel/test/socket_server.erl
@@ -28,10 +28,12 @@
-define(LIB, socket_lib).
--record(manager, {peek, acceptor, handler_id, handlers}).
--record(acceptor, {socket, manager}).
+-record(manager, {socket, peek, acceptors, handler_id, handlers}).
+-record(acceptor, {id, socket, manager}).
-record(handler, {socket, peek, type, manager}).
+-define(NUM_ACCEPTORS, 5).
+
start() ->
start_tcp().
@@ -83,17 +85,13 @@ manager_reply(Pid, Ref, Reply) ->
manager_init(Domain, stream = Type, Proto, Peek) ->
put(sname, "manager"),
- i("try start acceptor"),
- case acceptor_start(Domain, Type, Proto) of
- {ok, {Pid, MRef}} ->
- i("acceptor started"),
- manager_loop(#manager{peek = Peek,
- acceptor = {Pid, MRef},
- handler_id = 1,
- handlers = []});
- {error, Reason} ->
- exit({failed_starting_acceptor, Reason})
- end;
+ i("try start acceptor(s)"),
+ {Sock, Acceptors} = manager_stream_init(Domain, Type, Proto),
+ manager_loop(#manager{socket = Sock,
+ peek = Peek,
+ acceptors = Acceptors,
+ handler_id = 1,
+ handlers = []});
manager_init(Domain, dgram = Type, Proto, Peek) ->
put(sname, "manager"),
i("try open socket"),
@@ -142,7 +140,7 @@ manager_init(Domain, dgram = Type, Proto, Peek) ->
handler_continue(Pid),
manager_loop(#manager{peek = Peek,
handler_id = 2, % Just in case
- handlers = [{Pid, MRef, 1}]});
+ handlers = [{1, Pid, MRef}]});
{error, SReason} ->
e("Failed starting handler: "
"~n ~p", [SReason]),
@@ -155,35 +153,142 @@ manager_init(Domain, dgram = Type, Proto, Peek) ->
end.
+manager_stream_init(Domain, Type, Proto) ->
+ i("try (socket) open"),
+ Sock = case socket:open(Domain, Type, Proto) of
+ {ok, S} ->
+ S;
+ {error, OReason} ->
+ throw({open, OReason})
+ end,
+ F = fun(X) -> case socket:getopt(Sock, socket, X) of
+ {ok, V} -> f("~p", [V]);
+ {error, R} -> f("error: ~p", [R])
+ end
+ end,
+ i("(socket) open (~s,~s,~s): "
+ "~n debug: ~s"
+ "~n prio: ~s"
+ "~n => try find (local) address",
+ [F(domain), F(type), F(protocol), F(debug), F(priority)]),
+ Addr = which_addr(Domain),
+ SA = #{family => Domain,
+ addr => Addr},
+ i("found: "
+ "~n ~p"
+ "~n => try (socket) bind", [Addr]),
+ %% ok = socket:setopt(Sock, otp, debug, true),
+ %% ok = socket:setopt(Sock, socket, debug, 1), %% must have rights!!
+ Port = case socket:bind(Sock, SA) of
+ {ok, P} ->
+ %% ok = socket:setopt(Sock, socket, debug, 0), %% must have rights!!
+ %% ok = socket:setopt(Sock, otp, debug, false),
+ P;
+ {error, BReason} ->
+ throw({bind, BReason})
+ end,
+ i("bound to: "
+ "~n ~p"
+ "~n => try (socket) listen (acceptconn: ~s)",
+ [Port, F(acceptconn)]),
+ case socket:listen(Sock) of
+ ok ->
+ i("listening (acceptconn: ~s)",
+ [F(acceptconn)]),
+ manager_stream_init(Sock, 1, ?NUM_ACCEPTORS, []);
+ {error, LReason} ->
+ throw({listen, LReason})
+ end.
+
+which_addr(Domain) ->
+ Iflist = case inet:getifaddrs() of
+ {ok, IFL} ->
+ IFL;
+ {error, Reason} ->
+ throw({inet,getifaddrs,Reason})
+ end,
+ which_addr(Domain, Iflist).
+
+which_addr(_Domain, []) ->
+ throw(no_address);
+which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") ->
+ which_addr2(Domain, IFO);
+which_addr(Domain, [_|IFL]) ->
+ which_addr(Domain, IFL).
+
+which_addr2(_, []) ->
+ throw(no_address);
+which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) ->
+ Addr;
+which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) ->
+ Addr;
+which_addr2(Domain, [_|IFO]) ->
+ which_addr2(Domain, IFO).
+
+
+manager_stream_init(Sock, ID, NumAcceptors, Acc)
+ when (NumAcceptors > 0) ->
+ i("try start acceptor"),
+ case acceptor_start(Sock, ID) of
+ {ok, {Pid, MRef}} ->
+ i("acceptor ~w (~p) started", [ID, Pid]),
+ ?LIB:sleep(5000),
+ manager_stream_init(Sock, ID+1, NumAcceptors-1,
+ [{ID, Pid, MRef}|Acc]);
+ {error, Reason} ->
+ exit({failed_starting_acceptor, Reason})
+ end;
+manager_stream_init(Sock, _ID, 0, Acc) ->
+ %% Req = {kill_acceptor, length(Acc)}, % Last in the queue
+ %% Req = {kill_acceptor, 3}, % In the "middle" of the queue
+ %% Req = {kill_acceptor, 2}, % The first in the queue
+ %% Req = {kill_acceptor, 1}, % Current acceptor
+ %% Msg = {manager, self(), make_ref(), Req},
+ %% erlang:send_after(timer:seconds(10), self(), Msg),
+ {Sock, lists:reverse(Acc)}.
+
+
manager_loop(M) ->
receive
{'DOWN', MRef, process, Pid, Reason} ->
M2 = manager_handle_down(M, MRef, Pid, Reason),
manager_loop(M2);
-
+
{manager, Pid, Ref, Request} ->
M2 = manager_handle_request(M, Pid, Ref, Request),
manager_loop(M2)
end.
-manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason)
- when (Reason =/= normal) ->
- e("acceptor died: "
- "~n ~p", [Reason]),
- exit({acceptor_died, Reason});
-manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason) ->
- exit(Reason);
-manager_handle_down(#manager{handlers = Handlers} = M, _MRef, Pid, Reason) ->
- if
- (Reason =/= normal) ->
- e("handler ~p died: "
- "~n ~p", [Pid, Reason]);
- true ->
- i("handler ~p terminated", [Pid])
- end,
- Handlers2 = lists:keydelete(Pid, 1, Handlers),
- M#manager{handlers = Handlers2}.
+manager_handle_down(#manager{acceptors = Acceptors,
+ handlers = Handlers} = M, MRef, Pid, Reason) ->
+ case lists:keysearch(Pid, 2, Acceptors) of
+ {value, {ID, Pid, MRef}} when (Reason =:= normal) ->
+ i("acceptor ~w exited (normally)", [ID]),
+ case lists:keydelete(Pid, 2, Acceptors) of
+ [] ->
+ %% We are done
+ i("the last acceptor - we are done"),
+ exit(normal);
+ Acceptors2 ->
+ M#manager{acceptors = Acceptors2}
+ end;
+ {value, {ID, Pid, MRef}} ->
+ e("acceptor ~w crashed: "
+ "~n ~p", [ID, Reason]),
+ exit({acceptor_died, Reason});
+
+ false -> %% handler!
+ if
+ (Reason =/= normal) ->
+ e("handler ~p died: "
+ "~n ~p", [Pid, Reason]);
+ true ->
+ i("handler ~p terminated", [Pid])
+ end,
+ Handlers2 = lists:keydelete(Pid, 2, Handlers),
+ M#manager{handlers = Handlers2}
+ end.
manager_handle_request(#manager{peek = Peek,
@@ -196,7 +301,7 @@ manager_handle_request(#manager{peek = Peek,
i("handler ~w started", [HID]),
manager_reply(Pid, Ref, {ok, HPid}),
M#manager{handler_id = HID+1,
- handlers = [{HPid, HMRef, HID}|Handlers]};
+ handlers = [{HID, HPid, HMRef}|Handlers]};
{error, Reason} = ERROR ->
e("Failed starting new handler: "
"~n Sock: ~p"
@@ -204,21 +309,50 @@ manager_handle_request(#manager{peek = Peek,
manager_reply(Pid, Ref, ERROR),
M
end;
-manager_handle_request(#manager{acceptor = {Pid, MRef},
- handlers = Handlers}, Pid, Ref,
+manager_handle_request(#manager{socket = Sock,
+ acceptors = [{AID, APid, AMRef}]} = M, _Pid, _Ref,
+ {kill_acceptor, AID}) ->
+ i("try kill (only remeining) acceptor ~w", [AID]),
+ socket:setopt(Sock, otp, debug, true),
+ manager_stop_acceptor(APid, AMRef, AID, kill),
+ M#manager{acceptors = []};
+manager_handle_request(#manager{socket = Sock,
+ acceptors = Acceptors} = M, _Pid, _Ref,
+ {kill_acceptor, AID}) ->
+ i("try kill acceptor ~w", [AID]),
+ case lists:keysearch(AID, 1, Acceptors) of
+ {value, {AID, APid, AMRef}} ->
+ socket:setopt(Sock, otp, debug, true),
+ manager_stop_acceptor(APid, AMRef, AID, kill),
+ Acceptors2 = lists:keydelete(AID, 1, Acceptors),
+ M#manager{acceptors = Acceptors2};
+ false ->
+ e("no such acceptor"),
+ M
+ end;
+manager_handle_request(#manager{acceptors = Acceptors,
+ handlers = Handlers}, Pid, Ref,
{stop, Reason}) ->
i("stop"),
manager_reply(Pid, Ref, ok),
manager_stop_handlers(Handlers, Reason),
- i("try stop acceptor ~p: ~p", [Pid, Reason]),
- erlang:demonitor(MRef, [flush]),
- acceptor_stop(Pid, Reason),
- i("stop", []),
+ manager_stop_acceptors(Acceptors, Reason),
+ i("stopped", []),
exit(Reason).
+manager_stop_acceptors(Acceptors, Reason) ->
+ lists:foreach(fun({ID,P,M}) ->
+ manager_stop_acceptor(P, M, ID, Reason)
+ end, Acceptors).
+
+manager_stop_acceptor(Pid, MRef, ID, Reason) ->
+ i("try stop acceptor ~w (~p): ~p", [ID, Pid, Reason]),
+ erlang:demonitor(MRef, [flush]),
+ acceptor_stop(Pid, Reason),
+ ok.
manager_stop_handlers(Handlers, Reason) ->
- lists:foreach(fun({P,M,ID}) ->
+ lists:foreach(fun({ID,P,M}) ->
manager_stop_handler(P, M, ID, Reason)
end, Handlers).
@@ -232,10 +366,10 @@ manager_stop_handler(Pid, MRef, ID, Reason) ->
%% =========================================================================
-acceptor_start(Domain, Type, Proto) ->
+acceptor_start(Sock, ID) ->
Self = self(),
A = {Pid, _} = spawn_monitor(fun() ->
- acceptor_init(Self, Domain, Type, Proto)
+ acceptor_init(Self, Sock, ID)
end),
receive
{acceptor, Pid, ok} ->
@@ -258,93 +392,12 @@ acceptor_stop(Pid, _Reason) ->
%% reply(acceptor, Pid, Ref, Reply).
-acceptor_init(Manager, Domain, Type, Proto) ->
- put(sname, "acceptor"),
- try acceptor_do_init(Domain, Type, Proto) of
- Sock ->
- Manager ! {acceptor, self(), ok},
- acceptor_loop(#acceptor{manager = Manager,
- socket = Sock})
- catch
- throw:E:P ->
- e("Failed initiate: "
- "~n Error: ~p"
- "~n Path: ~p", [E, P]),
- Manager ! {acceptor, self(), {error, {catched, E, P}}}
- end.
-
-acceptor_do_init(Domain, Type, Proto) ->
- i("try (socket) open"),
- Sock = case socket:open(Domain, Type, Proto) of
- {ok, S} ->
- S;
- {error, OReason} ->
- throw({open, OReason})
- end,
- F = fun(X) -> case socket:getopt(Sock, socket, X) of
- {ok, V} -> f("~p", [V]);
- {error, R} -> f("error: ~p", [R])
- end
- end,
- i("(socket) open (~s,~s,~s): "
- "~n debug: ~s"
- "~n prio: ~s"
- "~n => try find (local) address",
- [F(domain), F(type), F(protocol), F(debug), F(priority)]),
- Addr = which_addr(Domain),
- SA = #{family => Domain,
- addr => Addr},
- i("found: "
- "~n ~p"
- "~n => try (socket) bind", [Addr]),
- %% ok = socket:setopt(Sock, otp, debug, true),
- %% ok = socket:setopt(Sock, socket, debug, 1), %% must have rights!!
- Port = case socket:bind(Sock, SA) of
- {ok, P} ->
- %% ok = socket:setopt(Sock, socket, debug, 0), %% must have rights!!
- %% ok = socket:setopt(Sock, otp, debug, false),
- P;
- {error, BReason} ->
- throw({bind, BReason})
- end,
- i("bound to: "
- "~n ~p"
- "~n => try (socket) listen (acceptconn: ~s)",
- [Port, F(acceptconn)]),
- case socket:listen(Sock) of
- ok ->
- i("listening (acceptconn: ~s)",
- [F(acceptconn)]),
- Sock;
- {error, LReason} ->
- throw({listen, LReason})
- end.
-
-which_addr(Domain) ->
- Iflist = case inet:getifaddrs() of
- {ok, IFL} ->
- IFL;
- {error, Reason} ->
- throw({inet,getifaddrs,Reason})
- end,
- which_addr(Domain, Iflist).
-
-which_addr(_Domain, []) ->
- throw(no_address);
-which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") ->
- which_addr2(Domain, IFO);
-which_addr(Domain, [_|IFL]) ->
- which_addr(Domain, IFL).
-
-which_addr2(_, []) ->
- throw(no_address);
-which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) ->
- Addr;
-which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) ->
- Addr;
-which_addr2(Domain, [_|IFO]) ->
- which_addr2(Domain, IFO).
-
+acceptor_init(Manager, Sock, ID) ->
+ put(sname, f("acceptor[~w]", [ID])),
+ Manager ! {acceptor, self(), ok},
+ acceptor_loop(#acceptor{id = ID,
+ manager = Manager,
+ socket = Sock}).
acceptor_loop(#acceptor{socket = LSock} = A) ->
i("try accept"),
@@ -600,6 +653,8 @@ send(#handler{socket = Sock, type = dgram}, Msg, Dest) ->
f(F, A) ->
?LIB:f(F, A).
+e(F) ->
+ e(F, []).
e(F, A) ->
?LIB:e(F, A).