aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
Diffstat (limited to 'erts')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c245
1 files changed, 160 insertions, 85 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 666e39b07c..2361a26ae3 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -2067,6 +2067,19 @@ static ERL_NIF_TERM send_check_result(ErlNifEnv* env,
int saveErrno,
ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef);
+static ERL_NIF_TERM send_check_ok(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ssize_t written,
+ ssize_t dataSize,
+ ERL_NIF_TERM sockRef);
+static ERL_NIF_TERM send_check_fail(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ int saveErrno,
+ ERL_NIF_TERM sockRef);
+static ERL_NIF_TERM send_check_retry(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ssize_t written,
+ ERL_NIF_TERM sendRef);
static BOOLEAN_T recv_check_reader(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM ref,
@@ -5487,11 +5500,6 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
* this time (resulting in an select). The write of the
* other process must be made to wait until current
* is done!
- * Basically, we need a write queue!
- *
- * A 'writing' field (boolean), which is set if we did
- * not manage to write the entire message and reset every
- * time we do.
*/
res = nsend(env, descP, sockRef, sendRef, &sndData, flags);
@@ -13646,8 +13654,9 @@ ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env,
/* *** send_check_writer ***
*
- * Checks if we have a current writer and if that is us. If not, then we must
- * be made to wait for our turn. This is done by pushing us unto the writer queue.
+ * Checks if we have a current writer and if that is us.
+ * If not (current writer), then we must be made to wait
+ * for our turn. This is done by pushing us unto the writer queue.
*/
#if !defined(__WIN32__)
static
@@ -13668,7 +13677,8 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env,
/* Not the "current writer", so (maybe) push onto queue */
SSDBG( descP,
- ("SOCKET", "send_check_writer -> not (current) writer\r\n") );
+ ("SOCKET",
+ "send_check_writer -> not (current) writer\r\n") );
if (!writer_search4pid(env, descP, &caller))
*checkResult = writer_push(env, descP, caller, ref);
@@ -13686,7 +13696,8 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env,
}
- *checkResult = esock_atom_ok; // Does not actually matter in this case, but ...
+ // Does not actually matter in this case, but ...
+ *checkResult = esock_atom_ok;
return TRUE;
}
@@ -13705,6 +13716,7 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env,
* If the write fail, we give up and return with the appropriate error code.
*
* What about the remaining writers!!
+ *
*/
static
ERL_NIF_TERM send_check_result(ErlNifEnv* env,
@@ -13715,7 +13727,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef)
{
- int sres;
+ ERL_NIF_TERM res;
SSDBG( descP,
("SOCKET", "send_check_result -> entry with"
@@ -13726,137 +13738,201 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
if (written >= dataSize) {
- cnt_inc(&descP->writePkgCnt, 1);
- cnt_inc(&descP->writeByteCnt, written);
- if (descP->currentWriterP != NULL)
- DEMONP("send_check_result -> current writer",
- env, descP, &descP->currentWriter.mon);
+ res = send_check_ok(env, descP, written, dataSize, sockRef);
+
+ } else if (written < 0) {
+
+ /* Some kind of send failure - check what kind */
+
+ if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) {
+
+ res = send_check_fail(env, descP, saveErrno, sockRef);
+
+ } else {
+
+ /* Ok, try again later */
+
+ SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") );
+
+ res = send_check_retry(env, descP, written, sendRef);
+
+ }
+
+ } else {
+
+ /* Not the entire package */
SSDBG( descP,
("SOCKET", "send_check_result -> "
- "everything written (%d,%d) - done\r\n", dataSize, written) );
+ "not entire package written (%d of %d)\r\n",
+ written, dataSize) );
- /* Ok, this write is done maybe activate the next (if any) */
+ res = send_check_retry(env, descP, written, sendRef);
- if (!activate_next_writer(env, descP, sockRef)) {
- descP->currentWriterP = NULL;
- descP->currentWriter.ref = esock_atom_undefined;
- enif_set_pid_undefined(&descP->currentWriter.pid);
- esock_monitor_init(&descP->currentWriter.mon);
- }
+ }
- return esock_atom_ok;
+ SSDBG( descP, ("SOCKET", "send_check_result -> done: %T\r\n", res) );
- } else if (written < 0) {
+ return res;
+}
- /* Some kind of send failure - check what kind */
- if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) {
- SocketRequestor req;
- ERL_NIF_TERM res, reason;
+/* *** send_check_ok ***
+ *
+ * Processing done upon successful send.
+ */
+static
+ERL_NIF_TERM send_check_ok(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ssize_t written,
+ ssize_t dataSize,
+ ERL_NIF_TERM sockRef)
+{
+ cnt_inc(&descP->writePkgCnt, 1);
+ cnt_inc(&descP->writeByteCnt, written);
- /*
- * An actual failure - we (and everyone waiting) give up
- */
+ if (descP->currentWriterP != NULL)
+ DEMONP("send_check_ok -> current writer",
+ env, descP, &descP->currentWriter.mon);
- cnt_inc(&descP->writeFails, 1);
+ SSDBG( descP,
+ ("SOCKET", "send_check_ok -> "
+ "everything written (%d,%d) - done\r\n", dataSize, written) );
- SSDBG( descP,
- ("SOCKET",
- "send_check_result -> error: %d\r\n", saveErrno) );
+ /* Ok, this write is done maybe activate the next (if any) */
- reason = MKA(env, erl_errno_id(saveErrno));
- res = esock_make_error(env, reason);
+ if (!activate_next_writer(env, descP, sockRef)) {
+ descP->currentWriterP = NULL;
+ descP->currentWriter.ref = esock_atom_undefined;
+ enif_set_pid_undefined(&descP->currentWriter.pid);
+ esock_monitor_init(&descP->currentWriter.mon);
+ }
- if (descP->currentWriterP != NULL) {
+ return esock_atom_ok;
+}
- DEMONP("send_check_result -> current writer",
- env, descP, &descP->currentWriter.mon);
- while (writer_pop(env, descP, &req)) {
- SSDBG( descP,
- ("SOCKET", "send_check_result -> abort %T\r\n",
- req.pid) );
- esock_send_abort_msg(env, sockRef, req.ref,
- reason, &req.pid);
- DEMONP("send_check_result -> pop'ed writer",
- env, descP, &req.mon);
- }
- }
-
- return res;
- } else {
- /* Ok, try again later */
+/* *** send_check_failure ***
+ *
+ * Processing done upon failed send.
+ * An actual failure - we (and everyone waiting) give up.
+ */
+static
+ERL_NIF_TERM send_check_fail(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ int saveErrno,
+ ERL_NIF_TERM sockRef)
+{
+ SocketRequestor req;
+ ERL_NIF_TERM reason;
- SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") );
- }
+ cnt_inc(&descP->writeFails, 1);
- }
- else {
- SSDBG( descP,
- ("SOCKET", "send_check_result -> "
- "not entire package written (%d of %d)\r\n", written, dataSize) );
+ SSDBG( descP, ("SOCKET", "send_check_fail -> error: %d\r\n", saveErrno) );
+
+ reason = MKA(env, erl_errno_id(saveErrno));
+
+ if (descP->currentWriterP != NULL) {
+
+ DEMONP("send_check_fail -> current writer",
+ env, descP, &descP->currentWriter.mon);
+
+ while (writer_pop(env, descP, &req)) {
+ SSDBG( descP,
+ ("SOCKET", "send_check_fail -> abort %T\r\n", req.pid) );
+ esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid);
+ DEMONP("send_check_fail -> pop'ed writer", env, descP, &req.mon);
+ }
}
- /* We failed to write the *entire* packet (anything less then size
- * of the packet, which is 0 <= written < sizeof packet),
- * so schedule the rest for later.
- */
+ return esock_make_error(env, reason);
+}
+
+
+
+/* *** send_check_retry ***
+ *
+ * Processing done upon uncomplete or blocked send.
+ *
+ * We failed to write the *entire* packet (anything less
+ * then size of the packet, which is 0 <= written < sizeof
+ * packet, so schedule the rest for later.
+ */
+static
+ERL_NIF_TERM send_check_retry(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ssize_t written,
+ ERL_NIF_TERM sendRef)
+{
+ int sres;
+ ErlNifPid caller;
+ ERL_NIF_TERM res;
if (descP->currentWriterP == NULL) {
- ErlNifPid caller;
if (enif_self(env, &caller) == NULL)
return esock_make_error(env, atom_exself);
descP->currentWriter.pid = caller;
- if (MONP("send_check_result -> current writer",
+
+ if (MONP("send_check_retry -> current writer",
env, descP,
&descP->currentWriter.pid,
&descP->currentWriter.mon) != 0)
return esock_make_error(env, atom_exmon);
+
descP->currentWriter.ref = enif_make_copy(descP->env, sendRef);
descP->currentWriterP = &descP->currentWriter;
+
}
cnt_inc(&descP->writeWaits, 1);
sres = esock_select_write(env, descP->sock, descP, NULL, sendRef);
-
+
if (written >= 0) {
+
+ /* Partial *write* success */
+
if (sres < 0) {
/* Returned: {error, Reason}
* Reason: {select_failed, sres, written}
*/
- return esock_make_error(env,
- MKT3(env,
- esock_atom_select_failed,
- MKI(env, sres),
- MKI(env, written)));
+ res = esock_make_error(env,
+ MKT3(env,
+ esock_atom_select_failed,
+ MKI(env, sres),
+ MKI(env, written)));
} else {
- return esock_make_ok2(env, MKI(env, written));
+ res = esock_make_ok2(env, MKI(env, written));
}
+
} else {
+
if (sres < 0) {
/* Returned: {error, Reason}
* Reason: {select_failed, sres}
*/
- return esock_make_error(env,
- MKT2(env,
- esock_atom_select_failed,
- MKI(env, sres)));
+ res = esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
} else {
- return esock_make_error(env, esock_atom_eagain);
+ res = esock_make_error(env, esock_atom_eagain);
}
}
+
+ return res;
}
+
/* *** recv_check_reader ***
*
- * Checks if we have a current reader and if that is us. If not, then we must
- * be made to wait for our turn. This is done by pushing us unto the reader queue.
+ * Checks if we have a current reader and if that is us. If not,
+ * then we must be made to wait for our turn. This is done by pushing
+ * us unto the reader queue.
*/
static
BOOLEAN_T recv_check_reader(ErlNifEnv* env,
@@ -16934,8 +17010,7 @@ char* esock_send_socket_msg(ErlNifEnv* env,
ErlNifEnv* msgEnv)
{
ErlNifEnv* menv;
- ERL_NIF_TERM msock, mtag, minfo;
- ERL_NIF_TERM msg;
+ ERL_NIF_TERM msg, msock, mtag, minfo;
if (msgEnv == NULL) {
menv = enif_alloc_env();
@@ -16948,7 +17023,7 @@ char* esock_send_socket_msg(ErlNifEnv* env,
mtag = tag;
minfo = info;
}
- msg = MKT4(menv, esock_atom_socket_tag, socket, mtag, minfo);
+ msg = MKT4(menv, esock_atom_socket_tag, msock, mtag, minfo);
return esock_send_msg(env, msg, pid, menv);
}