diff options
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 1 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 803 | ||||
-rw-r--r-- | lib/kernel/test/socket_server.erl | 4 |
3 files changed, 727 insertions, 81 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index 3595c483d7..c3595e495d 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -184,6 +184,7 @@ extern ERL_NIF_TERM esock_atom_einval; #define MKLA(E,A,L) enif_make_list_from_array((E), (A), (L)) #define MKEL(E) enif_make_list((E), 0) #define MKMA(E,KA,VA,L,M) enif_make_map_from_arrays((E), (KA), (VA), (L), (M)) +#define MKPID(E, P) enif_make_pid((E), (P)) #define MKREF(E) enif_make_ref((E)) #define MKS(E,S) enif_make_string((E), (S), ERL_NIF_LATIN1) #define MKSL(E,S,L) enif_make_string_len((E), (S), (L), ERL_NIF_LATIN1) diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index c48d6eab00..a6940f788c 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -1490,6 +1490,8 @@ static ERL_NIF_TERM ngetopt_otp_debug(ErlNifEnv* env, SocketDescriptor* descP); static ERL_NIF_TERM ngetopt_otp_iow(ErlNifEnv* env, SocketDescriptor* descP); +static ERL_NIF_TERM ngetopt_otp_ctrl_proc(ErlNifEnv* env, + SocketDescriptor* descP); static ERL_NIF_TERM ngetopt_native(ErlNifEnv* env, SocketDescriptor* descP, int level, @@ -1849,6 +1851,11 @@ static ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env, static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env, + SocketDescriptor* descP); +static ERL_NIF_TERM ncancel_recv_waiting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); static ERL_NIF_TERM ncancel_read_select(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef); @@ -1911,6 +1918,18 @@ static ERL_NIF_TERM send_check_result(ErlNifEnv* env, ssize_t dataSize, int saveErrno, ERL_NIF_TERM sendRef); +static BOOLEAN_T recv_check_reader(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ERL_NIF_TERM* checkResult); +static char* recv_init_current_reader(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref); +static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, + SocketDescriptor* descP); +static void recv_error_current_reader(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM reason); static ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SocketDescriptor* descP, int read, @@ -2150,6 +2169,22 @@ static BOOLEAN_T writer_unqueue(ErlNifEnv* env, SocketDescriptor* descP, const ErlNifPid* pid); +static BOOLEAN_T reader_search4pid(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid); +static ERL_NIF_TERM reader_push(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid pid, + ERL_NIF_TERM ref); +static BOOLEAN_T reader_pop(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref); +static BOOLEAN_T reader_unqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid); + static BOOLEAN_T qsearch4pid(ErlNifEnv* env, SocketRequestQueue* q, ErlNifPid* pid); @@ -2159,7 +2194,6 @@ static SocketRequestQueueElement* qpop(SocketRequestQueue* q); static BOOLEAN_T qunqueue(ErlNifEnv* env, SocketRequestQueue* q, const ErlNifPid* pid); - /* #if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE) static size_t my_strnlen(const char *s, size_t maxlen); @@ -2175,6 +2209,15 @@ static void socket_down(ErlNifEnv* env, void* obj, const ErlNifPid* pid, const ErlNifMonitor* mon); +static void socket_down_acceptor(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid); +static void socket_down_writer(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid); +static void socket_down_reader(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid); /* static char* send_msg_error_closed(ErlNifEnv* env, @@ -3986,6 +4029,8 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env, if (ctrlBuf != NULL) FREE(ctrlBuf); return esock_make_error_str(env, xres); } + } else { + ctrlBufUsed = 0; } msgHdr.msg_control = ctrlBuf; msgHdr.msg_controllen = ctrlBufUsed; @@ -4168,6 +4213,7 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, { ssize_t read; ErlNifBinary buf; + ERL_NIF_TERM readerCheck; int save_errno; int bufSz = (len ? len : descP->rBufSz); @@ -4179,6 +4225,10 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, if (!descP->isReadable) return enif_make_badarg(env); + /* Check if there is already a current reader and if its us */ + if (!recv_check_reader(env, descP, recvRef, &readerCheck)) + return readerCheck; + /* Allocate a buffer: * Either as much as we want to read or (if zero (0)) use the "default" * size (what has been configured). @@ -4315,6 +4365,7 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, ssize_t read; int save_errno; ErlNifBinary buf; + ERL_NIF_TERM readerCheck; int bufSz = (len ? len : descP->rBufSz); SSDBG( descP, ("SOCKET", "nrecvfrom -> entry with" @@ -4325,6 +4376,10 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, if (!descP->isReadable) return enif_make_badarg(env); + /* Check if there is already a current reader and if its us */ + if (!recv_check_reader(env, descP, recvRef, &readerCheck)) + return readerCheck; + /* Allocate a buffer: * Either as much as we want to read or (if zero (0)) use the "default" * size (what has been configured). @@ -4476,6 +4531,7 @@ ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, struct iovec iov[1]; // Shall we always use 1? ErlNifBinary data[1]; // Shall we always use 1? ErlNifBinary ctrl; + ERL_NIF_TERM readerCheck; SocketAddress addr; SSDBG( descP, ("SOCKET", "nrecvmsg -> entry with" @@ -4487,6 +4543,10 @@ ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, if (!descP->isReadable) return enif_make_badarg(env); + /* Check if there is already a current reader and if its us */ + if (!recv_check_reader(env, descP, recvRef, &readerCheck)) + return readerCheck; + /* for (i = 0; i < sizeof(buf); i++) { if (!ALLOC_BIN(bifSz, &buf[i])) @@ -8093,7 +8153,7 @@ ERL_NIF_TERM ngetopt_otp(ErlNifEnv* env, ERL_NIF_TERM result; SSDBG( descP, - ("SOCKET", "ngetopt_opt -> entry with" + ("SOCKET", "ngetopt_otp -> entry with" "\r\n eOpt: %d" "\r\n", eOpt) ); @@ -8106,13 +8166,17 @@ ERL_NIF_TERM ngetopt_otp(ErlNifEnv* env, result = ngetopt_otp_iow(env, descP); break; + case SOCKET_OPT_OTP_CTRL_PROC: + result = ngetopt_otp_ctrl_proc(env, descP); + break; + default: result = esock_make_error(env, esock_atom_einval); break; } SSDBG( descP, - ("SOCKET", "ngetopt_opt -> done when" + ("SOCKET", "ngetopt_otp -> done when" "\r\n result: %T" "\r\n", result) ); @@ -8144,6 +8208,18 @@ ERL_NIF_TERM ngetopt_otp_iow(ErlNifEnv* env, } +/* ngetopt_otp_ctrl_proc - Handle the OTP (level) controlling_process options + */ +static +ERL_NIF_TERM ngetopt_otp_ctrl_proc(ErlNifEnv* env, + SocketDescriptor* descP) +{ + ERL_NIF_TERM eVal = MKPID(env, &descP->ctrlPid); + + return esock_make_ok2(env, eVal); +} + + /* The option has *not* been encoded. Instead it has been provided * in "native mode" (option is provided as is). In this case it will have the @@ -10752,6 +10828,8 @@ ERL_NIF_TERM ncancel_send(ErlNifEnv* env, { ERL_NIF_TERM res; + MLOCK(descP->writeMtx); + SSDBG( descP, ("SOCKET", "ncancel_send -> entry with" "\r\n opRef: %T" @@ -10759,8 +10837,6 @@ ERL_NIF_TERM ncancel_send(ErlNifEnv* env, "\r\n", opRef, ((descP->currentWriterP == NULL) ? "without writer" : "with writer")) ); - MLOCK(descP->writeMtx); - if (descP->currentWriterP != NULL) { if (COMPARE(opRef, descP->currentWriter.ref) == 0) { res = ncancel_send_current(env, descP); @@ -10859,14 +10935,116 @@ ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env, /* *** ncancel_recv *** * - * + * Cancel a read operation. + * Its either the current reader or one of the waiting readers. */ static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef) { - return esock_make_error(env, esock_atom_einval); + ERL_NIF_TERM res; + + MLOCK(descP->readMtx); + + SSDBG( descP, + ("SOCKET", "ncancel_recv -> entry with" + "\r\n opRef: %T" + "\r\n %s" + "\r\n", opRef, + ((descP->currentReaderP == NULL) ? "without reader" : "with reader")) ); + + if (descP->currentReaderP != NULL) { + if (COMPARE(opRef, descP->currentReader.ref) == 0) { + res = ncancel_recv_current(env, descP); + } else { + res = ncancel_recv_waiting(env, descP, opRef); + } + } else { + /* Or badarg? */ + res = esock_make_error(env, esock_atom_einval); + } + + MUNLOCK(descP->readMtx); + + SSDBG( descP, + ("SOCKET", "ncancel_recv -> done with result:" + "\r\n %T" + "\r\n", res) ); + + return res; +} + + +/* The current reader process has an ongoing select we first must + * cancel. Then we must re-activate the "first" (the first + * in the reader queue). + */ +static +ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env, + SocketDescriptor* descP) +{ + ERL_NIF_TERM res; + + SSDBG( descP, ("SOCKET", "ncancel_recv_current -> entry\r\n") ); + + res = ncancel_read_select(env, descP, descP->currentReader.ref); + + SSDBG( descP, ("SOCKET", "ncancel_recv_current -> cancel res: %T\r\n", res) ); + + if (reader_pop(env, descP, + &descP->currentReader.pid, + &descP->currentReader.mon, + &descP->currentReader.ref)) { + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "ncancel_recv_current -> new (active) reader: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentReader.pid, + descP->currentReader.ref) ); + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, &descP->currentReader.pid, descP->currentReader.ref); + + } else { + SSDBG( descP, ("SOCKET", "ncancel_recv_current -> no more readers\r\n") ); + descP->currentReaderP = NULL; + } + + SSDBG( descP, ("SOCKET", "ncancel_recv_current -> done with result:" + "\r\n %T" + "\r\n", res) ); + + return res; +} + + +/* These processes have not performed a select, so we can simply + * remove them from the reader queue. + */ +static +ERL_NIF_TERM ncancel_recv_waiting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) + return esock_make_error(env, atom_exself); + + /* unqueue request from (reader) queue */ + + if (reader_unqueue(env, descP, &caller)) { + return esock_atom_ok; + } else { + /* Race? */ + return esock_make_error(env, esock_atom_not_found); + } } @@ -10958,7 +11136,8 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env, SSDBG( descP, ("SOCKET", - "nsend -> queue (push) result: %T\r\n", checkResult) ); + "send_check_writer -> queue (push) result: %T\r\n", + checkResult) ); return FALSE; @@ -11005,7 +11184,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, cnt_inc(&descP->writePkgCnt, 1); cnt_inc(&descP->writeByteCnt, written); - DEMONP(env, descP, &descP->currentWriter.mon); + if (descP->currentWriterP != NULL) + DEMONP(env, descP, &descP->currentWriter.mon); SSDBG( descP, ("SOCKET", "send_check_result -> " @@ -11058,11 +11238,15 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, res = esock_make_error_errno(env, saveErrno); - while (writer_pop(env, descP, &pid, &mon, &ref)) { - SSDBG( descP, - ("SOCKET", "send_check_result -> abort %T\r\n", pid) ); - send_msg_nif_abort(env, ref, res, &pid); - DEMONP(env, descP, &mon); + if (descP->currentWriterP != NULL) { + DEMONP(env, descP, &descP->currentWriter.mon); + + while (writer_pop(env, descP, &pid, &mon, &ref)) { + SSDBG( descP, + ("SOCKET", "send_check_result -> abort %T\r\n", pid) ); + send_msg_nif_abort(env, ref, res, &pid); + DEMONP(env, descP, &mon); + } } return res; @@ -11087,6 +11271,20 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, * so schedule the rest for later. */ + if (descP->currentWriterP == NULL) { + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) + return esock_make_error(env, atom_exself); + descP->currentWriter.pid = caller; + if (MONP(env, descP, + &descP->currentWriter.pid, + &descP->currentWriter.mon) > 0) + return esock_make_error(env, atom_exmon); + descP->currentWriter.ref = enif_make_copy(descP->env, sendRef); + descP->currentWriterP = &descP->currentWriter; + } + cnt_inc(&descP->writeWaits, 1); SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), @@ -11100,6 +11298,167 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, } + +/* *** recv_check_reader *** + * + * Checks if we have a current reader and if that is us. If not, then we must + * be made to wait for our turn. This is done by pushing us unto the reader queue. + */ +static +BOOLEAN_T recv_check_reader(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ERL_NIF_TERM* checkResult) +{ + if (descP->currentReaderP != NULL) { + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) { + *checkResult = esock_make_error(env, atom_exself); + return FALSE; + } + + if (!compare_pids(env, &descP->currentReader.pid, &caller)) { + /* Not the "current reader", so (maybe) push onto queue */ + + SSDBG( descP, + ("SOCKET", "recv_check_reader -> not (current) reader\r\n") ); + + if (!reader_search4pid(env, descP, &caller)) + *checkResult = reader_push(env, descP, caller, ref); + else + *checkResult = esock_make_error(env, esock_atom_eagain); + + SSDBG( descP, + ("SOCKET", + "recv_check_reader -> queue (push) result: %T\r\n", + checkResult) ); + + return FALSE; + + } + + } + + *checkResult = esock_atom_ok; // Does not actually matter in this case, but ... + + return TRUE; +} + + + +/* *** recv_init_current_reader *** + * + * Initiate (maybe) the currentReader structure of the descriptor. + * Including monitoring the calling process. + */ +static +char* recv_init_current_reader(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM recvRef) +{ + if (descP->currentReaderP == NULL) { + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) + return str_exself; + + descP->currentReader.pid = caller; + if (MONP(env, descP, + &descP->currentReader.pid, + &descP->currentReader.mon) > 0) { + return str_exmon; + } + descP->currentReader.ref = enif_make_copy(descP->env, recvRef); + descP->currentReaderP = &descP->currentReader; + } + + return NULL; +} + + + +/* *** recv_update_current_reader *** + * + * Demonitors the current reader process and pop's the reader queue. + * If there is a waiting (reader) process, then it will be assigned + * as the new current reader and a new (read) select will be done. + */ + +static +ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, + SocketDescriptor* descP) +{ + if (descP->currentReaderP != NULL) { + + DEMONP(env, descP, &descP->currentReader.mon); + + if (reader_pop(env, descP, + &descP->currentReader.pid, + &descP->currentReader.mon, + &descP->currentReader.ref)) { + + /* There was another one */ + + SSDBG( descP, + ("SOCKET", "recv_update_current_reader -> new (active) reader: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentReader.pid, + descP->currentReader.ref) ); + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, + &descP->currentReader.pid, + descP->currentReader.ref); + + } else { + descP->currentWriterP = NULL; + } + } + + return esock_atom_ok; +} + + + +/* *** recv_error_current_reader *** + * + * Process the current reader and any waiting readers + * when a read (fatal) error has occured. + * All waiting readers will be "aborted", that is a + * nif_abort message will be sent (with reaf and reason). + */ +static +void recv_error_current_reader(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM reason) +{ + if (descP->currentReaderP != NULL) { + ErlNifPid pid; + ErlNifMonitor mon; + ERL_NIF_TERM ref; + + DEMONP(env, descP, &descP->currentReader.mon); + + while (reader_pop(env, descP, &pid, &mon, &ref)) { + SSDBG( descP, + ("SOCKET", "recv_error_current_reader -> abort %T\r\n", pid) ); + send_msg_nif_abort(env, ref, reason, &pid); + DEMONP(env, descP, &mon); + } + } +} + + + +/* *** recv_check_result *** + * + * Process the result of a call to recv. + */ static ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -11109,6 +11468,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, ErlNifBinary* bufP, ERL_NIF_TERM recvRef) { + char* xres; ERL_NIF_TERM data; SSDBG( descP, @@ -11129,15 +11489,20 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, */ if ((read == 0) && (descP->type == SOCK_STREAM)) { + ERL_NIF_TERM res = esock_make_error(env, atom_closed); /* * When a stream socket peer has performed an orderly shutdown, the return * value will be 0 (the traditional "end-of-file" return). * * *We* do never actually try to read 0 bytes from a stream socket! + * + * We must also notify any waiting readers! */ - return esock_make_error(env, atom_closed); + recv_error_current_reader(env, descP, res); + + return res; } @@ -11173,6 +11538,11 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * => We choose alt 1 for now. */ + cnt_inc(&descP->readByteCnt, read); + + if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) + return esock_make_error_str(env, xres); + data = MKBIN(env, bufP); SSDBG( descP, @@ -11188,16 +11558,24 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, /* <KOLLA> * WE NEED TO INFORM ANY WAITING READERS + * + * DEMONP of the current reader! + * * </KOLLA> */ - data = MKBIN(env, bufP); + cnt_inc(&descP->readPkgCnt, 1); + cnt_inc(&descP->readByteCnt, read); SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] " "we got exactly what we could fit\r\n", toRead) ); + recv_update_current_reader(env, descP); + + data = MKBIN(env, bufP); + return esock_make_ok3(env, atom_true, data); } @@ -11207,6 +11585,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, /* +++ Error handling +++ */ if (saveErrno == ECONNRESET) { + ERL_NIF_TERM res = esock_make_error(env, atom_closed); /* +++ Oups - closed +++ */ @@ -11231,12 +11610,14 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; + recv_error_current_reader(env, descP, res); + SELECT(env, descP->sock, (ERL_NIF_SELECT_STOP), descP, NULL, recvRef); - return esock_make_error(env, atom_closed); + return res; } else if ((saveErrno == ERRNO_BLOCK) || (saveErrno == EAGAIN)) { @@ -11248,9 +11629,14 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, return esock_make_error(env, esock_atom_eagain); } else { + ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); + SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] errno: %d\r\n", toRead, saveErrno) ); - return esock_make_error_errno(env, saveErrno); + + recv_error_current_reader(env, descP, res); + + return res; } } else { @@ -11265,19 +11651,24 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, if (toRead == 0) { - /* +++ We got a chunk of data but +++ - * +++ since we did not fill the +++ - * +++ buffer, we must split it +++ - * +++ into a sub-binary. +++ + /* +++ We got it all, but since we +++ + * +++ did not fill the buffer, we +++ + * +++ must split it into a sub-binary. +++ */ SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] split buffer\r\n", toRead) ); + cnt_inc(&descP->readPkgCnt, 1); + cnt_inc(&descP->readByteCnt, read); + + recv_update_current_reader(env, descP); + data = MKBIN(env, bufP); data = MKSBIN(env, data, 0, read); - SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] done\r\n", toRead) ); + SSDBG( descP, + ("SOCKET", "recv_check_result -> [%d] done\r\n", toRead) ); return esock_make_ok3(env, atom_true, data); @@ -11289,6 +11680,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] " "only part of message - expect more\r\n", toRead) ); + cnt_inc(&descP->readByteCnt, read); + return esock_make_ok3(env, atom_false, MKBIN(env, bufP)); } } @@ -13907,10 +14300,10 @@ ERL_NIF_TERM writer_push(ErlNifEnv* env, */ static BOOLEAN_T writer_pop(ErlNifEnv* env, - SocketDescriptor* descP, - ErlNifPid* pid, - ErlNifMonitor* mon, - ERL_NIF_TERM* ref) + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref) { SocketRequestQueueElement* e = qpop(&descP->writersQ); @@ -13921,7 +14314,7 @@ BOOLEAN_T writer_pop(ErlNifEnv* env, FREE(e); return TRUE; } else { - /* (acceptors) Queue was empty */ + /* (writers) Queue was empty */ // *pid = NULL; we have no null value for pids // *mon = NULL; we have no null value for monitors *ref = esock_atom_undefined; // Just in case @@ -13945,6 +14338,92 @@ BOOLEAN_T writer_unqueue(ErlNifEnv* env, +/* *** reader search for pid *** + * + * Search for a pid in the reader queue. + */ +static +BOOLEAN_T reader_search4pid(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid) +{ + return qsearch4pid(env, &descP->readersQ, pid); +} + + +/* *** reader push *** + * + * Push an reader onto the raeder queue. + * This happens when we already have atleast one current reader. + */ +static +ERL_NIF_TERM reader_push(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid pid, + ERL_NIF_TERM ref) +{ + SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement)); + SocketRequestor* reqP = &e->data; + + reqP->pid = pid; + reqP->ref = enif_make_copy(descP->env, ref); + + if (MONP(env, descP, &pid, &reqP->mon) > 0) { + FREE(reqP); + return esock_make_error(env, atom_exmon); + } + + qpush(&descP->readersQ, e); + + // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN + return esock_make_error(env, esock_atom_eagain); +} + + +/* *** reader pop *** + * + * Pop an writer from the reader queue. + */ +static +BOOLEAN_T reader_pop(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref) +{ + SocketRequestQueueElement* e = qpop(&descP->readersQ); + + if (e != NULL) { + *pid = e->data.pid; + *mon = e->data.mon; + *ref = e->data.ref; // At this point the ref has already been copied (env) + FREE(e); + return TRUE; + } else { + /* (readers) Queue was empty */ + // *pid = NULL; we have no null value for pids + // *mon = NULL; we have no null value for monitors + *ref = esock_atom_undefined; // Just in case + return FALSE; + } + +} + + +/* *** reader unqueue *** + * + * Remove an reader from the reader queue. + */ +static +BOOLEAN_T reader_unqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid) +{ + return qunqueue(env, &descP->readersQ, pid); +} + + + static @@ -14324,77 +14803,243 @@ void socket_down(ErlNifEnv* env, "\r\n pid: %T" "\r\n", descP->sock, *pid) ); - /* Eventually we should go through the other queues also, - * the process can be one of them... - * - * Currently only the accteptors actuallu use the queues. - */ + + if (compare_pids(env, &descP->ctrlPid, pid)) { + /* We don't bother with the queue cleanup here - + * we leave it to the stop callback function. + */ - if (descP->currentAcceptorP != NULL) { + descP->state = SOCKET_STATE_CLOSING; + descP->closeLocal = TRUE; + descP->closeRef = MKREF(env); + enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP), + descP, NULL, descP->closeRef); - /* - * We have acceptor(s) (atleast one) - * - * Check first if its the current acceptor, - * and if not check the queue. - */ + } else { + + /* check all operation queue(s): acceptor, writer and reader. */ + + MLOCK(descP->accMtx); + if (descP->currentAcceptorP != NULL) + socket_down_acceptor(env, descP, pid); + MUNLOCK(descP->accMtx); + + MLOCK(descP->writeMtx); + if (descP->currentWriterP != NULL) + socket_down_writer(env, descP, pid); + MUNLOCK(descP->writeMtx); + + MLOCK(descP->readMtx); + if (descP->currentReaderP != NULL) + socket_down_reader(env, descP, pid); + MUNLOCK(descP->readMtx); + + } + + SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") ); + +} - if (compare_pids(env, &descP->currentAcceptor.pid, pid)) { + +/* *** socket_down_acceptor *** + * + * Check and then handle a downed acceptor process. + * + */ +static +void socket_down_acceptor(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid) +{ + if (compare_pids(env, &descP->currentAcceptor.pid, pid)) { + + SSDBG( descP, ("SOCKET", + "socket_down_acceptor -> " + "current acceptor - try pop the queue\r\n") ); + + if (acceptor_pop(env, descP, + &descP->currentAcceptor.pid, + &descP->currentAcceptor.mon, + &descP->currentAcceptor.ref)) { + int res; + + /* There was another one, so we will still be in accepting state */ + SSDBG( descP, ("SOCKET", - "socket_down -> " - "current acceptor - try pop the queue\r\n") ); + "socket_down_acceptor -> new (active) acceptor: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentAcceptor.pid, + descP->currentAcceptor.ref) ); - if (acceptor_pop(env, descP, - &descP->currentAcceptor.pid, - &descP->currentAcceptor.mon, - &descP->currentAcceptor.ref)) { - int res; + if ((res = enif_select(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, + &descP->currentAcceptor.pid, + descP->currentAcceptor.ref) < 0)) { - /* There was another one, so we will still be in accepting state */ + esock_warning_msg("Failed select (%d) for new acceptor " + "after current (%T) died\r\n", + res, *pid); - SSDBG( descP, ("SOCKET", "socket_down -> new (active) acceptor: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", - descP->currentAcceptor.pid, - descP->currentAcceptor.ref) ); + } + + } else { + + SSDBG( descP, ("SOCKET", + "socket_down_acceptor -> no active acceptor\r\n") ); + + descP->currentAcceptorP = NULL; + descP->state = SOCKET_STATE_LISTENING; + } + + } else { + + /* Maybe unqueue one of the waiting acceptors */ + + SSDBG( descP, ("SOCKET", + "socket_down_acceptor -> " + "not current acceptor - maybe a waiting acceptor\r\n") ); + + acceptor_unqueue(env, descP, pid); + } +} + + + + +/* *** socket_down_writer *** + * + * Check and then handle a downed writer process. + * + */ +static +void socket_down_writer(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid) +{ + if (compare_pids(env, &descP->currentWriter.pid, pid)) { + + SSDBG( descP, ("SOCKET", + "socket_down_writer -> " + "current writer - try pop the queue\r\n") ); + + if (writer_pop(env, descP, + &descP->currentWriter.pid, + &descP->currentWriter.mon, + &descP->currentWriter.ref)) { + int res; + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "socket_down_writer -> new (current) writer: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentWriter.pid, + descP->currentWriter.ref) ); + + if ((res = enif_select(env, + descP->sock, + (ERL_NIF_SELECT_WRITE), + descP, + &descP->currentWriter.pid, + descP->currentWriter.ref) < 0)) { - if ((res = enif_select(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, - &descP->currentAcceptor.pid, - descP->currentAcceptor.ref) < 0)) { - - esock_warning_msg("Failed select (%d) for new acceptor " - "after current (%T) died\r\n", - res, *pid); - - } + esock_warning_msg("Failed select (%d) for new writer " + "after current (%T) died\r\n", + res, *pid); - } else { + } + + } else { + + SSDBG( descP, ("SOCKET", + "socket_down_writer -> no active writer\r\n") ); + + descP->currentWriterP = NULL; + } + + } else { + + /* Maybe unqueue one of the waiting writer(s) */ + + SSDBG( descP, ("SOCKET", + "socket_down_writer -> " + "not current writer - maybe a waiting writer\r\n") ); + + writer_unqueue(env, descP, pid); + } +} + + + + +/* *** socket_down_reader *** + * + * Check and then handle a downed reader process. + * + */ +static +void socket_down_reader(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid) +{ + if (compare_pids(env, &descP->currentReader.pid, pid)) { + + SSDBG( descP, ("SOCKET", + "socket_down_reader -> " + "current reader - try pop the queue\r\n") ); + + if (reader_pop(env, descP, + &descP->currentReader.pid, + &descP->currentReader.mon, + &descP->currentReader.ref)) { + int res; + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "socket_down_reader -> new (current) reader: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentReader.pid, + descP->currentReader.ref) ); + + if ((res = enif_select(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, + &descP->currentReader.pid, + descP->currentReader.ref) < 0)) { - SSDBG( descP, ("SOCKET", "socket_down -> no active acceptor\r\n") ); + esock_warning_msg("Failed select (%d) for new reader " + "after current (%T) died\r\n", + res, *pid); - descP->currentAcceptorP = NULL; - descP->state = SOCKET_STATE_LISTENING; } } else { - /* Maybe unqueue one of the waiting acceptors */ - SSDBG( descP, ("SOCKET", - "socket_down -> " - "not current acceptor - maybe a waiting acceptor\r\n") ); + "socket_down_reader -> no active reader\r\n") ); - acceptor_unqueue(env, descP, pid); + descP->currentReaderP = NULL; } + + } else { + + /* Maybe unqueue one of the waiting reader(s) */ + + SSDBG( descP, ("SOCKET", + "socket_down_reader -> " + "not current reader - maybe a waiting reader\r\n") ); + + reader_unqueue(env, descP, pid); } - - SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") ); - } diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl index 9142942428..45adffc5e6 100644 --- a/lib/kernel/test/socket_server.erl +++ b/lib/kernel/test/socket_server.erl @@ -922,8 +922,8 @@ send(#handler{socket = Sock, msg = true, type = dgram, stimeout = Timeout}, CMsgHdr = #{level => ip, type => tos, data => reliability}, CMsgHdrs = [CMsgHdr], MsgHdr = #{addr => Dest, - iov => [Msg], - ctrl => CMsgHdrs}, + ctrl => CMsgHdrs, + iov => [Msg]}, %% ok = socket:setopt(Sock, otp, debug, true), Res = socket:sendmsg(Sock, MsgHdr, Timeout), %% ok = socket:setopt(Sock, otp, debug, false), |