From 017565203f40860d24b80a54136a160aee460dbe Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Mon, 16 Jul 2018 18:21:48 +0200 Subject: [socket-nif] Add support for multiple acceptor processes Its now possible to have multiple (simultaneous) acceptor processes for the same listening socket. OTP-14831 --- erts/emulator/nifs/common/socket_int.h | 2 + erts/emulator/nifs/common/socket_nif.c | 310 +++++++++++++++++++++++++++++++-- 2 files changed, 302 insertions(+), 10 deletions(-) (limited to 'erts/emulator/nifs/common') diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index aa10260134..67e4baba27 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -92,6 +92,7 @@ typedef unsigned int BOOLEAN_T; #define BOOL2ATOM(__B__) ((__B__) ? esock_atom_true : esock_atom_false) +#define B2S(__B__) ((__B__) ? "true" : "false") /* Misc error strings */ #define ESOCK_STR_EAFNOSUPPORT "eafnosupport" @@ -141,6 +142,7 @@ extern ERL_NIF_TERM esock_atom_eagain; extern ERL_NIF_TERM esock_atom_einval; + /* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ * Various wrapper macros for enif functions */ diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 713153d7c5..224dcc9ff6 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -572,7 +572,7 @@ typedef struct { /* +++ Accept stuff +++ */ ErlNifMutex* accMtx; SocketRequestor currentAcceptor; - SocketRequestor* currentAcceptorP; // NULL or points to currentReader + SocketRequestor* currentAcceptorP; // NULL or points to currentAcceptor SocketRequestQueue acceptorsQ; /* +++ Config & Misc stuff +++ */ @@ -1488,6 +1488,29 @@ static void inc_socket(int domain, int type, int protocol); static void dec_socket(int domain, int type, int protocol); +static BOOLEAN_T acceptor_search4pid(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid); +static ERL_NIF_TERM acceptor_push(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid pid, + ERL_NIF_TERM ref); +static BOOLEAN_T acceptor_pop(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref); + +static BOOLEAN_T qsearch4pid(ErlNifEnv* env, + SocketRequestQueue* q, + ErlNifPid* pid); +static void qpush(SocketRequestQueue* q, + SocketRequestQueueElement* e); +static SocketRequestQueueElement* qpop(SocketRequestQueue* q); +static BOOLEAN_T qunqueue(ErlNifEnv* env, + SocketRequestQueue* q, + const ErlNifPid* pid); + /* #if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE) static size_t my_strnlen(const char *s, size_t maxlen); @@ -2663,6 +2686,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, HANDLE accEvent; ErlNifPid caller; int save_errno; + ERL_NIF_TERM result; SSDBG( descP, ("SOCKET", "naccept_accepting -> get caller\r\n") ); @@ -2682,11 +2706,18 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, * for the select message). */ + SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") ); + + if (!acceptor_search4pid(env, descP, &caller)) + result = acceptor_push(env, descP, caller, ref); + else + result = esock_make_error(env, esock_atom_eagain); + SSDBG( descP, ("SOCKET", - "naccept_accepting -> not current acceptor: busy\r\n") ); + "naccept_accepting -> queue (push) result: %T\r\n", result) ); - return esock_make_error(env, atom_exbusy); + return result; } n = sizeof(descP->remote); @@ -2777,8 +2808,30 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, * And if so, pop it and copy the (waiting) acceptor, and then * make a new select with that info). */ - descP->state = SOCKET_STATE_LISTENING; + if (acceptor_pop(env, descP, + &descP->currentAcceptor.pid, + &descP->currentAcceptor.mon, + &descP->currentAcceptor.ref)) { + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "naccept_accepting -> new (active) acceptor: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentAcceptor.pid, + descP->currentAcceptor.ref) ); + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref); + } else { + descP->currentAcceptorP = NULL; + descP->state = SOCKET_STATE_LISTENING; + } + return esock_make_ok2(env, accRef); } } @@ -8237,6 +8290,186 @@ char* send_msg(ErlNifEnv* env, +/* ---------------------------------------------------------------------- + * R e q u e s t Q u e u e F u n c t i o n s + * ---------------------------------------------------------------------- + */ + +/* *** acceptor search for pid *** + * + * Search for a pid in the acceptor queue. + */ +static +BOOLEAN_T acceptor_search4pid(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid) +{ + return qsearch4pid(env, &descP->acceptorsQ, pid); +} + + +/* *** acceptor push *** + * + * Push an acceptor onto the acceptor queue. + * This happens when we already have atleast one current acceptor. + */ +static +ERL_NIF_TERM acceptor_push(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid pid, + ERL_NIF_TERM ref) +{ + SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement)); + SocketRequestor* reqP = &e->data; + + reqP->pid = pid; + reqP->ref = ref; + + if (MONP(env, descP, &pid, &reqP->mon) > 0) { + FREE(reqP); + return esock_make_error(env, atom_exmon); + } + + qpush(&descP->acceptorsQ, e); + + // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN + return esock_make_error(env, esock_atom_eagain); +} + + +/* *** acceptor pop *** + * + * Pop an acceptor from the acceptor queue. + */ +static +BOOLEAN_T acceptor_pop(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref) +{ + SocketRequestQueueElement* e = qpop(&descP->acceptorsQ); + + if (e != NULL) { + *pid = e->data.pid; + *mon = e->data.mon; + *ref = e->data.ref; + FREE(e); + return TRUE; + } else { + /* (acceptors) Queue was empty */ + // *pid = NULL; we have no null value for pids + // *mon = NULL; we have no null value for monitors + *ref = esock_atom_undefined; // Just in case + return FALSE; + } + +} + + +static +BOOLEAN_T qsearch4pid(ErlNifEnv* env, + SocketRequestQueue* q, + ErlNifPid* pid) +{ + SocketRequestQueueElement* tmp = q->first; + + while (tmp != NULL) { + if (compare_pids(env, &tmp->data.pid, pid)) + return TRUE; + else + tmp = tmp->nextP; + } + + return FALSE; +} + + +static +void qpush(SocketRequestQueue* q, + SocketRequestQueueElement* e) +{ + if (q->first != NULL) { + q->last->nextP = e; + q->last = e; + e->nextP = NULL; + } else { + q->first = e; + q->last = e; + e->nextP = NULL; + } +} + + +static +SocketRequestQueueElement* qpop(SocketRequestQueue* q) +{ + SocketRequestQueueElement* e = q->first; + + if (e != NULL) { + /* Atleast one element in the queue */ + if (e == q->last) { + /* Only one element in the queue */ + q->first = q->last = NULL; + } else { + /* More than one element in the queue */ + q->first = e->nextP; + } + } + + return e; +} + + + +static +BOOLEAN_T qunqueue(ErlNifEnv* env, + SocketRequestQueue* q, + const ErlNifPid* pid) +{ + SocketRequestQueueElement* e = q->first; + SocketRequestQueueElement* p = NULL; + + /* Check if it was one of the waiting acceptor processes */ + while (e != NULL) { + if (compare_pids(env, &e->data.pid, pid)) { + + /* We have a match */ + + if (p != NULL) { + /* Not the first, but could be the last */ + if (q->last == e) { + q->last = p; + p->nextP = NULL; + } else { + p->nextP = e->nextP; + } + + } else { + /* The first and could also be the last */ + if (q->last == e) { + q->last = NULL; + q->first = NULL; + } else { + q->first = e->nextP; + } + } + + FREE(e); + + return TRUE; + } + + /* Try next */ + p = e; + e = e->nextP; + } + + return FALSE; +} + + + /* ---------------------------------------------------------------------- * C o u n t e r F u n c t i o n s * ---------------------------------------------------------------------- @@ -8502,13 +8735,70 @@ void socket_down(ErlNifEnv* env, { SocketDescriptor* descP = (SocketDescriptor*) obj; - SSDBG( descP, - ("SOCKET", "socket_down -> entry when" - "\r\n sock: %d" - "\r\n pid: %T" - "\r\n mon: %T" - "\r\n", descP->sock, *pid, *mon) ); + SSDBG( descP, ("SOCKET", "socket_down -> entry with" + "\r\n sock: %d" + "\r\n pid: %T" + "\r\n", descP->sock, *pid) ); + + /* Eventually we should go through the other queues also, + * the process can be one of them... + */ + + /* Check first if its the current acceptor, and if not check the queue */ + if (compare_pids(env, &descP->currentAcceptor.pid, pid)) { + + SSDBG( descP, ("SOCKET", + "socket_down -> current acceptor - try pop the queue\r\n") ); + + if (acceptor_pop(env, descP, + &descP->currentAcceptor.pid, + &descP->currentAcceptor.mon, + &descP->currentAcceptor.ref)) { + int res; + + /* There was another one, so we will still be in accepting state */ + + SSDBG( descP, ("SOCKET", "socket_down -> new (active) acceptor: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentAcceptor.pid, + descP->currentAcceptor.ref) ); + + if ((res = enif_select(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, + &descP->currentAcceptor.pid, + descP->currentAcceptor.ref) < 0)) { + + esock_warning_msg("Failed select (%d) for new acceptor " + "after current (%T) died\r\n", + res, *pid); + + } + + } else { + + SSDBG( descP, ("SOCKET", "socket_down -> no active acceptor\r\n") ); + + descP->currentAcceptorP = NULL; + descP->state = SOCKET_STATE_LISTENING; + } + + } else { + + /* Maybe unqueue one of the waiting acceptors */ + + SSDBG( descP, ("SOCKET", + "socket_down -> " + "not current acceptor - maybe a waiting acceptor\r\n") ); + + qunqueue(env, &descP->acceptorsQ, pid); + } + SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") ); + } -- cgit v1.2.3