From 017565203f40860d24b80a54136a160aee460dbe Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Mon, 16 Jul 2018 18:21:48 +0200 Subject: [socket-nif] Add support for multiple acceptor processes Its now possible to have multiple (simultaneous) acceptor processes for the same listening socket. OTP-14831 --- erts/emulator/nifs/common/socket_int.h | 2 + erts/emulator/nifs/common/socket_nif.c | 310 ++++++++++++++++++++++++++++++-- erts/preloaded/ebin/socket.beam | Bin 46328 -> 46540 bytes erts/preloaded/src/socket.erl | 7 +- lib/kernel/test/socket_server.erl | 313 +++++++++++++++++++-------------- 5 files changed, 492 insertions(+), 140 deletions(-) diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index aa10260134..67e4baba27 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -92,6 +92,7 @@ typedef unsigned int BOOLEAN_T; #define BOOL2ATOM(__B__) ((__B__) ? esock_atom_true : esock_atom_false) +#define B2S(__B__) ((__B__) ? "true" : "false") /* Misc error strings */ #define ESOCK_STR_EAFNOSUPPORT "eafnosupport" @@ -141,6 +142,7 @@ extern ERL_NIF_TERM esock_atom_eagain; extern ERL_NIF_TERM esock_atom_einval; + /* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ * Various wrapper macros for enif functions */ diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 713153d7c5..224dcc9ff6 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -572,7 +572,7 @@ typedef struct { /* +++ Accept stuff +++ */ ErlNifMutex* accMtx; SocketRequestor currentAcceptor; - SocketRequestor* currentAcceptorP; // NULL or points to currentReader + SocketRequestor* currentAcceptorP; // NULL or points to currentAcceptor SocketRequestQueue acceptorsQ; /* +++ Config & Misc stuff +++ */ @@ -1488,6 +1488,29 @@ static void inc_socket(int domain, int type, int protocol); static void dec_socket(int domain, int type, int protocol); +static BOOLEAN_T acceptor_search4pid(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid); +static ERL_NIF_TERM acceptor_push(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid pid, + ERL_NIF_TERM ref); +static BOOLEAN_T acceptor_pop(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref); + +static BOOLEAN_T qsearch4pid(ErlNifEnv* env, + SocketRequestQueue* q, + ErlNifPid* pid); +static void qpush(SocketRequestQueue* q, + SocketRequestQueueElement* e); +static SocketRequestQueueElement* qpop(SocketRequestQueue* q); +static BOOLEAN_T qunqueue(ErlNifEnv* env, + SocketRequestQueue* q, + const ErlNifPid* pid); + /* #if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE) static size_t my_strnlen(const char *s, size_t maxlen); @@ -2663,6 +2686,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, HANDLE accEvent; ErlNifPid caller; int save_errno; + ERL_NIF_TERM result; SSDBG( descP, ("SOCKET", "naccept_accepting -> get caller\r\n") ); @@ -2682,11 +2706,18 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, * for the select message). */ + SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") ); + + if (!acceptor_search4pid(env, descP, &caller)) + result = acceptor_push(env, descP, caller, ref); + else + result = esock_make_error(env, esock_atom_eagain); + SSDBG( descP, ("SOCKET", - "naccept_accepting -> not current acceptor: busy\r\n") ); + "naccept_accepting -> queue (push) result: %T\r\n", result) ); - return esock_make_error(env, atom_exbusy); + return result; } n = sizeof(descP->remote); @@ -2777,8 +2808,30 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, * And if so, pop it and copy the (waiting) acceptor, and then * make a new select with that info). */ - descP->state = SOCKET_STATE_LISTENING; + if (acceptor_pop(env, descP, + &descP->currentAcceptor.pid, + &descP->currentAcceptor.mon, + &descP->currentAcceptor.ref)) { + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "naccept_accepting -> new (active) acceptor: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentAcceptor.pid, + descP->currentAcceptor.ref) ); + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref); + } else { + descP->currentAcceptorP = NULL; + descP->state = SOCKET_STATE_LISTENING; + } + return esock_make_ok2(env, accRef); } } @@ -8237,6 +8290,186 @@ char* send_msg(ErlNifEnv* env, +/* ---------------------------------------------------------------------- + * R e q u e s t Q u e u e F u n c t i o n s + * ---------------------------------------------------------------------- + */ + +/* *** acceptor search for pid *** + * + * Search for a pid in the acceptor queue. + */ +static +BOOLEAN_T acceptor_search4pid(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid) +{ + return qsearch4pid(env, &descP->acceptorsQ, pid); +} + + +/* *** acceptor push *** + * + * Push an acceptor onto the acceptor queue. + * This happens when we already have atleast one current acceptor. + */ +static +ERL_NIF_TERM acceptor_push(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid pid, + ERL_NIF_TERM ref) +{ + SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement)); + SocketRequestor* reqP = &e->data; + + reqP->pid = pid; + reqP->ref = ref; + + if (MONP(env, descP, &pid, &reqP->mon) > 0) { + FREE(reqP); + return esock_make_error(env, atom_exmon); + } + + qpush(&descP->acceptorsQ, e); + + // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN + return esock_make_error(env, esock_atom_eagain); +} + + +/* *** acceptor pop *** + * + * Pop an acceptor from the acceptor queue. + */ +static +BOOLEAN_T acceptor_pop(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref) +{ + SocketRequestQueueElement* e = qpop(&descP->acceptorsQ); + + if (e != NULL) { + *pid = e->data.pid; + *mon = e->data.mon; + *ref = e->data.ref; + FREE(e); + return TRUE; + } else { + /* (acceptors) Queue was empty */ + // *pid = NULL; we have no null value for pids + // *mon = NULL; we have no null value for monitors + *ref = esock_atom_undefined; // Just in case + return FALSE; + } + +} + + +static +BOOLEAN_T qsearch4pid(ErlNifEnv* env, + SocketRequestQueue* q, + ErlNifPid* pid) +{ + SocketRequestQueueElement* tmp = q->first; + + while (tmp != NULL) { + if (compare_pids(env, &tmp->data.pid, pid)) + return TRUE; + else + tmp = tmp->nextP; + } + + return FALSE; +} + + +static +void qpush(SocketRequestQueue* q, + SocketRequestQueueElement* e) +{ + if (q->first != NULL) { + q->last->nextP = e; + q->last = e; + e->nextP = NULL; + } else { + q->first = e; + q->last = e; + e->nextP = NULL; + } +} + + +static +SocketRequestQueueElement* qpop(SocketRequestQueue* q) +{ + SocketRequestQueueElement* e = q->first; + + if (e != NULL) { + /* Atleast one element in the queue */ + if (e == q->last) { + /* Only one element in the queue */ + q->first = q->last = NULL; + } else { + /* More than one element in the queue */ + q->first = e->nextP; + } + } + + return e; +} + + + +static +BOOLEAN_T qunqueue(ErlNifEnv* env, + SocketRequestQueue* q, + const ErlNifPid* pid) +{ + SocketRequestQueueElement* e = q->first; + SocketRequestQueueElement* p = NULL; + + /* Check if it was one of the waiting acceptor processes */ + while (e != NULL) { + if (compare_pids(env, &e->data.pid, pid)) { + + /* We have a match */ + + if (p != NULL) { + /* Not the first, but could be the last */ + if (q->last == e) { + q->last = p; + p->nextP = NULL; + } else { + p->nextP = e->nextP; + } + + } else { + /* The first and could also be the last */ + if (q->last == e) { + q->last = NULL; + q->first = NULL; + } else { + q->first = e->nextP; + } + } + + FREE(e); + + return TRUE; + } + + /* Try next */ + p = e; + e = e->nextP; + } + + return FALSE; +} + + + /* ---------------------------------------------------------------------- * C o u n t e r F u n c t i o n s * ---------------------------------------------------------------------- @@ -8502,13 +8735,70 @@ void socket_down(ErlNifEnv* env, { SocketDescriptor* descP = (SocketDescriptor*) obj; - SSDBG( descP, - ("SOCKET", "socket_down -> entry when" - "\r\n sock: %d" - "\r\n pid: %T" - "\r\n mon: %T" - "\r\n", descP->sock, *pid, *mon) ); + SSDBG( descP, ("SOCKET", "socket_down -> entry with" + "\r\n sock: %d" + "\r\n pid: %T" + "\r\n", descP->sock, *pid) ); + + /* Eventually we should go through the other queues also, + * the process can be one of them... + */ + + /* Check first if its the current acceptor, and if not check the queue */ + if (compare_pids(env, &descP->currentAcceptor.pid, pid)) { + + SSDBG( descP, ("SOCKET", + "socket_down -> current acceptor - try pop the queue\r\n") ); + + if (acceptor_pop(env, descP, + &descP->currentAcceptor.pid, + &descP->currentAcceptor.mon, + &descP->currentAcceptor.ref)) { + int res; + + /* There was another one, so we will still be in accepting state */ + + SSDBG( descP, ("SOCKET", "socket_down -> new (active) acceptor: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentAcceptor.pid, + descP->currentAcceptor.ref) ); + + if ((res = enif_select(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, + &descP->currentAcceptor.pid, + descP->currentAcceptor.ref) < 0)) { + + esock_warning_msg("Failed select (%d) for new acceptor " + "after current (%T) died\r\n", + res, *pid); + + } + + } else { + + SSDBG( descP, ("SOCKET", "socket_down -> no active acceptor\r\n") ); + + descP->currentAcceptorP = NULL; + descP->state = SOCKET_STATE_LISTENING; + } + + } else { + + /* Maybe unqueue one of the waiting acceptors */ + + SSDBG( descP, ("SOCKET", + "socket_down -> " + "not current acceptor - maybe a waiting acceptor\r\n") ); + + qunqueue(env, &descP->acceptorsQ, pid); + } + SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") ); + } diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam index 9baf6a422e..c24dc40e10 100644 Binary files a/erts/preloaded/ebin/socket.beam and b/erts/preloaded/ebin/socket.beam differ diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index fbfd1903a1..96dc89bd9e 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -984,6 +984,7 @@ do_accept(LSockRef, SI, Timeout) -> Socket = #socket{info = SocketInfo, ref = SockRef}, {ok, Socket}; + {error, eagain} -> NewTimeout = next_timeout(TS, Timeout), receive @@ -997,7 +998,11 @@ do_accept(LSockRef, SI, Timeout) -> nif_cancel(LSockRef, accept, AccRef), flush_select_msgs(LSockRef, AccRef), {error, timeout} - end + end; + + {error, _} = ERROR -> + nif_cancel(LSockRef, accept, AccRef), % Just to be on the safe side... + ERROR end. diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl index d9bbf00e85..8a77b9b3c9 100644 --- a/lib/kernel/test/socket_server.erl +++ b/lib/kernel/test/socket_server.erl @@ -28,10 +28,12 @@ -define(LIB, socket_lib). --record(manager, {peek, acceptor, handler_id, handlers}). --record(acceptor, {socket, manager}). +-record(manager, {socket, peek, acceptors, handler_id, handlers}). +-record(acceptor, {id, socket, manager}). -record(handler, {socket, peek, type, manager}). +-define(NUM_ACCEPTORS, 5). + start() -> start_tcp(). @@ -83,17 +85,13 @@ manager_reply(Pid, Ref, Reply) -> manager_init(Domain, stream = Type, Proto, Peek) -> put(sname, "manager"), - i("try start acceptor"), - case acceptor_start(Domain, Type, Proto) of - {ok, {Pid, MRef}} -> - i("acceptor started"), - manager_loop(#manager{peek = Peek, - acceptor = {Pid, MRef}, - handler_id = 1, - handlers = []}); - {error, Reason} -> - exit({failed_starting_acceptor, Reason}) - end; + i("try start acceptor(s)"), + {Sock, Acceptors} = manager_stream_init(Domain, Type, Proto), + manager_loop(#manager{socket = Sock, + peek = Peek, + acceptors = Acceptors, + handler_id = 1, + handlers = []}); manager_init(Domain, dgram = Type, Proto, Peek) -> put(sname, "manager"), i("try open socket"), @@ -142,7 +140,7 @@ manager_init(Domain, dgram = Type, Proto, Peek) -> handler_continue(Pid), manager_loop(#manager{peek = Peek, handler_id = 2, % Just in case - handlers = [{Pid, MRef, 1}]}); + handlers = [{1, Pid, MRef}]}); {error, SReason} -> e("Failed starting handler: " "~n ~p", [SReason]), @@ -155,35 +153,142 @@ manager_init(Domain, dgram = Type, Proto, Peek) -> end. +manager_stream_init(Domain, Type, Proto) -> + i("try (socket) open"), + Sock = case socket:open(Domain, Type, Proto) of + {ok, S} -> + S; + {error, OReason} -> + throw({open, OReason}) + end, + F = fun(X) -> case socket:getopt(Sock, socket, X) of + {ok, V} -> f("~p", [V]); + {error, R} -> f("error: ~p", [R]) + end + end, + i("(socket) open (~s,~s,~s): " + "~n debug: ~s" + "~n prio: ~s" + "~n => try find (local) address", + [F(domain), F(type), F(protocol), F(debug), F(priority)]), + Addr = which_addr(Domain), + SA = #{family => Domain, + addr => Addr}, + i("found: " + "~n ~p" + "~n => try (socket) bind", [Addr]), + %% ok = socket:setopt(Sock, otp, debug, true), + %% ok = socket:setopt(Sock, socket, debug, 1), %% must have rights!! + Port = case socket:bind(Sock, SA) of + {ok, P} -> + %% ok = socket:setopt(Sock, socket, debug, 0), %% must have rights!! + %% ok = socket:setopt(Sock, otp, debug, false), + P; + {error, BReason} -> + throw({bind, BReason}) + end, + i("bound to: " + "~n ~p" + "~n => try (socket) listen (acceptconn: ~s)", + [Port, F(acceptconn)]), + case socket:listen(Sock) of + ok -> + i("listening (acceptconn: ~s)", + [F(acceptconn)]), + manager_stream_init(Sock, 1, ?NUM_ACCEPTORS, []); + {error, LReason} -> + throw({listen, LReason}) + end. + +which_addr(Domain) -> + Iflist = case inet:getifaddrs() of + {ok, IFL} -> + IFL; + {error, Reason} -> + throw({inet,getifaddrs,Reason}) + end, + which_addr(Domain, Iflist). + +which_addr(_Domain, []) -> + throw(no_address); +which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") -> + which_addr2(Domain, IFO); +which_addr(Domain, [_|IFL]) -> + which_addr(Domain, IFL). + +which_addr2(_, []) -> + throw(no_address); +which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) -> + Addr; +which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) -> + Addr; +which_addr2(Domain, [_|IFO]) -> + which_addr2(Domain, IFO). + + +manager_stream_init(Sock, ID, NumAcceptors, Acc) + when (NumAcceptors > 0) -> + i("try start acceptor"), + case acceptor_start(Sock, ID) of + {ok, {Pid, MRef}} -> + i("acceptor ~w (~p) started", [ID, Pid]), + ?LIB:sleep(5000), + manager_stream_init(Sock, ID+1, NumAcceptors-1, + [{ID, Pid, MRef}|Acc]); + {error, Reason} -> + exit({failed_starting_acceptor, Reason}) + end; +manager_stream_init(Sock, _ID, 0, Acc) -> + %% Req = {kill_acceptor, length(Acc)}, % Last in the queue + %% Req = {kill_acceptor, 3}, % In the "middle" of the queue + %% Req = {kill_acceptor, 2}, % The first in the queue + %% Req = {kill_acceptor, 1}, % Current acceptor + %% Msg = {manager, self(), make_ref(), Req}, + %% erlang:send_after(timer:seconds(10), self(), Msg), + {Sock, lists:reverse(Acc)}. + + manager_loop(M) -> receive {'DOWN', MRef, process, Pid, Reason} -> M2 = manager_handle_down(M, MRef, Pid, Reason), manager_loop(M2); - + {manager, Pid, Ref, Request} -> M2 = manager_handle_request(M, Pid, Ref, Request), manager_loop(M2) end. -manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason) - when (Reason =/= normal) -> - e("acceptor died: " - "~n ~p", [Reason]), - exit({acceptor_died, Reason}); -manager_handle_down(#manager{acceptor = {Pid, MRef}}, MRef, Pid, Reason) -> - exit(Reason); -manager_handle_down(#manager{handlers = Handlers} = M, _MRef, Pid, Reason) -> - if - (Reason =/= normal) -> - e("handler ~p died: " - "~n ~p", [Pid, Reason]); - true -> - i("handler ~p terminated", [Pid]) - end, - Handlers2 = lists:keydelete(Pid, 1, Handlers), - M#manager{handlers = Handlers2}. +manager_handle_down(#manager{acceptors = Acceptors, + handlers = Handlers} = M, MRef, Pid, Reason) -> + case lists:keysearch(Pid, 2, Acceptors) of + {value, {ID, Pid, MRef}} when (Reason =:= normal) -> + i("acceptor ~w exited (normally)", [ID]), + case lists:keydelete(Pid, 2, Acceptors) of + [] -> + %% We are done + i("the last acceptor - we are done"), + exit(normal); + Acceptors2 -> + M#manager{acceptors = Acceptors2} + end; + {value, {ID, Pid, MRef}} -> + e("acceptor ~w crashed: " + "~n ~p", [ID, Reason]), + exit({acceptor_died, Reason}); + + false -> %% handler! + if + (Reason =/= normal) -> + e("handler ~p died: " + "~n ~p", [Pid, Reason]); + true -> + i("handler ~p terminated", [Pid]) + end, + Handlers2 = lists:keydelete(Pid, 2, Handlers), + M#manager{handlers = Handlers2} + end. manager_handle_request(#manager{peek = Peek, @@ -196,7 +301,7 @@ manager_handle_request(#manager{peek = Peek, i("handler ~w started", [HID]), manager_reply(Pid, Ref, {ok, HPid}), M#manager{handler_id = HID+1, - handlers = [{HPid, HMRef, HID}|Handlers]}; + handlers = [{HID, HPid, HMRef}|Handlers]}; {error, Reason} = ERROR -> e("Failed starting new handler: " "~n Sock: ~p" @@ -204,21 +309,50 @@ manager_handle_request(#manager{peek = Peek, manager_reply(Pid, Ref, ERROR), M end; -manager_handle_request(#manager{acceptor = {Pid, MRef}, - handlers = Handlers}, Pid, Ref, +manager_handle_request(#manager{socket = Sock, + acceptors = [{AID, APid, AMRef}]} = M, _Pid, _Ref, + {kill_acceptor, AID}) -> + i("try kill (only remeining) acceptor ~w", [AID]), + socket:setopt(Sock, otp, debug, true), + manager_stop_acceptor(APid, AMRef, AID, kill), + M#manager{acceptors = []}; +manager_handle_request(#manager{socket = Sock, + acceptors = Acceptors} = M, _Pid, _Ref, + {kill_acceptor, AID}) -> + i("try kill acceptor ~w", [AID]), + case lists:keysearch(AID, 1, Acceptors) of + {value, {AID, APid, AMRef}} -> + socket:setopt(Sock, otp, debug, true), + manager_stop_acceptor(APid, AMRef, AID, kill), + Acceptors2 = lists:keydelete(AID, 1, Acceptors), + M#manager{acceptors = Acceptors2}; + false -> + e("no such acceptor"), + M + end; +manager_handle_request(#manager{acceptors = Acceptors, + handlers = Handlers}, Pid, Ref, {stop, Reason}) -> i("stop"), manager_reply(Pid, Ref, ok), manager_stop_handlers(Handlers, Reason), - i("try stop acceptor ~p: ~p", [Pid, Reason]), - erlang:demonitor(MRef, [flush]), - acceptor_stop(Pid, Reason), - i("stop", []), + manager_stop_acceptors(Acceptors, Reason), + i("stopped", []), exit(Reason). +manager_stop_acceptors(Acceptors, Reason) -> + lists:foreach(fun({ID,P,M}) -> + manager_stop_acceptor(P, M, ID, Reason) + end, Acceptors). + +manager_stop_acceptor(Pid, MRef, ID, Reason) -> + i("try stop acceptor ~w (~p): ~p", [ID, Pid, Reason]), + erlang:demonitor(MRef, [flush]), + acceptor_stop(Pid, Reason), + ok. manager_stop_handlers(Handlers, Reason) -> - lists:foreach(fun({P,M,ID}) -> + lists:foreach(fun({ID,P,M}) -> manager_stop_handler(P, M, ID, Reason) end, Handlers). @@ -232,10 +366,10 @@ manager_stop_handler(Pid, MRef, ID, Reason) -> %% ========================================================================= -acceptor_start(Domain, Type, Proto) -> +acceptor_start(Sock, ID) -> Self = self(), A = {Pid, _} = spawn_monitor(fun() -> - acceptor_init(Self, Domain, Type, Proto) + acceptor_init(Self, Sock, ID) end), receive {acceptor, Pid, ok} -> @@ -258,93 +392,12 @@ acceptor_stop(Pid, _Reason) -> %% reply(acceptor, Pid, Ref, Reply). -acceptor_init(Manager, Domain, Type, Proto) -> - put(sname, "acceptor"), - try acceptor_do_init(Domain, Type, Proto) of - Sock -> - Manager ! {acceptor, self(), ok}, - acceptor_loop(#acceptor{manager = Manager, - socket = Sock}) - catch - throw:E:P -> - e("Failed initiate: " - "~n Error: ~p" - "~n Path: ~p", [E, P]), - Manager ! {acceptor, self(), {error, {catched, E, P}}} - end. - -acceptor_do_init(Domain, Type, Proto) -> - i("try (socket) open"), - Sock = case socket:open(Domain, Type, Proto) of - {ok, S} -> - S; - {error, OReason} -> - throw({open, OReason}) - end, - F = fun(X) -> case socket:getopt(Sock, socket, X) of - {ok, V} -> f("~p", [V]); - {error, R} -> f("error: ~p", [R]) - end - end, - i("(socket) open (~s,~s,~s): " - "~n debug: ~s" - "~n prio: ~s" - "~n => try find (local) address", - [F(domain), F(type), F(protocol), F(debug), F(priority)]), - Addr = which_addr(Domain), - SA = #{family => Domain, - addr => Addr}, - i("found: " - "~n ~p" - "~n => try (socket) bind", [Addr]), - %% ok = socket:setopt(Sock, otp, debug, true), - %% ok = socket:setopt(Sock, socket, debug, 1), %% must have rights!! - Port = case socket:bind(Sock, SA) of - {ok, P} -> - %% ok = socket:setopt(Sock, socket, debug, 0), %% must have rights!! - %% ok = socket:setopt(Sock, otp, debug, false), - P; - {error, BReason} -> - throw({bind, BReason}) - end, - i("bound to: " - "~n ~p" - "~n => try (socket) listen (acceptconn: ~s)", - [Port, F(acceptconn)]), - case socket:listen(Sock) of - ok -> - i("listening (acceptconn: ~s)", - [F(acceptconn)]), - Sock; - {error, LReason} -> - throw({listen, LReason}) - end. - -which_addr(Domain) -> - Iflist = case inet:getifaddrs() of - {ok, IFL} -> - IFL; - {error, Reason} -> - throw({inet,getifaddrs,Reason}) - end, - which_addr(Domain, Iflist). - -which_addr(_Domain, []) -> - throw(no_address); -which_addr(Domain, [{Name, IFO}|_IFL]) when (Name =/= "lo") -> - which_addr2(Domain, IFO); -which_addr(Domain, [_|IFL]) -> - which_addr(Domain, IFL). - -which_addr2(_, []) -> - throw(no_address); -which_addr2(inet = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 4) -> - Addr; -which_addr2(inet6 = _Domain, [{addr, Addr}|_IFO]) when (size(Addr) =:= 8) -> - Addr; -which_addr2(Domain, [_|IFO]) -> - which_addr2(Domain, IFO). - +acceptor_init(Manager, Sock, ID) -> + put(sname, f("acceptor[~w]", [ID])), + Manager ! {acceptor, self(), ok}, + acceptor_loop(#acceptor{id = ID, + manager = Manager, + socket = Sock}). acceptor_loop(#acceptor{socket = LSock} = A) -> i("try accept"), @@ -600,6 +653,8 @@ send(#handler{socket = Sock, type = dgram}, Msg, Dest) -> f(F, A) -> ?LIB:f(F, A). +e(F) -> + e(F, []). e(F, A) -> ?LIB:e(F, A). -- cgit v1.2.3