aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/nifs
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-07-16 18:21:48 +0200
committerMicael Karlberg <[email protected]>2018-09-18 14:50:18 +0200
commit017565203f40860d24b80a54136a160aee460dbe (patch)
tree7dd8d8a9426cd918a1d41db7dcd99bcb7cc35a51 /erts/emulator/nifs
parent8de18e84deaed4c9e6e7242ae2550fc6618dc44d (diff)
downloadotp-017565203f40860d24b80a54136a160aee460dbe.tar.gz
otp-017565203f40860d24b80a54136a160aee460dbe.tar.bz2
otp-017565203f40860d24b80a54136a160aee460dbe.zip
[socket-nif] Add support for multiple acceptor processes
Its now possible to have multiple (simultaneous) acceptor processes for the same listening socket. OTP-14831
Diffstat (limited to 'erts/emulator/nifs')
-rw-r--r--erts/emulator/nifs/common/socket_int.h2
-rw-r--r--erts/emulator/nifs/common/socket_nif.c310
2 files changed, 302 insertions, 10 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h
index aa10260134..67e4baba27 100644
--- a/erts/emulator/nifs/common/socket_int.h
+++ b/erts/emulator/nifs/common/socket_int.h
@@ -92,6 +92,7 @@ typedef unsigned int BOOLEAN_T;
#define BOOL2ATOM(__B__) ((__B__) ? esock_atom_true : esock_atom_false)
+#define B2S(__B__) ((__B__) ? "true" : "false")
/* Misc error strings */
#define ESOCK_STR_EAFNOSUPPORT "eafnosupport"
@@ -141,6 +142,7 @@ extern ERL_NIF_TERM esock_atom_eagain;
extern ERL_NIF_TERM esock_atom_einval;
+
/* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* Various wrapper macros for enif functions
*/
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 713153d7c5..224dcc9ff6 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -572,7 +572,7 @@ typedef struct {
/* +++ Accept stuff +++ */
ErlNifMutex* accMtx;
SocketRequestor currentAcceptor;
- SocketRequestor* currentAcceptorP; // NULL or points to currentReader
+ SocketRequestor* currentAcceptorP; // NULL or points to currentAcceptor
SocketRequestQueue acceptorsQ;
/* +++ Config & Misc stuff +++ */
@@ -1488,6 +1488,29 @@ static void inc_socket(int domain, int type, int protocol);
static void dec_socket(int domain, int type, int protocol);
+static BOOLEAN_T acceptor_search4pid(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid);
+static ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid pid,
+ ERL_NIF_TERM ref);
+static BOOLEAN_T acceptor_pop(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid,
+ ErlNifMonitor* mon,
+ ERL_NIF_TERM* ref);
+
+static BOOLEAN_T qsearch4pid(ErlNifEnv* env,
+ SocketRequestQueue* q,
+ ErlNifPid* pid);
+static void qpush(SocketRequestQueue* q,
+ SocketRequestQueueElement* e);
+static SocketRequestQueueElement* qpop(SocketRequestQueue* q);
+static BOOLEAN_T qunqueue(ErlNifEnv* env,
+ SocketRequestQueue* q,
+ const ErlNifPid* pid);
+
/*
#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE)
static size_t my_strnlen(const char *s, size_t maxlen);
@@ -2663,6 +2686,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
HANDLE accEvent;
ErlNifPid caller;
int save_errno;
+ ERL_NIF_TERM result;
SSDBG( descP, ("SOCKET", "naccept_accepting -> get caller\r\n") );
@@ -2682,11 +2706,18 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
* for the select message).
*/
+ SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") );
+
+ if (!acceptor_search4pid(env, descP, &caller))
+ result = acceptor_push(env, descP, caller, ref);
+ else
+ result = esock_make_error(env, esock_atom_eagain);
+
SSDBG( descP,
("SOCKET",
- "naccept_accepting -> not current acceptor: busy\r\n") );
+ "naccept_accepting -> queue (push) result: %T\r\n", result) );
- return esock_make_error(env, atom_exbusy);
+ return result;
}
n = sizeof(descP->remote);
@@ -2777,8 +2808,30 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
* And if so, pop it and copy the (waiting) acceptor, and then
* make a new select with that info).
*/
- descP->state = SOCKET_STATE_LISTENING;
+ if (acceptor_pop(env, descP,
+ &descP->currentAcceptor.pid,
+ &descP->currentAcceptor.mon,
+ &descP->currentAcceptor.ref)) {
+
+ /* There was another one */
+
+ SSDBG( descP, ("SOCKET", "naccept_accepting -> new (active) acceptor: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref) );
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref);
+ } else {
+ descP->currentAcceptorP = NULL;
+ descP->state = SOCKET_STATE_LISTENING;
+ }
+
return esock_make_ok2(env, accRef);
}
}
@@ -8238,6 +8291,186 @@ char* send_msg(ErlNifEnv* env,
/* ----------------------------------------------------------------------
+ * R e q u e s t Q u e u e F u n c t i o n s
+ * ----------------------------------------------------------------------
+ */
+
+/* *** acceptor search for pid ***
+ *
+ * Search for a pid in the acceptor queue.
+ */
+static
+BOOLEAN_T acceptor_search4pid(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid)
+{
+ return qsearch4pid(env, &descP->acceptorsQ, pid);
+}
+
+
+/* *** acceptor push ***
+ *
+ * Push an acceptor onto the acceptor queue.
+ * This happens when we already have atleast one current acceptor.
+ */
+static
+ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid pid,
+ ERL_NIF_TERM ref)
+{
+ SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement));
+ SocketRequestor* reqP = &e->data;
+
+ reqP->pid = pid;
+ reqP->ref = ref;
+
+ if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ FREE(reqP);
+ return esock_make_error(env, atom_exmon);
+ }
+
+ qpush(&descP->acceptorsQ, e);
+
+ // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN
+ return esock_make_error(env, esock_atom_eagain);
+}
+
+
+/* *** acceptor pop ***
+ *
+ * Pop an acceptor from the acceptor queue.
+ */
+static
+BOOLEAN_T acceptor_pop(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid,
+ ErlNifMonitor* mon,
+ ERL_NIF_TERM* ref)
+{
+ SocketRequestQueueElement* e = qpop(&descP->acceptorsQ);
+
+ if (e != NULL) {
+ *pid = e->data.pid;
+ *mon = e->data.mon;
+ *ref = e->data.ref;
+ FREE(e);
+ return TRUE;
+ } else {
+ /* (acceptors) Queue was empty */
+ // *pid = NULL; we have no null value for pids
+ // *mon = NULL; we have no null value for monitors
+ *ref = esock_atom_undefined; // Just in case
+ return FALSE;
+ }
+
+}
+
+
+static
+BOOLEAN_T qsearch4pid(ErlNifEnv* env,
+ SocketRequestQueue* q,
+ ErlNifPid* pid)
+{
+ SocketRequestQueueElement* tmp = q->first;
+
+ while (tmp != NULL) {
+ if (compare_pids(env, &tmp->data.pid, pid))
+ return TRUE;
+ else
+ tmp = tmp->nextP;
+ }
+
+ return FALSE;
+}
+
+
+static
+void qpush(SocketRequestQueue* q,
+ SocketRequestQueueElement* e)
+{
+ if (q->first != NULL) {
+ q->last->nextP = e;
+ q->last = e;
+ e->nextP = NULL;
+ } else {
+ q->first = e;
+ q->last = e;
+ e->nextP = NULL;
+ }
+}
+
+
+static
+SocketRequestQueueElement* qpop(SocketRequestQueue* q)
+{
+ SocketRequestQueueElement* e = q->first;
+
+ if (e != NULL) {
+ /* Atleast one element in the queue */
+ if (e == q->last) {
+ /* Only one element in the queue */
+ q->first = q->last = NULL;
+ } else {
+ /* More than one element in the queue */
+ q->first = e->nextP;
+ }
+ }
+
+ return e;
+}
+
+
+
+static
+BOOLEAN_T qunqueue(ErlNifEnv* env,
+ SocketRequestQueue* q,
+ const ErlNifPid* pid)
+{
+ SocketRequestQueueElement* e = q->first;
+ SocketRequestQueueElement* p = NULL;
+
+ /* Check if it was one of the waiting acceptor processes */
+ while (e != NULL) {
+ if (compare_pids(env, &e->data.pid, pid)) {
+
+ /* We have a match */
+
+ if (p != NULL) {
+ /* Not the first, but could be the last */
+ if (q->last == e) {
+ q->last = p;
+ p->nextP = NULL;
+ } else {
+ p->nextP = e->nextP;
+ }
+
+ } else {
+ /* The first and could also be the last */
+ if (q->last == e) {
+ q->last = NULL;
+ q->first = NULL;
+ } else {
+ q->first = e->nextP;
+ }
+ }
+
+ FREE(e);
+
+ return TRUE;
+ }
+
+ /* Try next */
+ p = e;
+ e = e->nextP;
+ }
+
+ return FALSE;
+}
+
+
+
+/* ----------------------------------------------------------------------
* C o u n t e r F u n c t i o n s
* ----------------------------------------------------------------------
*/
@@ -8502,13 +8735,70 @@ void socket_down(ErlNifEnv* env,
{
SocketDescriptor* descP = (SocketDescriptor*) obj;
- SSDBG( descP,
- ("SOCKET", "socket_down -> entry when"
- "\r\n sock: %d"
- "\r\n pid: %T"
- "\r\n mon: %T"
- "\r\n", descP->sock, *pid, *mon) );
+ SSDBG( descP, ("SOCKET", "socket_down -> entry with"
+ "\r\n sock: %d"
+ "\r\n pid: %T"
+ "\r\n", descP->sock, *pid) );
+
+ /* Eventually we should go through the other queues also,
+ * the process can be one of them...
+ */
+
+ /* Check first if its the current acceptor, and if not check the queue */
+ if (compare_pids(env, &descP->currentAcceptor.pid, pid)) {
+
+ SSDBG( descP, ("SOCKET",
+ "socket_down -> current acceptor - try pop the queue\r\n") );
+
+ if (acceptor_pop(env, descP,
+ &descP->currentAcceptor.pid,
+ &descP->currentAcceptor.mon,
+ &descP->currentAcceptor.ref)) {
+ int res;
+
+ /* There was another one, so we will still be in accepting state */
+
+ SSDBG( descP, ("SOCKET", "socket_down -> new (active) acceptor: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref) );
+
+ if ((res = enif_select(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP,
+ &descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref) < 0)) {
+
+ esock_warning_msg("Failed select (%d) for new acceptor "
+ "after current (%T) died\r\n",
+ res, *pid);
+
+ }
+
+ } else {
+
+ SSDBG( descP, ("SOCKET", "socket_down -> no active acceptor\r\n") );
+
+ descP->currentAcceptorP = NULL;
+ descP->state = SOCKET_STATE_LISTENING;
+ }
+
+ } else {
+
+ /* Maybe unqueue one of the waiting acceptors */
+
+ SSDBG( descP, ("SOCKET",
+ "socket_down -> "
+ "not current acceptor - maybe a waiting acceptor\r\n") );
+
+ qunqueue(env, &descP->acceptorsQ, pid);
+ }
+ SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") );
+
}