diff options
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 409 |
1 files changed, 127 insertions, 282 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index ba8e2137e8..3caffd8554 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -2243,15 +2243,16 @@ static void dec_socket(int domain, int type, int protocol); ERL_NIF_TERM ref); \ static BOOLEAN_T O##_pop(ErlNifEnv* env, \ SocketDescriptor* descP, \ - ErlNifPid* pid, \ - ESockMonitor* mon, \ - ERL_NIF_TERM* ref); \ + SocketRequestor* reqP); \ static BOOLEAN_T O##_unqueue(ErlNifEnv* env, \ SocketDescriptor* descP, \ const ErlNifPid* pid); ESOCK_OPERATOR_FUNCS_DEFS #undef ESOCK_OPERATOR_FUNCS_DEF +static BOOLEAN_T requestor_pop(SocketRequestQueue* q, + SocketRequestor* reqP); + static BOOLEAN_T qsearch4pid(ErlNifEnv* env, SocketRequestQueue* q, ErlNifPid* pid); @@ -5112,10 +5113,7 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, * */ - if (acceptor_pop(env, descP, - &descP->currentAcceptor.pid, - &descP->currentAcceptor.mon, - &descP->currentAcceptor.ref)) { + if (acceptor_pop(env, descP, &descP->currentAcceptor)) { /* There was another one */ @@ -13164,10 +13162,7 @@ ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "ncancel_accept_current -> cancel res: %T\r\n", res) ); - if (acceptor_pop(env, descP, - &descP->currentAcceptor.pid, - &descP->currentAcceptor.mon, - &descP->currentAcceptor.ref)) { + if (acceptor_pop(env, descP, &descP->currentAcceptor)) { /* There was another one */ @@ -13288,10 +13283,7 @@ ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) ); - if (writer_pop(env, descP, - &descP->currentWriter.pid, - &descP->currentWriter.mon, - &descP->currentWriter.ref)) { + if (writer_pop(env, descP, &descP->currentWriter)) { /* There was another one */ @@ -13410,10 +13402,7 @@ ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "ncancel_recv_current -> cancel res: %T\r\n", res) ); - if (reader_pop(env, descP, - &descP->currentReader.pid, - &descP->currentReader.mon, - &descP->currentReader.ref)) { + if (reader_pop(env, descP, &descP->currentReader)) { /* There was another one */ @@ -13619,10 +13608,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, /* Ok, this write is done maybe activate the next (if any) */ - if (writer_pop(env, descP, - &descP->currentWriter.pid, - &descP->currentWriter.mon, - &descP->currentWriter.ref)) { + if (writer_pop(env, descP, &descP->currentWriter)) { /* There was another one */ @@ -13652,10 +13638,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, /* Some kind of send failure - check what kind */ if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) { - ErlNifPid pid; - // ErlNifMonitor mon; - ESockMonitor mon; - ERL_NIF_TERM ref, res; + SocketRequestor req; + ERL_NIF_TERM res; /* * An actual failure - we (and everyone waiting) give up @@ -13669,15 +13653,17 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, res = esock_make_error_errno(env, saveErrno); if (descP->currentWriterP != NULL) { + DEMONP("send_check_result -> current writer", env, descP, &descP->currentWriter.mon); - while (writer_pop(env, descP, &pid, &mon, &ref)) { + while (writer_pop(env, descP, &req)) { SSDBG( descP, - ("SOCKET", "send_check_result -> abort %T\r\n", pid) ); - esock_send_abort_msg(env, sockRef, ref, res, &pid); + ("SOCKET", "send_check_result -> abort %T\r\n", + req.pid) ); + esock_send_abort_msg(env, sockRef, req.ref, res, &req.pid); DEMONP("send_check_result -> pop'ed writer", - env, descP, &mon); + env, descP, &req.mon); } } @@ -13852,10 +13838,7 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, DEMONP("recv_update_current_reader -> current reader", env, descP, &descP->currentReader.mon); - if (reader_pop(env, descP, - &descP->currentReader.pid, - &descP->currentReader.mon, - &descP->currentReader.ref)) { + if (reader_pop(env, descP, &descP->currentReader)) { /* There was another one */ @@ -13898,21 +13881,20 @@ void recv_error_current_reader(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM reason) { + SocketRequestor req; + if (descP->currentReaderP != NULL) { - ErlNifPid pid; - // ErlNifMonitor mon; - ESockMonitor mon; - ERL_NIF_TERM ref; DEMONP("recv_error_current_reader -> current reader", env, descP, &descP->currentReader.mon); - while (reader_pop(env, descP, &pid, &mon, &ref)) { + while (reader_pop(env, descP, &req)) { SSDBG( descP, - ("SOCKET", "recv_error_current_reader -> abort %T\r\n", pid) ); - esock_send_abort_msg(env, sockRef, ref, reason, &pid); + ("SOCKET", "recv_error_current_reader -> abort %T\r\n", + req.pid) ); + esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid); DEMONP("recv_error_current_reader -> pop'ed reader", - env, descP, &mon); + env, descP, &req.mon); } } } @@ -16962,280 +16944,152 @@ int esock_select_cancel(ErlNifEnv* env, /* ---------------------------------------------------------------------- - * R e q u e s t Q u e u e F u n c t i o n s + * R e q u e s t o r Q u e u e F u n c t i o n s * ---------------------------------------------------------------------- - */ - -/* *** acceptor search for pid *** * - * Search for a pid in the acceptor queue. + * Since each of these functions (search4pid, push, pop and unqueu + * are virtually identical for acceptors, writers and readers, + * we make use of set of declaration macros. */ -#if !defined(__WIN32__) -static -BOOLEAN_T acceptor_search4pid(ErlNifEnv* env, - SocketDescriptor* descP, - ErlNifPid* pid) -{ - return qsearch4pid(env, &descP->acceptorsQ, pid); -} +#if !defined(__WIN32__) -/* *** acceptor push *** +/* *** search4pid *** * - * Push an acceptor onto the acceptor queue. - * This happens when we already have atleast one current acceptor. - */ -static -ERL_NIF_TERM acceptor_push(ErlNifEnv* env, - SocketDescriptor* descP, - ErlNifPid pid, - ERL_NIF_TERM ref) -{ - SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement)); - SocketRequestor* reqP = &e->data; - - reqP->pid = pid; - reqP->ref = enif_make_copy(descP->env, ref); - - if (MONP("acceptor_push -> acceptor request", - env, descP, &pid, &reqP->mon) != 0) { - FREE(reqP); - return esock_make_error(env, atom_exmon); - } - - qpush(&descP->acceptorsQ, e); - - // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN - return esock_make_error(env, esock_atom_eagain); -} - - -/* *** acceptor pop *** + * Search for a pid in the requestor (acceptor, writer, or reader) queue. * - * Pop an acceptor from the acceptor queue. */ -static -BOOLEAN_T acceptor_pop(ErlNifEnv* env, - SocketDescriptor* descP, - ErlNifPid* pid, - // ErlNifMonitor* mon, - ESockMonitor* mon, - ERL_NIF_TERM* ref) -{ - SocketRequestQueueElement* e = qpop(&descP->acceptorsQ); - if (e != NULL) { - *pid = e->data.pid; - *mon = e->data.mon; - *ref = e->data.ref; - FREE(e); - return TRUE; - } else { - /* (acceptors) Queue was empty */ - // *pid = NULL; we have no null value for pids - // *mon = NULL; we have no null value for monitors - *ref = esock_atom_undefined; // Just in case - return FALSE; +#define REQ_SEARCH4PID_FUNCS \ + REQ_SEARCH4PID_FUNC_DECL(acceptor, acceptorsQ) \ + REQ_SEARCH4PID_FUNC_DECL(writer, writersQ) \ + REQ_SEARCH4PID_FUNC_DECL(reader, readersQ) + +#define REQ_SEARCH4PID_FUNC_DECL(F, Q) \ + static \ + BOOLEAN_T F##_search4pid(ErlNifEnv* env, \ + SocketDescriptor* descP, \ + ErlNifPid* pid) \ + { \ + return qsearch4pid(env, &descP->Q, pid); \ } - -} +REQ_SEARCH4PID_FUNCS +#undef REQ_SEARCH4PID_FUNC_DECL -/* *** acceptor unqueue *** +/* *** push *** * - * Remove an acceptor from the acceptor queue. - */ -static -BOOLEAN_T acceptor_unqueue(ErlNifEnv* env, - SocketDescriptor* descP, - const ErlNifPid* pid) -{ - return qunqueue(env, descP, "qunqueue -> waiting acceptor", - &descP->acceptorsQ, pid); -} - - - -/* *** writer search for pid *** + * Push a requestor (acceptor, writer, or reader) onto its queue. + * This happens when we already have a current request (of its type). * - * Search for a pid in the writer queue. */ -static -BOOLEAN_T writer_search4pid(ErlNifEnv* env, - SocketDescriptor* descP, - ErlNifPid* pid) -{ - return qsearch4pid(env, &descP->writersQ, pid); -} - -/* *** writer push *** - * - * Push an writer onto the writer queue. - * This happens when we already have atleast one current writer. - */ -static -ERL_NIF_TERM writer_push(ErlNifEnv* env, - SocketDescriptor* descP, - ErlNifPid pid, - ERL_NIF_TERM ref) -{ - SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement)); - SocketRequestor* reqP = &e->data; - - reqP->pid = pid; - reqP->ref = enif_make_copy(descP->env, ref); - - if (MONP("writer_push -> writer request", - env, descP, &pid, &reqP->mon) != 0) { - FREE(reqP); - return esock_make_error(env, atom_exmon); +#define REQ_PUSH_FUNCS \ + REQ_PUSH_FUNC_DECL(acceptor, acceptorsQ) \ + REQ_PUSH_FUNC_DECL(writer, writersQ) \ + REQ_PUSH_FUNC_DECL(reader, readersQ) + +#define REQ_PUSH_FUNC_DECL(F, Q) \ + static \ + ERL_NIF_TERM F##_push(ErlNifEnv* env, \ + SocketDescriptor* descP, \ + ErlNifPid pid, \ + ERL_NIF_TERM ref) \ + { \ + SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement)); \ + SocketRequestor* reqP = &e->data; \ + \ + reqP->pid = pid; \ + reqP->ref = enif_make_copy(descP->env, ref); \ + \ + if (MONP("reader_push -> " #F " request", \ + env, descP, &pid, &reqP->mon) != 0) { \ + FREE(reqP); \ + return esock_make_error(env, atom_exmon); \ + } \ + \ + qpush(&descP->Q, e); \ + \ + return esock_make_error(env, esock_atom_eagain); \ } - - qpush(&descP->writersQ, e); - - // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN - return esock_make_error(env, esock_atom_eagain); -} +REQ_PUSH_FUNCS +#undef REQ_PUSH_FUNC_DECL -/* *** writer pop *** +/* *** pop *** + * + * Pop a requestor (acceptor, writer, or reader) from its queue. * - * Pop an writer from the writer queue. */ -static -BOOLEAN_T writer_pop(ErlNifEnv* env, - SocketDescriptor* descP, - ErlNifPid* pid, - // ErlNifMonitor* mon, - ESockMonitor* mon, - ERL_NIF_TERM* ref) -{ - SocketRequestQueueElement* e = qpop(&descP->writersQ); - - if (e != NULL) { - *pid = e->data.pid; - *mon = e->data.mon; - *ref = e->data.ref; // At this point the ref has already been copied (env) - FREE(e); - return TRUE; - } else { - /* (writers) Queue was empty */ - // *pid = NULL; we have no null value for pids - // *mon = NULL; we have no null value for monitors - *ref = esock_atom_undefined; // Just in case - return FALSE; +#define REQ_POP_FUNCS \ + REQ_POP_FUNC_DECL(acceptor, acceptorsQ) \ + REQ_POP_FUNC_DECL(writer, writersQ) \ + REQ_POP_FUNC_DECL(reader, readersQ) + +#define REQ_POP_FUNC_DECL(F, Q) \ + static \ + BOOLEAN_T F##_pop(ErlNifEnv* env, \ + SocketDescriptor* descP, \ + SocketRequestor* reqP) \ + { \ + return requestor_pop(&descP->Q, reqP); \ } - -} +REQ_POP_FUNCS +#undef REQ_POP_FUNC_DECL -/* *** writer unqueue *** +/* *** unqueue *** * - * Remove an writer from the writer queue. - */ -static -BOOLEAN_T writer_unqueue(ErlNifEnv* env, - SocketDescriptor* descP, - const ErlNifPid* pid) -{ - return qunqueue(env, descP, "qunqueue -> waiting writer", - &descP->writersQ, pid); -} - - - -/* *** reader search for pid *** + * Remove a requestor (acceptor, writer, or reader) from its queue. * - * Search for a pid in the reader queue. */ -static -BOOLEAN_T reader_search4pid(ErlNifEnv* env, - SocketDescriptor* descP, - ErlNifPid* pid) -{ - return qsearch4pid(env, &descP->readersQ, pid); -} +#define REQ_UNQUEUE_FUNCS \ + REQ_UNQUEUE_FUNC_DECL(acceptor, acceptorsQ) \ + REQ_UNQUEUE_FUNC_DECL(writer, writersQ) \ + REQ_UNQUEUE_FUNC_DECL(reader, readersQ) -/* *** reader push *** - * - * Push an reader onto the raeder queue. - * This happens when we already have atleast one current reader. - */ -static -ERL_NIF_TERM reader_push(ErlNifEnv* env, - SocketDescriptor* descP, - ErlNifPid pid, - ERL_NIF_TERM ref) -{ - SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement)); - SocketRequestor* reqP = &e->data; - - reqP->pid = pid; - reqP->ref = enif_make_copy(descP->env, ref); - - if (MONP("reader_push -> reader request", - env, descP, &pid, &reqP->mon) != 0) { - FREE(reqP); - return esock_make_error(env, atom_exmon); +#define REQ_UNQUEUE_FUNC_DECL(F, Q) \ + static \ + BOOLEAN_T F##_unqueue(ErlNifEnv* env, \ + SocketDescriptor* descP, \ + const ErlNifPid* pid) \ + { \ + return qunqueue(env, descP, "qunqueue -> waiting " #F, \ + &descP->Q, pid); \ } - - qpush(&descP->readersQ, e); - - // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN - return esock_make_error(env, esock_atom_eagain); -} +REQ_UNQUEUE_FUNCS +#undef REQ_UNQUEUE_FUNC_DECL + -/* *** reader pop *** +/* *** requestor pop *** * - * Pop an writer from the reader queue. + * Pop an requestor from its queue. */ static -BOOLEAN_T reader_pop(ErlNifEnv* env, - SocketDescriptor* descP, - ErlNifPid* pid, - // ErlNifMonitor* mon, - ESockMonitor* mon, - ERL_NIF_TERM* ref) +BOOLEAN_T requestor_pop(SocketRequestQueue* q, + SocketRequestor* reqP) { - SocketRequestQueueElement* e = qpop(&descP->readersQ); + SocketRequestQueueElement* e = qpop(q); if (e != NULL) { - *pid = e->data.pid; - *mon = e->data.mon; - *ref = e->data.ref; // At this point the ref has already been copied (env) + reqP->pid = e->data.pid; + reqP->mon = e->data.mon; + reqP->ref = e->data.ref; FREE(e); return TRUE; } else { - /* (readers) Queue was empty */ - // *pid = NULL; we have no null value for pids - // *mon = NULL; we have no null value for monitors - *ref = esock_atom_undefined; // Just in case + /* (writers) Queue was empty */ + enif_set_pid_undefined(&reqP->pid); + // *reqP->mon = NULL; we have no null value for monitors + reqP->ref = esock_atom_undefined; // Just in case return FALSE; } } -/* *** reader unqueue *** - * - * Remove an reader from the reader queue. - */ -static -BOOLEAN_T reader_unqueue(ErlNifEnv* env, - SocketDescriptor* descP, - const ErlNifPid* pid) -{ - return qunqueue(env, descP, "qunqueue -> waiting reader", - &descP->readersQ, pid); -} - - - - - static BOOLEAN_T qsearch4pid(ErlNifEnv* env, SocketRequestQueue* q, @@ -17997,10 +17851,7 @@ void socket_down_acceptor(ErlNifEnv* env, "socket_down_acceptor -> " "current acceptor - try pop the queue\r\n") ); - if (acceptor_pop(env, descP, - &descP->currentAcceptor.pid, - &descP->currentAcceptor.mon, - &descP->currentAcceptor.ref)) { + if (acceptor_pop(env, descP, &descP->currentAcceptor)) { int res; /* There was another one, so we will still be in accepting state */ @@ -18063,10 +17914,7 @@ void socket_down_writer(ErlNifEnv* env, "socket_down_writer -> " "current writer - try pop the queue\r\n") ); - if (writer_pop(env, descP, - &descP->currentWriter.pid, - &descP->currentWriter.mon, - &descP->currentWriter.ref)) { + if (writer_pop(env, descP, &descP->currentWriter)) { int res; /* There was another one */ @@ -18127,10 +17975,7 @@ void socket_down_reader(ErlNifEnv* env, "socket_down_reader -> " "current reader - try pop the queue\r\n") ); - if (reader_pop(env, descP, - &descP->currentReader.pid, - &descP->currentReader.mon, - &descP->currentReader.ref)) { + if (reader_pop(env, descP, &descP->currentReader)) { int res; /* There was another one */ |