aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/nifs/common/socket_nif.c409
1 files changed, 127 insertions, 282 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index ba8e2137e8..3caffd8554 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -2243,15 +2243,16 @@ static void dec_socket(int domain, int type, int protocol);
ERL_NIF_TERM ref); \
static BOOLEAN_T O##_pop(ErlNifEnv* env, \
SocketDescriptor* descP, \
- ErlNifPid* pid, \
- ESockMonitor* mon, \
- ERL_NIF_TERM* ref); \
+ SocketRequestor* reqP); \
static BOOLEAN_T O##_unqueue(ErlNifEnv* env, \
SocketDescriptor* descP, \
const ErlNifPid* pid);
ESOCK_OPERATOR_FUNCS_DEFS
#undef ESOCK_OPERATOR_FUNCS_DEF
+static BOOLEAN_T requestor_pop(SocketRequestQueue* q,
+ SocketRequestor* reqP);
+
static BOOLEAN_T qsearch4pid(ErlNifEnv* env,
SocketRequestQueue* q,
ErlNifPid* pid);
@@ -5112,10 +5113,7 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env,
*
*/
- if (acceptor_pop(env, descP,
- &descP->currentAcceptor.pid,
- &descP->currentAcceptor.mon,
- &descP->currentAcceptor.ref)) {
+ if (acceptor_pop(env, descP, &descP->currentAcceptor)) {
/* There was another one */
@@ -13164,10 +13162,7 @@ ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "ncancel_accept_current -> cancel res: %T\r\n", res) );
- if (acceptor_pop(env, descP,
- &descP->currentAcceptor.pid,
- &descP->currentAcceptor.mon,
- &descP->currentAcceptor.ref)) {
+ if (acceptor_pop(env, descP, &descP->currentAcceptor)) {
/* There was another one */
@@ -13288,10 +13283,7 @@ ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) );
- if (writer_pop(env, descP,
- &descP->currentWriter.pid,
- &descP->currentWriter.mon,
- &descP->currentWriter.ref)) {
+ if (writer_pop(env, descP, &descP->currentWriter)) {
/* There was another one */
@@ -13410,10 +13402,7 @@ ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "ncancel_recv_current -> cancel res: %T\r\n", res) );
- if (reader_pop(env, descP,
- &descP->currentReader.pid,
- &descP->currentReader.mon,
- &descP->currentReader.ref)) {
+ if (reader_pop(env, descP, &descP->currentReader)) {
/* There was another one */
@@ -13619,10 +13608,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
/* Ok, this write is done maybe activate the next (if any) */
- if (writer_pop(env, descP,
- &descP->currentWriter.pid,
- &descP->currentWriter.mon,
- &descP->currentWriter.ref)) {
+ if (writer_pop(env, descP, &descP->currentWriter)) {
/* There was another one */
@@ -13652,10 +13638,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
/* Some kind of send failure - check what kind */
if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) {
- ErlNifPid pid;
- // ErlNifMonitor mon;
- ESockMonitor mon;
- ERL_NIF_TERM ref, res;
+ SocketRequestor req;
+ ERL_NIF_TERM res;
/*
* An actual failure - we (and everyone waiting) give up
@@ -13669,15 +13653,17 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
res = esock_make_error_errno(env, saveErrno);
if (descP->currentWriterP != NULL) {
+
DEMONP("send_check_result -> current writer",
env, descP, &descP->currentWriter.mon);
- while (writer_pop(env, descP, &pid, &mon, &ref)) {
+ while (writer_pop(env, descP, &req)) {
SSDBG( descP,
- ("SOCKET", "send_check_result -> abort %T\r\n", pid) );
- esock_send_abort_msg(env, sockRef, ref, res, &pid);
+ ("SOCKET", "send_check_result -> abort %T\r\n",
+ req.pid) );
+ esock_send_abort_msg(env, sockRef, req.ref, res, &req.pid);
DEMONP("send_check_result -> pop'ed writer",
- env, descP, &mon);
+ env, descP, &req.mon);
}
}
@@ -13852,10 +13838,7 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
DEMONP("recv_update_current_reader -> current reader",
env, descP, &descP->currentReader.mon);
- if (reader_pop(env, descP,
- &descP->currentReader.pid,
- &descP->currentReader.mon,
- &descP->currentReader.ref)) {
+ if (reader_pop(env, descP, &descP->currentReader)) {
/* There was another one */
@@ -13898,21 +13881,20 @@ void recv_error_current_reader(ErlNifEnv* env,
ERL_NIF_TERM sockRef,
ERL_NIF_TERM reason)
{
+ SocketRequestor req;
+
if (descP->currentReaderP != NULL) {
- ErlNifPid pid;
- // ErlNifMonitor mon;
- ESockMonitor mon;
- ERL_NIF_TERM ref;
DEMONP("recv_error_current_reader -> current reader",
env, descP, &descP->currentReader.mon);
- while (reader_pop(env, descP, &pid, &mon, &ref)) {
+ while (reader_pop(env, descP, &req)) {
SSDBG( descP,
- ("SOCKET", "recv_error_current_reader -> abort %T\r\n", pid) );
- esock_send_abort_msg(env, sockRef, ref, reason, &pid);
+ ("SOCKET", "recv_error_current_reader -> abort %T\r\n",
+ req.pid) );
+ esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid);
DEMONP("recv_error_current_reader -> pop'ed reader",
- env, descP, &mon);
+ env, descP, &req.mon);
}
}
}
@@ -16962,280 +16944,152 @@ int esock_select_cancel(ErlNifEnv* env,
/* ----------------------------------------------------------------------
- * R e q u e s t Q u e u e F u n c t i o n s
+ * R e q u e s t o r Q u e u e F u n c t i o n s
* ----------------------------------------------------------------------
- */
-
-/* *** acceptor search for pid ***
*
- * Search for a pid in the acceptor queue.
+ * Since each of these functions (search4pid, push, pop and unqueu
+ * are virtually identical for acceptors, writers and readers,
+ * we make use of set of declaration macros.
*/
-#if !defined(__WIN32__)
-static
-BOOLEAN_T acceptor_search4pid(ErlNifEnv* env,
- SocketDescriptor* descP,
- ErlNifPid* pid)
-{
- return qsearch4pid(env, &descP->acceptorsQ, pid);
-}
+#if !defined(__WIN32__)
-/* *** acceptor push ***
+/* *** search4pid ***
*
- * Push an acceptor onto the acceptor queue.
- * This happens when we already have atleast one current acceptor.
- */
-static
-ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
- SocketDescriptor* descP,
- ErlNifPid pid,
- ERL_NIF_TERM ref)
-{
- SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement));
- SocketRequestor* reqP = &e->data;
-
- reqP->pid = pid;
- reqP->ref = enif_make_copy(descP->env, ref);
-
- if (MONP("acceptor_push -> acceptor request",
- env, descP, &pid, &reqP->mon) != 0) {
- FREE(reqP);
- return esock_make_error(env, atom_exmon);
- }
-
- qpush(&descP->acceptorsQ, e);
-
- // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN
- return esock_make_error(env, esock_atom_eagain);
-}
-
-
-/* *** acceptor pop ***
+ * Search for a pid in the requestor (acceptor, writer, or reader) queue.
*
- * Pop an acceptor from the acceptor queue.
*/
-static
-BOOLEAN_T acceptor_pop(ErlNifEnv* env,
- SocketDescriptor* descP,
- ErlNifPid* pid,
- // ErlNifMonitor* mon,
- ESockMonitor* mon,
- ERL_NIF_TERM* ref)
-{
- SocketRequestQueueElement* e = qpop(&descP->acceptorsQ);
- if (e != NULL) {
- *pid = e->data.pid;
- *mon = e->data.mon;
- *ref = e->data.ref;
- FREE(e);
- return TRUE;
- } else {
- /* (acceptors) Queue was empty */
- // *pid = NULL; we have no null value for pids
- // *mon = NULL; we have no null value for monitors
- *ref = esock_atom_undefined; // Just in case
- return FALSE;
+#define REQ_SEARCH4PID_FUNCS \
+ REQ_SEARCH4PID_FUNC_DECL(acceptor, acceptorsQ) \
+ REQ_SEARCH4PID_FUNC_DECL(writer, writersQ) \
+ REQ_SEARCH4PID_FUNC_DECL(reader, readersQ)
+
+#define REQ_SEARCH4PID_FUNC_DECL(F, Q) \
+ static \
+ BOOLEAN_T F##_search4pid(ErlNifEnv* env, \
+ SocketDescriptor* descP, \
+ ErlNifPid* pid) \
+ { \
+ return qsearch4pid(env, &descP->Q, pid); \
}
-
-}
+REQ_SEARCH4PID_FUNCS
+#undef REQ_SEARCH4PID_FUNC_DECL
-/* *** acceptor unqueue ***
+/* *** push ***
*
- * Remove an acceptor from the acceptor queue.
- */
-static
-BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
- SocketDescriptor* descP,
- const ErlNifPid* pid)
-{
- return qunqueue(env, descP, "qunqueue -> waiting acceptor",
- &descP->acceptorsQ, pid);
-}
-
-
-
-/* *** writer search for pid ***
+ * Push a requestor (acceptor, writer, or reader) onto its queue.
+ * This happens when we already have a current request (of its type).
*
- * Search for a pid in the writer queue.
*/
-static
-BOOLEAN_T writer_search4pid(ErlNifEnv* env,
- SocketDescriptor* descP,
- ErlNifPid* pid)
-{
- return qsearch4pid(env, &descP->writersQ, pid);
-}
-
-/* *** writer push ***
- *
- * Push an writer onto the writer queue.
- * This happens when we already have atleast one current writer.
- */
-static
-ERL_NIF_TERM writer_push(ErlNifEnv* env,
- SocketDescriptor* descP,
- ErlNifPid pid,
- ERL_NIF_TERM ref)
-{
- SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement));
- SocketRequestor* reqP = &e->data;
-
- reqP->pid = pid;
- reqP->ref = enif_make_copy(descP->env, ref);
-
- if (MONP("writer_push -> writer request",
- env, descP, &pid, &reqP->mon) != 0) {
- FREE(reqP);
- return esock_make_error(env, atom_exmon);
+#define REQ_PUSH_FUNCS \
+ REQ_PUSH_FUNC_DECL(acceptor, acceptorsQ) \
+ REQ_PUSH_FUNC_DECL(writer, writersQ) \
+ REQ_PUSH_FUNC_DECL(reader, readersQ)
+
+#define REQ_PUSH_FUNC_DECL(F, Q) \
+ static \
+ ERL_NIF_TERM F##_push(ErlNifEnv* env, \
+ SocketDescriptor* descP, \
+ ErlNifPid pid, \
+ ERL_NIF_TERM ref) \
+ { \
+ SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement)); \
+ SocketRequestor* reqP = &e->data; \
+ \
+ reqP->pid = pid; \
+ reqP->ref = enif_make_copy(descP->env, ref); \
+ \
+ if (MONP("reader_push -> " #F " request", \
+ env, descP, &pid, &reqP->mon) != 0) { \
+ FREE(reqP); \
+ return esock_make_error(env, atom_exmon); \
+ } \
+ \
+ qpush(&descP->Q, e); \
+ \
+ return esock_make_error(env, esock_atom_eagain); \
}
-
- qpush(&descP->writersQ, e);
-
- // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN
- return esock_make_error(env, esock_atom_eagain);
-}
+REQ_PUSH_FUNCS
+#undef REQ_PUSH_FUNC_DECL
-/* *** writer pop ***
+/* *** pop ***
+ *
+ * Pop a requestor (acceptor, writer, or reader) from its queue.
*
- * Pop an writer from the writer queue.
*/
-static
-BOOLEAN_T writer_pop(ErlNifEnv* env,
- SocketDescriptor* descP,
- ErlNifPid* pid,
- // ErlNifMonitor* mon,
- ESockMonitor* mon,
- ERL_NIF_TERM* ref)
-{
- SocketRequestQueueElement* e = qpop(&descP->writersQ);
-
- if (e != NULL) {
- *pid = e->data.pid;
- *mon = e->data.mon;
- *ref = e->data.ref; // At this point the ref has already been copied (env)
- FREE(e);
- return TRUE;
- } else {
- /* (writers) Queue was empty */
- // *pid = NULL; we have no null value for pids
- // *mon = NULL; we have no null value for monitors
- *ref = esock_atom_undefined; // Just in case
- return FALSE;
+#define REQ_POP_FUNCS \
+ REQ_POP_FUNC_DECL(acceptor, acceptorsQ) \
+ REQ_POP_FUNC_DECL(writer, writersQ) \
+ REQ_POP_FUNC_DECL(reader, readersQ)
+
+#define REQ_POP_FUNC_DECL(F, Q) \
+ static \
+ BOOLEAN_T F##_pop(ErlNifEnv* env, \
+ SocketDescriptor* descP, \
+ SocketRequestor* reqP) \
+ { \
+ return requestor_pop(&descP->Q, reqP); \
}
-
-}
+REQ_POP_FUNCS
+#undef REQ_POP_FUNC_DECL
-/* *** writer unqueue ***
+/* *** unqueue ***
*
- * Remove an writer from the writer queue.
- */
-static
-BOOLEAN_T writer_unqueue(ErlNifEnv* env,
- SocketDescriptor* descP,
- const ErlNifPid* pid)
-{
- return qunqueue(env, descP, "qunqueue -> waiting writer",
- &descP->writersQ, pid);
-}
-
-
-
-/* *** reader search for pid ***
+ * Remove a requestor (acceptor, writer, or reader) from its queue.
*
- * Search for a pid in the reader queue.
*/
-static
-BOOLEAN_T reader_search4pid(ErlNifEnv* env,
- SocketDescriptor* descP,
- ErlNifPid* pid)
-{
- return qsearch4pid(env, &descP->readersQ, pid);
-}
+#define REQ_UNQUEUE_FUNCS \
+ REQ_UNQUEUE_FUNC_DECL(acceptor, acceptorsQ) \
+ REQ_UNQUEUE_FUNC_DECL(writer, writersQ) \
+ REQ_UNQUEUE_FUNC_DECL(reader, readersQ)
-/* *** reader push ***
- *
- * Push an reader onto the raeder queue.
- * This happens when we already have atleast one current reader.
- */
-static
-ERL_NIF_TERM reader_push(ErlNifEnv* env,
- SocketDescriptor* descP,
- ErlNifPid pid,
- ERL_NIF_TERM ref)
-{
- SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement));
- SocketRequestor* reqP = &e->data;
-
- reqP->pid = pid;
- reqP->ref = enif_make_copy(descP->env, ref);
-
- if (MONP("reader_push -> reader request",
- env, descP, &pid, &reqP->mon) != 0) {
- FREE(reqP);
- return esock_make_error(env, atom_exmon);
+#define REQ_UNQUEUE_FUNC_DECL(F, Q) \
+ static \
+ BOOLEAN_T F##_unqueue(ErlNifEnv* env, \
+ SocketDescriptor* descP, \
+ const ErlNifPid* pid) \
+ { \
+ return qunqueue(env, descP, "qunqueue -> waiting " #F, \
+ &descP->Q, pid); \
}
-
- qpush(&descP->readersQ, e);
-
- // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN
- return esock_make_error(env, esock_atom_eagain);
-}
+REQ_UNQUEUE_FUNCS
+#undef REQ_UNQUEUE_FUNC_DECL
+
-/* *** reader pop ***
+/* *** requestor pop ***
*
- * Pop an writer from the reader queue.
+ * Pop an requestor from its queue.
*/
static
-BOOLEAN_T reader_pop(ErlNifEnv* env,
- SocketDescriptor* descP,
- ErlNifPid* pid,
- // ErlNifMonitor* mon,
- ESockMonitor* mon,
- ERL_NIF_TERM* ref)
+BOOLEAN_T requestor_pop(SocketRequestQueue* q,
+ SocketRequestor* reqP)
{
- SocketRequestQueueElement* e = qpop(&descP->readersQ);
+ SocketRequestQueueElement* e = qpop(q);
if (e != NULL) {
- *pid = e->data.pid;
- *mon = e->data.mon;
- *ref = e->data.ref; // At this point the ref has already been copied (env)
+ reqP->pid = e->data.pid;
+ reqP->mon = e->data.mon;
+ reqP->ref = e->data.ref;
FREE(e);
return TRUE;
} else {
- /* (readers) Queue was empty */
- // *pid = NULL; we have no null value for pids
- // *mon = NULL; we have no null value for monitors
- *ref = esock_atom_undefined; // Just in case
+ /* (writers) Queue was empty */
+ enif_set_pid_undefined(&reqP->pid);
+ // *reqP->mon = NULL; we have no null value for monitors
+ reqP->ref = esock_atom_undefined; // Just in case
return FALSE;
}
}
-/* *** reader unqueue ***
- *
- * Remove an reader from the reader queue.
- */
-static
-BOOLEAN_T reader_unqueue(ErlNifEnv* env,
- SocketDescriptor* descP,
- const ErlNifPid* pid)
-{
- return qunqueue(env, descP, "qunqueue -> waiting reader",
- &descP->readersQ, pid);
-}
-
-
-
-
-
static
BOOLEAN_T qsearch4pid(ErlNifEnv* env,
SocketRequestQueue* q,
@@ -17997,10 +17851,7 @@ void socket_down_acceptor(ErlNifEnv* env,
"socket_down_acceptor -> "
"current acceptor - try pop the queue\r\n") );
- if (acceptor_pop(env, descP,
- &descP->currentAcceptor.pid,
- &descP->currentAcceptor.mon,
- &descP->currentAcceptor.ref)) {
+ if (acceptor_pop(env, descP, &descP->currentAcceptor)) {
int res;
/* There was another one, so we will still be in accepting state */
@@ -18063,10 +17914,7 @@ void socket_down_writer(ErlNifEnv* env,
"socket_down_writer -> "
"current writer - try pop the queue\r\n") );
- if (writer_pop(env, descP,
- &descP->currentWriter.pid,
- &descP->currentWriter.mon,
- &descP->currentWriter.ref)) {
+ if (writer_pop(env, descP, &descP->currentWriter)) {
int res;
/* There was another one */
@@ -18127,10 +17975,7 @@ void socket_down_reader(ErlNifEnv* env,
"socket_down_reader -> "
"current reader - try pop the queue\r\n") );
- if (reader_pop(env, descP,
- &descP->currentReader.pid,
- &descP->currentReader.mon,
- &descP->currentReader.ref)) {
+ if (reader_pop(env, descP, &descP->currentReader)) {
int res;
/* There was another one */