aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/nifs/common/socket_nif.c
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-09-20 15:37:37 +0200
committerMicael Karlberg <[email protected]>2018-09-20 16:29:05 +0200
commit13d10bc60a41f98647d802524ea8ef8fa9af6b39 (patch)
tree2a56b2fa3b0d63c05c49aea93427a343e149835f /erts/emulator/nifs/common/socket_nif.c
parente01a856c993b55c3fbc76fd429783d4aad5bfc80 (diff)
downloadotp-13d10bc60a41f98647d802524ea8ef8fa9af6b39.tar.gz
otp-13d10bc60a41f98647d802524ea8ef8fa9af6b39.tar.bz2
otp-13d10bc60a41f98647d802524ea8ef8fa9af6b39.zip
[socket-nif] Add proper send timeout handling
Added proper send timeout handling. Made use of the enif_select(mode = cancel) feature. Each time a timeout expires, the "active" send (the surrent write select) has to be cancelled. OTP-14831
Diffstat (limited to 'erts/emulator/nifs/common/socket_nif.c')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c384
1 files changed, 363 insertions, 21 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index fde2349234..04c3609b32 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -1839,6 +1839,11 @@ static ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env,
static ERL_NIF_TERM ncancel_send(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env,
+ SocketDescriptor* descP);
+static ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM opRef);
@@ -1894,6 +1899,10 @@ static ERL_NIF_TERM ngetopt_timeval_opt(ErlNifEnv* env,
int level,
int opt);
+static BOOLEAN_T send_check_writer(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ ERL_NIF_TERM* checkResult);
static ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
ssize_t written,
@@ -2123,6 +2132,22 @@ static BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid);
+static BOOLEAN_T writer_search4pid(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid);
+static ERL_NIF_TERM writer_push(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid pid,
+ ERL_NIF_TERM ref);
+static BOOLEAN_T writer_pop(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid,
+ ErlNifMonitor* mon,
+ ERL_NIF_TERM* ref);
+static BOOLEAN_T writer_unqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid);
+
static BOOLEAN_T qsearch4pid(ErlNifEnv* env,
SocketRequestQueue* q,
ErlNifPid* pid);
@@ -3399,7 +3424,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
if (!compare_pids(env, &descP->currentAcceptor.pid, &caller)) {
- /* Not the "current caller", so (maybe) push onto queue */
+ /* Not the "current acceptor", so (maybe) push onto queue */
SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") );
@@ -3459,6 +3484,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "naccept_accepting -> accept success\r\n") );
+ DEMONP(env, descP, &descP->currentAcceptor.mon);
+
if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) {
save_errno = sock_errno();
while ((sock_close(accSock) == INVALID_SOCKET) &&
@@ -3499,10 +3526,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
accDescP->state = SOCKET_STATE_CONNECTED;
- /* Here we should have the test if we have something in the queue.
- * And if so, pop it and copy the (waiting) acceptor, and then
- * make a new select with that info).
- */
+ /* Check if there are waiting acceptors (popping the acceptor queue) */
if (acceptor_pop(env, descP,
&descP->currentAcceptor.pid,
@@ -3607,12 +3631,12 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
}
-/* What do we do when another process tries to write
- * when the current writer has a select already waiting?
- * Queue it? And what about simultaneous read and write?
- * Queue up all operations towards the socket?
+/* *** nsend ***
*
- * We (may) need a currentOp field and an ops queue field.
+ * Do the actual send.
+ * Do some initial writer checks, do the actual send and then
+ * analyze the result. If we are done, another writer may be
+ * scheduled (if there is one in the writer queue).
*/
static
ERL_NIF_TERM nsend(ErlNifEnv* env,
@@ -3621,12 +3645,17 @@ ERL_NIF_TERM nsend(ErlNifEnv* env,
ErlNifBinary* sndDataP,
int flags)
{
- int save_errno;
- ssize_t written;
+ int save_errno;
+ ssize_t written;
+ ERL_NIF_TERM writerCheck;
if (!descP->isWritable)
return enif_make_badarg(env);
+ /* Check if there is already a current writer and if its us */
+ if (!send_check_writer(env, descP, sendRef, &writerCheck))
+ return writerCheck;
+
/* We ignore the wrap for the moment.
* Maybe we should issue a wrap-message to controlling process...
*/
@@ -3645,6 +3674,7 @@ ERL_NIF_TERM nsend(ErlNifEnv* env,
}
+
/* ----------------------------------------------------------------------
* nif_sendto
*
@@ -3710,9 +3740,13 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
&remoteAddrLen)) != NULL)
return esock_make_error_str(env, xres);
+ MLOCK(descP->writeMtx);
+
res = nsendto(env, descP, sendRef, &sndData, flags,
&remoteAddr, remoteAddrLen);
+ MUNLOCK(descP->writeMtx);
+
SGDBG( ("SOCKET", "nif_sendto -> done with result: "
"\r\n %T"
"\r\n", res) );
@@ -3730,12 +3764,17 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env,
SocketAddress* toAddrP,
unsigned int toAddrLen)
{
- int save_errno;
- ssize_t written;
+ int save_errno;
+ ssize_t written;
+ ERL_NIF_TERM writerCheck;
if (!descP->isWritable)
return enif_make_badarg(env);
+ /* Check if there is already a current writer and if its us */
+ if (!send_check_writer(env, descP, sendRef, &writerCheck))
+ return writerCheck;
+
/* We ignore the wrap for the moment.
* Maybe we should issue a wrap-message to controlling process...
*/
@@ -3811,8 +3850,12 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,
if (!esendflags2sendflags(eflags, &flags))
return esock_make_error(env, esock_atom_einval);
+ MLOCK(descP->writeMtx);
+
res = nsendmsg(env, descP, sendRef, eMsgHdr, flags);
+ MUNLOCK(descP->writeMtx);
+
SSDBG( descP,
("SOCKET", "nif_sendmsg -> done with result: "
"\r\n %T"
@@ -3839,12 +3882,16 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env,
size_t ctrlBufLen, ctrlBufUsed;
int save_errno;
ssize_t written, dataSize;
+ ERL_NIF_TERM writerCheck;
char* xres;
if (!descP->isWritable)
return enif_make_badarg(env);
-
+ /* Check if there is already a current writer and if its us */
+ if (!send_check_writer(env, descP, sendRef, &writerCheck))
+ return writerCheck;
+
/* Depending on if we are *connected* or not, we require
* different things in the msghdr map.
*/
@@ -10607,7 +10654,7 @@ ERL_NIF_TERM ncancel_accept(ErlNifEnv* env,
}
-/* The current process has an ongoing select we first must
+/* The current acceptor process has an ongoing select we first must
* cancel. Then we must re-activate the "first" (the first
* in the acceptor queue).
*/
@@ -10664,7 +10711,7 @@ ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM opRef)
{
- ErlNifPid caller;
+ ErlNifPid caller;
if (enif_self(env, &caller) == NULL)
return esock_make_error(env, atom_exself);
@@ -10683,14 +10730,117 @@ ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env,
/* *** ncancel_send ***
*
- *
+ * Cancel a send operation.
+ * Its either the current writer or one of the waiting writers.
*/
static
ERL_NIF_TERM ncancel_send(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM opRef)
{
- return esock_make_error(env, esock_atom_einval);
+ ERL_NIF_TERM res;
+
+ SSDBG( descP,
+ ("SOCKET", "ncancel_send -> entry with"
+ "\r\n opRef: %T"
+ "\r\n %s"
+ "\r\n", opRef,
+ ((descP->currentWriterP == NULL) ? "without writer" : "with writer")) );
+
+ MLOCK(descP->writeMtx);
+
+ if (descP->currentWriterP != NULL) {
+ if (COMPARE(opRef, descP->currentWriter.ref) == 0) {
+ res = ncancel_send_current(env, descP);
+ } else {
+ res = ncancel_send_waiting(env, descP, opRef);
+ }
+ } else {
+ /* Or badarg? */
+ res = esock_make_error(env, esock_atom_einval);
+ }
+
+ MUNLOCK(descP->writeMtx);
+
+ SSDBG( descP,
+ ("SOCKET", "ncancel_send -> done with result:"
+ "\r\n %T"
+ "\r\n", res) );
+
+ return res;
+}
+
+
+
+/* The current writer process has an ongoing select we first must
+ * cancel. Then we must re-activate the "first" (the first
+ * in the writer queue).
+ */
+static
+ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env,
+ SocketDescriptor* descP)
+{
+ ERL_NIF_TERM res;
+
+ SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") );
+
+ res = ncancel_write_select(env, descP, descP->currentWriter.ref);
+
+ SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) );
+
+ if (writer_pop(env, descP,
+ &descP->currentWriter.pid,
+ &descP->currentWriter.mon,
+ &descP->currentWriter.ref)) {
+
+ /* There was another one */
+
+ SSDBG( descP, ("SOCKET", "ncancel_send_current -> new (active) writer: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentWriter.pid,
+ descP->currentWriter.ref) );
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_WRITE),
+ descP, &descP->currentWriter.pid, descP->currentWriter.ref);
+
+ } else {
+ SSDBG( descP, ("SOCKET", "ncancel_send_current -> no more writers\r\n") );
+ descP->currentWriterP = NULL;
+ }
+
+ SSDBG( descP, ("SOCKET", "ncancel_send_current -> done with result:"
+ "\r\n %T"
+ "\r\n", res) );
+
+ return res;
+}
+
+
+/* These processes have not performed a select, so we can simply
+ * remove them from the writer queue.
+ */
+static
+ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ ErlNifPid caller;
+
+ if (enif_self(env, &caller) == NULL)
+ return esock_make_error(env, atom_exself);
+
+ /* unqueue request from (writer) queue */
+
+ if (writer_unqueue(env, descP, &caller)) {
+ return esock_atom_ok;
+ } else {
+ /* Race? */
+ return esock_make_error(env, esock_atom_not_found);
+ }
}
@@ -10764,6 +10914,66 @@ ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env,
* ----------------------------------------------------------------------
*/
+/* *** send_check_writer ***
+ *
+ * Checks if we have a current writer and if that is us. If not, then we must
+ * be made to wait for our turn. This is done by pushing us unto the writer queue.
+ */
+static
+BOOLEAN_T send_check_writer(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ ERL_NIF_TERM* checkResult)
+{
+ if (descP->currentWriterP != NULL) {
+ ErlNifPid caller;
+
+ if (enif_self(env, &caller) == NULL) {
+ *checkResult = esock_make_error(env, atom_exself);
+ return FALSE;
+ }
+
+ if (!compare_pids(env, &descP->currentWriter.pid, &caller)) {
+ /* Not the "current writer", so (maybe) push onto queue */
+
+ SSDBG( descP,
+ ("SOCKET", "send_check_writer -> not (current) writer\r\n") );
+
+ if (!writer_search4pid(env, descP, &caller))
+ *checkResult = writer_push(env, descP, caller, ref);
+ else
+ *checkResult = esock_make_error(env, esock_atom_eagain);
+
+ SSDBG( descP,
+ ("SOCKET",
+ "nsend -> queue (push) result: %T\r\n", checkResult) );
+
+ return FALSE;
+
+ }
+
+ }
+
+ *checkResult = esock_atom_ok; // Does not actually matter in this case, but ...
+
+ return TRUE;
+}
+
+
+
+/* *** send_check_result ***
+ *
+ * Check the result of a socket send (send, sendto and sendmsg) call.
+ * If a "complete" send has been made, the next (waiting) writer will be
+ * scheduled (if there is one).
+ * If we did not manage to send the entire package, make another select,
+ * so that we can be informed when we can make another try (to send the rest),
+ * and return with the amount we actually managed to send (its up to the caller
+ * (that is the erlang code) to figure out hust much is left to send).
+ * If the write fail, we give up and return with the appropriate error code.
+ *
+ * What about the remaining writers!!
+ */
static
ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -10783,24 +10993,67 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
cnt_inc(&descP->writePkgCnt, 1);
cnt_inc(&descP->writeByteCnt, written);
+ DEMONP(env, descP, &descP->currentWriter.mon);
SSDBG( descP,
("SOCKET", "send_check_result -> "
"everything written (%d,%d) - done\r\n", dataSize, written) );
+ /* Ok, this write is done maybe activate the next (if any) */
+
+ if (writer_pop(env, descP,
+ &descP->currentWriter.pid,
+ &descP->currentWriter.mon,
+ &descP->currentWriter.ref)) {
+
+ /* There was another one */
+
+ SSDBG( descP, ("SOCKET", "send_check_result -> new (active) writer: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentWriter.pid,
+ descP->currentWriter.ref) );
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_WRITE),
+ descP, &descP->currentWriter.pid, descP->currentWriter.ref);
+
+ } else {
+ descP->currentWriterP = NULL;
+ }
+
return esock_atom_ok;
} else if (written < 0) {
- /* Ouch, check what kind of failure */
+ /* Some kind of send failure - check what kind */
+
if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) {
+ ErlNifPid pid;
+ ErlNifMonitor mon;
+ ERL_NIF_TERM ref, res;
+
+ /*
+ * An actual failure - we (and everyone waiting) give up
+ */
cnt_inc(&descP->writeFails, 1);
SSDBG( descP,
("SOCKET", "send_check_result -> error: %d\r\n", saveErrno) );
- return esock_make_error_errno(env, saveErrno);
+ res = esock_make_error_errno(env, saveErrno);
+
+ while (writer_pop(env, descP, &pid, &mon, &ref)) {
+ SSDBG( descP,
+ ("SOCKET", "send_check_result -> abort %T\r\n", pid) );
+ send_msg_nif_abort(env, ref, res, &pid);
+ DEMONP(env, descP, &mon);
+ }
+
+ return res;
} else {
@@ -13561,6 +13814,95 @@ BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
}
+
+/* *** writer search for pid ***
+ *
+ * Search for a pid in the writer queue.
+ */
+static
+BOOLEAN_T writer_search4pid(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid)
+{
+ return qsearch4pid(env, &descP->writersQ, pid);
+}
+
+
+/* *** writer push ***
+ *
+ * Push an writer onto the writer queue.
+ * This happens when we already have atleast one current writer.
+ */
+static
+ERL_NIF_TERM writer_push(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid pid,
+ ERL_NIF_TERM ref)
+{
+ SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement));
+ SocketRequestor* reqP = &e->data;
+
+ reqP->pid = pid;
+ reqP->ref = enif_make_copy(descP->env, ref);
+
+ if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ FREE(reqP);
+ return esock_make_error(env, atom_exmon);
+ }
+
+ qpush(&descP->writersQ, e);
+
+ // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN
+ return esock_make_error(env, esock_atom_eagain);
+}
+
+
+/* *** writer pop ***
+ *
+ * Pop an writer from the writer queue.
+ */
+static
+BOOLEAN_T writer_pop(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid,
+ ErlNifMonitor* mon,
+ ERL_NIF_TERM* ref)
+{
+ SocketRequestQueueElement* e = qpop(&descP->writersQ);
+
+ if (e != NULL) {
+ *pid = e->data.pid;
+ *mon = e->data.mon;
+ *ref = e->data.ref; // At this point the ref has already been copied (env)
+ FREE(e);
+ return TRUE;
+ } else {
+ /* (acceptors) Queue was empty */
+ // *pid = NULL; we have no null value for pids
+ // *mon = NULL; we have no null value for monitors
+ *ref = esock_atom_undefined; // Just in case
+ return FALSE;
+ }
+
+}
+
+
+/* *** writer unqueue ***
+ *
+ * Remove an writer from the writer queue.
+ */
+static
+BOOLEAN_T writer_unqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid)
+{
+ return qunqueue(env, &descP->writersQ, pid);
+}
+
+
+
+
+
static
BOOLEAN_T qsearch4pid(ErlNifEnv* env,
SocketRequestQueue* q,