diff options
Diffstat (limited to 'erts/emulator/nifs')
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 245 |
1 files changed, 160 insertions, 85 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 666e39b07c..2361a26ae3 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -2067,6 +2067,19 @@ static ERL_NIF_TERM send_check_result(ErlNifEnv* env, int saveErrno, ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef); +static ERL_NIF_TERM send_check_ok(ErlNifEnv* env, + SocketDescriptor* descP, + ssize_t written, + ssize_t dataSize, + ERL_NIF_TERM sockRef); +static ERL_NIF_TERM send_check_fail(ErlNifEnv* env, + SocketDescriptor* descP, + int saveErrno, + ERL_NIF_TERM sockRef); +static ERL_NIF_TERM send_check_retry(ErlNifEnv* env, + SocketDescriptor* descP, + ssize_t written, + ERL_NIF_TERM sendRef); static BOOLEAN_T recv_check_reader(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM ref, @@ -5487,11 +5500,6 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, * this time (resulting in an select). The write of the * other process must be made to wait until current * is done! - * Basically, we need a write queue! - * - * A 'writing' field (boolean), which is set if we did - * not manage to write the entire message and reset every - * time we do. */ res = nsend(env, descP, sockRef, sendRef, &sndData, flags); @@ -13646,8 +13654,9 @@ ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env, /* *** send_check_writer *** * - * Checks if we have a current writer and if that is us. If not, then we must - * be made to wait for our turn. This is done by pushing us unto the writer queue. + * Checks if we have a current writer and if that is us. + * If not (current writer), then we must be made to wait + * for our turn. This is done by pushing us unto the writer queue. */ #if !defined(__WIN32__) static @@ -13668,7 +13677,8 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env, /* Not the "current writer", so (maybe) push onto queue */ SSDBG( descP, - ("SOCKET", "send_check_writer -> not (current) writer\r\n") ); + ("SOCKET", + "send_check_writer -> not (current) writer\r\n") ); if (!writer_search4pid(env, descP, &caller)) *checkResult = writer_push(env, descP, caller, ref); @@ -13686,7 +13696,8 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env, } - *checkResult = esock_atom_ok; // Does not actually matter in this case, but ... + // Does not actually matter in this case, but ... + *checkResult = esock_atom_ok; return TRUE; } @@ -13705,6 +13716,7 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env, * If the write fail, we give up and return with the appropriate error code. * * What about the remaining writers!! + * */ static ERL_NIF_TERM send_check_result(ErlNifEnv* env, @@ -13715,7 +13727,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef) { - int sres; + ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "send_check_result -> entry with" @@ -13726,137 +13738,201 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, if (written >= dataSize) { - cnt_inc(&descP->writePkgCnt, 1); - cnt_inc(&descP->writeByteCnt, written); - if (descP->currentWriterP != NULL) - DEMONP("send_check_result -> current writer", - env, descP, &descP->currentWriter.mon); + res = send_check_ok(env, descP, written, dataSize, sockRef); + + } else if (written < 0) { + + /* Some kind of send failure - check what kind */ + + if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) { + + res = send_check_fail(env, descP, saveErrno, sockRef); + + } else { + + /* Ok, try again later */ + + SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") ); + + res = send_check_retry(env, descP, written, sendRef); + + } + + } else { + + /* Not the entire package */ SSDBG( descP, ("SOCKET", "send_check_result -> " - "everything written (%d,%d) - done\r\n", dataSize, written) ); + "not entire package written (%d of %d)\r\n", + written, dataSize) ); - /* Ok, this write is done maybe activate the next (if any) */ + res = send_check_retry(env, descP, written, sendRef); - if (!activate_next_writer(env, descP, sockRef)) { - descP->currentWriterP = NULL; - descP->currentWriter.ref = esock_atom_undefined; - enif_set_pid_undefined(&descP->currentWriter.pid); - esock_monitor_init(&descP->currentWriter.mon); - } + } - return esock_atom_ok; + SSDBG( descP, ("SOCKET", "send_check_result -> done: %T\r\n", res) ); - } else if (written < 0) { + return res; +} - /* Some kind of send failure - check what kind */ - if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) { - SocketRequestor req; - ERL_NIF_TERM res, reason; +/* *** send_check_ok *** + * + * Processing done upon successful send. + */ +static +ERL_NIF_TERM send_check_ok(ErlNifEnv* env, + SocketDescriptor* descP, + ssize_t written, + ssize_t dataSize, + ERL_NIF_TERM sockRef) +{ + cnt_inc(&descP->writePkgCnt, 1); + cnt_inc(&descP->writeByteCnt, written); - /* - * An actual failure - we (and everyone waiting) give up - */ + if (descP->currentWriterP != NULL) + DEMONP("send_check_ok -> current writer", + env, descP, &descP->currentWriter.mon); - cnt_inc(&descP->writeFails, 1); + SSDBG( descP, + ("SOCKET", "send_check_ok -> " + "everything written (%d,%d) - done\r\n", dataSize, written) ); - SSDBG( descP, - ("SOCKET", - "send_check_result -> error: %d\r\n", saveErrno) ); + /* Ok, this write is done maybe activate the next (if any) */ - reason = MKA(env, erl_errno_id(saveErrno)); - res = esock_make_error(env, reason); + if (!activate_next_writer(env, descP, sockRef)) { + descP->currentWriterP = NULL; + descP->currentWriter.ref = esock_atom_undefined; + enif_set_pid_undefined(&descP->currentWriter.pid); + esock_monitor_init(&descP->currentWriter.mon); + } - if (descP->currentWriterP != NULL) { + return esock_atom_ok; +} - DEMONP("send_check_result -> current writer", - env, descP, &descP->currentWriter.mon); - while (writer_pop(env, descP, &req)) { - SSDBG( descP, - ("SOCKET", "send_check_result -> abort %T\r\n", - req.pid) ); - esock_send_abort_msg(env, sockRef, req.ref, - reason, &req.pid); - DEMONP("send_check_result -> pop'ed writer", - env, descP, &req.mon); - } - } - - return res; - } else { - /* Ok, try again later */ +/* *** send_check_failure *** + * + * Processing done upon failed send. + * An actual failure - we (and everyone waiting) give up. + */ +static +ERL_NIF_TERM send_check_fail(ErlNifEnv* env, + SocketDescriptor* descP, + int saveErrno, + ERL_NIF_TERM sockRef) +{ + SocketRequestor req; + ERL_NIF_TERM reason; - SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") ); - } + cnt_inc(&descP->writeFails, 1); - } - else { - SSDBG( descP, - ("SOCKET", "send_check_result -> " - "not entire package written (%d of %d)\r\n", written, dataSize) ); + SSDBG( descP, ("SOCKET", "send_check_fail -> error: %d\r\n", saveErrno) ); + + reason = MKA(env, erl_errno_id(saveErrno)); + + if (descP->currentWriterP != NULL) { + + DEMONP("send_check_fail -> current writer", + env, descP, &descP->currentWriter.mon); + + while (writer_pop(env, descP, &req)) { + SSDBG( descP, + ("SOCKET", "send_check_fail -> abort %T\r\n", req.pid) ); + esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid); + DEMONP("send_check_fail -> pop'ed writer", env, descP, &req.mon); + } } - /* We failed to write the *entire* packet (anything less then size - * of the packet, which is 0 <= written < sizeof packet), - * so schedule the rest for later. - */ + return esock_make_error(env, reason); +} + + + +/* *** send_check_retry *** + * + * Processing done upon uncomplete or blocked send. + * + * We failed to write the *entire* packet (anything less + * then size of the packet, which is 0 <= written < sizeof + * packet, so schedule the rest for later. + */ +static +ERL_NIF_TERM send_check_retry(ErlNifEnv* env, + SocketDescriptor* descP, + ssize_t written, + ERL_NIF_TERM sendRef) +{ + int sres; + ErlNifPid caller; + ERL_NIF_TERM res; if (descP->currentWriterP == NULL) { - ErlNifPid caller; if (enif_self(env, &caller) == NULL) return esock_make_error(env, atom_exself); descP->currentWriter.pid = caller; - if (MONP("send_check_result -> current writer", + + if (MONP("send_check_retry -> current writer", env, descP, &descP->currentWriter.pid, &descP->currentWriter.mon) != 0) return esock_make_error(env, atom_exmon); + descP->currentWriter.ref = enif_make_copy(descP->env, sendRef); descP->currentWriterP = &descP->currentWriter; + } cnt_inc(&descP->writeWaits, 1); sres = esock_select_write(env, descP->sock, descP, NULL, sendRef); - + if (written >= 0) { + + /* Partial *write* success */ + if (sres < 0) { /* Returned: {error, Reason} * Reason: {select_failed, sres, written} */ - return esock_make_error(env, - MKT3(env, - esock_atom_select_failed, - MKI(env, sres), - MKI(env, written))); + res = esock_make_error(env, + MKT3(env, + esock_atom_select_failed, + MKI(env, sres), + MKI(env, written))); } else { - return esock_make_ok2(env, MKI(env, written)); + res = esock_make_ok2(env, MKI(env, written)); } + } else { + if (sres < 0) { /* Returned: {error, Reason} * Reason: {select_failed, sres} */ - return esock_make_error(env, - MKT2(env, - esock_atom_select_failed, - MKI(env, sres))); + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); } else { - return esock_make_error(env, esock_atom_eagain); + res = esock_make_error(env, esock_atom_eagain); } } + + return res; } + /* *** recv_check_reader *** * - * Checks if we have a current reader and if that is us. If not, then we must - * be made to wait for our turn. This is done by pushing us unto the reader queue. + * Checks if we have a current reader and if that is us. If not, + * then we must be made to wait for our turn. This is done by pushing + * us unto the reader queue. */ static BOOLEAN_T recv_check_reader(ErlNifEnv* env, @@ -16934,8 +17010,7 @@ char* esock_send_socket_msg(ErlNifEnv* env, ErlNifEnv* msgEnv) { ErlNifEnv* menv; - ERL_NIF_TERM msock, mtag, minfo; - ERL_NIF_TERM msg; + ERL_NIF_TERM msg, msock, mtag, minfo; if (msgEnv == NULL) { menv = enif_alloc_env(); @@ -16948,7 +17023,7 @@ char* esock_send_socket_msg(ErlNifEnv* env, mtag = tag; minfo = info; } - msg = MKT4(menv, esock_atom_socket_tag, socket, mtag, minfo); + msg = MKT4(menv, esock_atom_socket_tag, msock, mtag, minfo); return esock_send_msg(env, msg, pid, menv); } |