From 13d10bc60a41f98647d802524ea8ef8fa9af6b39 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 20 Sep 2018 15:37:37 +0200 Subject: [socket-nif] Add proper send timeout handling Added proper send timeout handling. Made use of the enif_select(mode = cancel) feature. Each time a timeout expires, the "active" send (the surrent write select) has to be cancelled. OTP-14831 --- erts/emulator/nifs/common/socket_nif.c | 384 +++++++++++++++++++++++++++++++-- erts/preloaded/ebin/socket.beam | Bin 65868 -> 66040 bytes erts/preloaded/src/socket.erl | 15 +- 3 files changed, 375 insertions(+), 24 deletions(-) (limited to 'erts') diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index fde2349234..04c3609b32 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -1839,6 +1839,11 @@ static ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env, static ERL_NIF_TERM ncancel_send(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, + SocketDescriptor* descP); +static ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef); @@ -1894,6 +1899,10 @@ static ERL_NIF_TERM ngetopt_timeval_opt(ErlNifEnv* env, int level, int opt); +static BOOLEAN_T send_check_writer(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ERL_NIF_TERM* checkResult); static ERL_NIF_TERM send_check_result(ErlNifEnv* env, SocketDescriptor* descP, ssize_t written, @@ -2123,6 +2132,22 @@ static BOOLEAN_T acceptor_unqueue(ErlNifEnv* env, SocketDescriptor* descP, const ErlNifPid* pid); +static BOOLEAN_T writer_search4pid(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid); +static ERL_NIF_TERM writer_push(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid pid, + ERL_NIF_TERM ref); +static BOOLEAN_T writer_pop(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref); +static BOOLEAN_T writer_unqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid); + static BOOLEAN_T qsearch4pid(ErlNifEnv* env, SocketRequestQueue* q, ErlNifPid* pid); @@ -3399,7 +3424,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, if (!compare_pids(env, &descP->currentAcceptor.pid, &caller)) { - /* Not the "current caller", so (maybe) push onto queue */ + /* Not the "current acceptor", so (maybe) push onto queue */ SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") ); @@ -3459,6 +3484,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "naccept_accepting -> accept success\r\n") ); + DEMONP(env, descP, &descP->currentAcceptor.mon); + if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) { save_errno = sock_errno(); while ((sock_close(accSock) == INVALID_SOCKET) && @@ -3499,10 +3526,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, accDescP->state = SOCKET_STATE_CONNECTED; - /* Here we should have the test if we have something in the queue. - * And if so, pop it and copy the (waiting) acceptor, and then - * make a new select with that info). - */ + /* Check if there are waiting acceptors (popping the acceptor queue) */ if (acceptor_pop(env, descP, &descP->currentAcceptor.pid, @@ -3607,12 +3631,12 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, } -/* What do we do when another process tries to write - * when the current writer has a select already waiting? - * Queue it? And what about simultaneous read and write? - * Queue up all operations towards the socket? +/* *** nsend *** * - * We (may) need a currentOp field and an ops queue field. + * Do the actual send. + * Do some initial writer checks, do the actual send and then + * analyze the result. If we are done, another writer may be + * scheduled (if there is one in the writer queue). */ static ERL_NIF_TERM nsend(ErlNifEnv* env, @@ -3621,12 +3645,17 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, ErlNifBinary* sndDataP, int flags) { - int save_errno; - ssize_t written; + int save_errno; + ssize_t written; + ERL_NIF_TERM writerCheck; if (!descP->isWritable) return enif_make_badarg(env); + /* Check if there is already a current writer and if its us */ + if (!send_check_writer(env, descP, sendRef, &writerCheck)) + return writerCheck; + /* We ignore the wrap for the moment. * Maybe we should issue a wrap-message to controlling process... */ @@ -3645,6 +3674,7 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, } + /* ---------------------------------------------------------------------- * nif_sendto * @@ -3710,9 +3740,13 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, &remoteAddrLen)) != NULL) return esock_make_error_str(env, xres); + MLOCK(descP->writeMtx); + res = nsendto(env, descP, sendRef, &sndData, flags, &remoteAddr, remoteAddrLen); + MUNLOCK(descP->writeMtx); + SGDBG( ("SOCKET", "nif_sendto -> done with result: " "\r\n %T" "\r\n", res) ); @@ -3730,12 +3764,17 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env, SocketAddress* toAddrP, unsigned int toAddrLen) { - int save_errno; - ssize_t written; + int save_errno; + ssize_t written; + ERL_NIF_TERM writerCheck; if (!descP->isWritable) return enif_make_badarg(env); + /* Check if there is already a current writer and if its us */ + if (!send_check_writer(env, descP, sendRef, &writerCheck)) + return writerCheck; + /* We ignore the wrap for the moment. * Maybe we should issue a wrap-message to controlling process... */ @@ -3811,8 +3850,12 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, if (!esendflags2sendflags(eflags, &flags)) return esock_make_error(env, esock_atom_einval); + MLOCK(descP->writeMtx); + res = nsendmsg(env, descP, sendRef, eMsgHdr, flags); + MUNLOCK(descP->writeMtx); + SSDBG( descP, ("SOCKET", "nif_sendmsg -> done with result: " "\r\n %T" @@ -3839,12 +3882,16 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env, size_t ctrlBufLen, ctrlBufUsed; int save_errno; ssize_t written, dataSize; + ERL_NIF_TERM writerCheck; char* xres; if (!descP->isWritable) return enif_make_badarg(env); - + /* Check if there is already a current writer and if its us */ + if (!send_check_writer(env, descP, sendRef, &writerCheck)) + return writerCheck; + /* Depending on if we are *connected* or not, we require * different things in the msghdr map. */ @@ -10607,7 +10654,7 @@ ERL_NIF_TERM ncancel_accept(ErlNifEnv* env, } -/* The current process has an ongoing select we first must +/* The current acceptor process has an ongoing select we first must * cancel. Then we must re-activate the "first" (the first * in the acceptor queue). */ @@ -10664,7 +10711,7 @@ ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef) { - ErlNifPid caller; + ErlNifPid caller; if (enif_self(env, &caller) == NULL) return esock_make_error(env, atom_exself); @@ -10683,14 +10730,117 @@ ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env, /* *** ncancel_send *** * - * + * Cancel a send operation. + * Its either the current writer or one of the waiting writers. */ static ERL_NIF_TERM ncancel_send(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef) { - return esock_make_error(env, esock_atom_einval); + ERL_NIF_TERM res; + + SSDBG( descP, + ("SOCKET", "ncancel_send -> entry with" + "\r\n opRef: %T" + "\r\n %s" + "\r\n", opRef, + ((descP->currentWriterP == NULL) ? "without writer" : "with writer")) ); + + MLOCK(descP->writeMtx); + + if (descP->currentWriterP != NULL) { + if (COMPARE(opRef, descP->currentWriter.ref) == 0) { + res = ncancel_send_current(env, descP); + } else { + res = ncancel_send_waiting(env, descP, opRef); + } + } else { + /* Or badarg? */ + res = esock_make_error(env, esock_atom_einval); + } + + MUNLOCK(descP->writeMtx); + + SSDBG( descP, + ("SOCKET", "ncancel_send -> done with result:" + "\r\n %T" + "\r\n", res) ); + + return res; +} + + + +/* The current writer process has an ongoing select we first must + * cancel. Then we must re-activate the "first" (the first + * in the writer queue). + */ +static +ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, + SocketDescriptor* descP) +{ + ERL_NIF_TERM res; + + SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") ); + + res = ncancel_write_select(env, descP, descP->currentWriter.ref); + + SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) ); + + if (writer_pop(env, descP, + &descP->currentWriter.pid, + &descP->currentWriter.mon, + &descP->currentWriter.ref)) { + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "ncancel_send_current -> new (active) writer: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentWriter.pid, + descP->currentWriter.ref) ); + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_WRITE), + descP, &descP->currentWriter.pid, descP->currentWriter.ref); + + } else { + SSDBG( descP, ("SOCKET", "ncancel_send_current -> no more writers\r\n") ); + descP->currentWriterP = NULL; + } + + SSDBG( descP, ("SOCKET", "ncancel_send_current -> done with result:" + "\r\n %T" + "\r\n", res) ); + + return res; +} + + +/* These processes have not performed a select, so we can simply + * remove them from the writer queue. + */ +static +ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) + return esock_make_error(env, atom_exself); + + /* unqueue request from (writer) queue */ + + if (writer_unqueue(env, descP, &caller)) { + return esock_atom_ok; + } else { + /* Race? */ + return esock_make_error(env, esock_atom_not_found); + } } @@ -10764,6 +10914,66 @@ ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env, * ---------------------------------------------------------------------- */ +/* *** send_check_writer *** + * + * Checks if we have a current writer and if that is us. If not, then we must + * be made to wait for our turn. This is done by pushing us unto the writer queue. + */ +static +BOOLEAN_T send_check_writer(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ERL_NIF_TERM* checkResult) +{ + if (descP->currentWriterP != NULL) { + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) { + *checkResult = esock_make_error(env, atom_exself); + return FALSE; + } + + if (!compare_pids(env, &descP->currentWriter.pid, &caller)) { + /* Not the "current writer", so (maybe) push onto queue */ + + SSDBG( descP, + ("SOCKET", "send_check_writer -> not (current) writer\r\n") ); + + if (!writer_search4pid(env, descP, &caller)) + *checkResult = writer_push(env, descP, caller, ref); + else + *checkResult = esock_make_error(env, esock_atom_eagain); + + SSDBG( descP, + ("SOCKET", + "nsend -> queue (push) result: %T\r\n", checkResult) ); + + return FALSE; + + } + + } + + *checkResult = esock_atom_ok; // Does not actually matter in this case, but ... + + return TRUE; +} + + + +/* *** send_check_result *** + * + * Check the result of a socket send (send, sendto and sendmsg) call. + * If a "complete" send has been made, the next (waiting) writer will be + * scheduled (if there is one). + * If we did not manage to send the entire package, make another select, + * so that we can be informed when we can make another try (to send the rest), + * and return with the amount we actually managed to send (its up to the caller + * (that is the erlang code) to figure out hust much is left to send). + * If the write fail, we give up and return with the appropriate error code. + * + * What about the remaining writers!! + */ static ERL_NIF_TERM send_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -10783,24 +10993,67 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, cnt_inc(&descP->writePkgCnt, 1); cnt_inc(&descP->writeByteCnt, written); + DEMONP(env, descP, &descP->currentWriter.mon); SSDBG( descP, ("SOCKET", "send_check_result -> " "everything written (%d,%d) - done\r\n", dataSize, written) ); + /* Ok, this write is done maybe activate the next (if any) */ + + if (writer_pop(env, descP, + &descP->currentWriter.pid, + &descP->currentWriter.mon, + &descP->currentWriter.ref)) { + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "send_check_result -> new (active) writer: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentWriter.pid, + descP->currentWriter.ref) ); + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_WRITE), + descP, &descP->currentWriter.pid, descP->currentWriter.ref); + + } else { + descP->currentWriterP = NULL; + } + return esock_atom_ok; } else if (written < 0) { - /* Ouch, check what kind of failure */ + /* Some kind of send failure - check what kind */ + if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) { + ErlNifPid pid; + ErlNifMonitor mon; + ERL_NIF_TERM ref, res; + + /* + * An actual failure - we (and everyone waiting) give up + */ cnt_inc(&descP->writeFails, 1); SSDBG( descP, ("SOCKET", "send_check_result -> error: %d\r\n", saveErrno) ); - return esock_make_error_errno(env, saveErrno); + res = esock_make_error_errno(env, saveErrno); + + while (writer_pop(env, descP, &pid, &mon, &ref)) { + SSDBG( descP, + ("SOCKET", "send_check_result -> abort %T\r\n", pid) ); + send_msg_nif_abort(env, ref, res, &pid); + DEMONP(env, descP, &mon); + } + + return res; } else { @@ -13561,6 +13814,95 @@ BOOLEAN_T acceptor_unqueue(ErlNifEnv* env, } + +/* *** writer search for pid *** + * + * Search for a pid in the writer queue. + */ +static +BOOLEAN_T writer_search4pid(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid) +{ + return qsearch4pid(env, &descP->writersQ, pid); +} + + +/* *** writer push *** + * + * Push an writer onto the writer queue. + * This happens when we already have atleast one current writer. + */ +static +ERL_NIF_TERM writer_push(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid pid, + ERL_NIF_TERM ref) +{ + SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement)); + SocketRequestor* reqP = &e->data; + + reqP->pid = pid; + reqP->ref = enif_make_copy(descP->env, ref); + + if (MONP(env, descP, &pid, &reqP->mon) > 0) { + FREE(reqP); + return esock_make_error(env, atom_exmon); + } + + qpush(&descP->writersQ, e); + + // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN + return esock_make_error(env, esock_atom_eagain); +} + + +/* *** writer pop *** + * + * Pop an writer from the writer queue. + */ +static +BOOLEAN_T writer_pop(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref) +{ + SocketRequestQueueElement* e = qpop(&descP->writersQ); + + if (e != NULL) { + *pid = e->data.pid; + *mon = e->data.mon; + *ref = e->data.ref; // At this point the ref has already been copied (env) + FREE(e); + return TRUE; + } else { + /* (acceptors) Queue was empty */ + // *pid = NULL; we have no null value for pids + // *mon = NULL; we have no null value for monitors + *ref = esock_atom_undefined; // Just in case + return FALSE; + } + +} + + +/* *** writer unqueue *** + * + * Remove an writer from the writer queue. + */ +static +BOOLEAN_T writer_unqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid) +{ + return qunqueue(env, &descP->writersQ, pid); +} + + + + + static BOOLEAN_T qsearch4pid(ErlNifEnv* env, SocketRequestQueue* q, diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam index 2475dce37b..9e6d9f4709 100644 Binary files a/erts/preloaded/ebin/socket.beam and b/erts/preloaded/ebin/socket.beam differ diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 1c16c94711..652054457f 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -1345,10 +1345,19 @@ sendto(Socket, Data, Dest) -> Data :: binary(), Dest :: null | sockaddr(), Flags :: send_flags(), - Reason :: term(). + Reason :: term() + ; (Socket, Data, Dest, Timeout) -> ok | {error, Reason} when + Socket :: socket(), + Data :: iodata(), + Dest :: null | sockaddr(), + Timeout :: timeout(), + Reason :: term(). + +sendto(Socket, Data, Dest, Flags) when is_list(Flags) -> + sendto(Socket, Data, Dest, Flags, ?SOCKET_SENDTO_TIMEOUT_DEFAULT); +sendto(Socket, Data, Dest, Timeout) -> + sendto(Socket, Data, Dest, ?SOCKET_SENDTO_FLAGS_DEFAULT, Timeout). -sendto(Socket, Data, Dest, Flags) -> - sendto(Socket, Data, Dest, Flags, ?SOCKET_SENDTO_TIMEOUT_DEFAULT). -spec sendto(Socket, Data, Dest, Flags, Timeout) -> ok | {error, Reason} when Socket :: socket(), -- cgit v1.2.3