aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/nifs/common/socket_int.h1
-rw-r--r--erts/emulator/nifs/common/socket_nif.c803
-rw-r--r--lib/kernel/test/socket_server.erl4
3 files changed, 727 insertions, 81 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h
index 3595c483d7..c3595e495d 100644
--- a/erts/emulator/nifs/common/socket_int.h
+++ b/erts/emulator/nifs/common/socket_int.h
@@ -184,6 +184,7 @@ extern ERL_NIF_TERM esock_atom_einval;
#define MKLA(E,A,L) enif_make_list_from_array((E), (A), (L))
#define MKEL(E) enif_make_list((E), 0)
#define MKMA(E,KA,VA,L,M) enif_make_map_from_arrays((E), (KA), (VA), (L), (M))
+#define MKPID(E, P) enif_make_pid((E), (P))
#define MKREF(E) enif_make_ref((E))
#define MKS(E,S) enif_make_string((E), (S), ERL_NIF_LATIN1)
#define MKSL(E,S,L) enif_make_string_len((E), (S), (L), ERL_NIF_LATIN1)
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index c48d6eab00..a6940f788c 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -1490,6 +1490,8 @@ static ERL_NIF_TERM ngetopt_otp_debug(ErlNifEnv* env,
SocketDescriptor* descP);
static ERL_NIF_TERM ngetopt_otp_iow(ErlNifEnv* env,
SocketDescriptor* descP);
+static ERL_NIF_TERM ngetopt_otp_ctrl_proc(ErlNifEnv* env,
+ SocketDescriptor* descP);
static ERL_NIF_TERM ngetopt_native(ErlNifEnv* env,
SocketDescriptor* descP,
int level,
@@ -1849,6 +1851,11 @@ static ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env,
static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env,
+ SocketDescriptor* descP);
+static ERL_NIF_TERM ncancel_recv_waiting(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
static ERL_NIF_TERM ncancel_read_select(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM opRef);
@@ -1911,6 +1918,18 @@ static ERL_NIF_TERM send_check_result(ErlNifEnv* env,
ssize_t dataSize,
int saveErrno,
ERL_NIF_TERM sendRef);
+static BOOLEAN_T recv_check_reader(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ ERL_NIF_TERM* checkResult);
+static char* recv_init_current_reader(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref);
+static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
+ SocketDescriptor* descP);
+static void recv_error_current_reader(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM reason);
static ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
int read,
@@ -2150,6 +2169,22 @@ static BOOLEAN_T writer_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid);
+static BOOLEAN_T reader_search4pid(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid);
+static ERL_NIF_TERM reader_push(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid pid,
+ ERL_NIF_TERM ref);
+static BOOLEAN_T reader_pop(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid,
+ ErlNifMonitor* mon,
+ ERL_NIF_TERM* ref);
+static BOOLEAN_T reader_unqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid);
+
static BOOLEAN_T qsearch4pid(ErlNifEnv* env,
SocketRequestQueue* q,
ErlNifPid* pid);
@@ -2159,7 +2194,6 @@ static SocketRequestQueueElement* qpop(SocketRequestQueue* q);
static BOOLEAN_T qunqueue(ErlNifEnv* env,
SocketRequestQueue* q,
const ErlNifPid* pid);
-
/*
#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE)
static size_t my_strnlen(const char *s, size_t maxlen);
@@ -2175,6 +2209,15 @@ static void socket_down(ErlNifEnv* env,
void* obj,
const ErlNifPid* pid,
const ErlNifMonitor* mon);
+static void socket_down_acceptor(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid);
+static void socket_down_writer(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid);
+static void socket_down_reader(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid);
/*
static char* send_msg_error_closed(ErlNifEnv* env,
@@ -3986,6 +4029,8 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env,
if (ctrlBuf != NULL) FREE(ctrlBuf);
return esock_make_error_str(env, xres);
}
+ } else {
+ ctrlBufUsed = 0;
}
msgHdr.msg_control = ctrlBuf;
msgHdr.msg_controllen = ctrlBufUsed;
@@ -4168,6 +4213,7 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
{
ssize_t read;
ErlNifBinary buf;
+ ERL_NIF_TERM readerCheck;
int save_errno;
int bufSz = (len ? len : descP->rBufSz);
@@ -4179,6 +4225,10 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
if (!descP->isReadable)
return enif_make_badarg(env);
+ /* Check if there is already a current reader and if its us */
+ if (!recv_check_reader(env, descP, recvRef, &readerCheck))
+ return readerCheck;
+
/* Allocate a buffer:
* Either as much as we want to read or (if zero (0)) use the "default"
* size (what has been configured).
@@ -4315,6 +4365,7 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
ssize_t read;
int save_errno;
ErlNifBinary buf;
+ ERL_NIF_TERM readerCheck;
int bufSz = (len ? len : descP->rBufSz);
SSDBG( descP, ("SOCKET", "nrecvfrom -> entry with"
@@ -4325,6 +4376,10 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
if (!descP->isReadable)
return enif_make_badarg(env);
+ /* Check if there is already a current reader and if its us */
+ if (!recv_check_reader(env, descP, recvRef, &readerCheck))
+ return readerCheck;
+
/* Allocate a buffer:
* Either as much as we want to read or (if zero (0)) use the "default"
* size (what has been configured).
@@ -4476,6 +4531,7 @@ ERL_NIF_TERM nrecvmsg(ErlNifEnv* env,
struct iovec iov[1]; // Shall we always use 1?
ErlNifBinary data[1]; // Shall we always use 1?
ErlNifBinary ctrl;
+ ERL_NIF_TERM readerCheck;
SocketAddress addr;
SSDBG( descP, ("SOCKET", "nrecvmsg -> entry with"
@@ -4487,6 +4543,10 @@ ERL_NIF_TERM nrecvmsg(ErlNifEnv* env,
if (!descP->isReadable)
return enif_make_badarg(env);
+ /* Check if there is already a current reader and if its us */
+ if (!recv_check_reader(env, descP, recvRef, &readerCheck))
+ return readerCheck;
+
/*
for (i = 0; i < sizeof(buf); i++) {
if (!ALLOC_BIN(bifSz, &buf[i]))
@@ -8093,7 +8153,7 @@ ERL_NIF_TERM ngetopt_otp(ErlNifEnv* env,
ERL_NIF_TERM result;
SSDBG( descP,
- ("SOCKET", "ngetopt_opt -> entry with"
+ ("SOCKET", "ngetopt_otp -> entry with"
"\r\n eOpt: %d"
"\r\n", eOpt) );
@@ -8106,13 +8166,17 @@ ERL_NIF_TERM ngetopt_otp(ErlNifEnv* env,
result = ngetopt_otp_iow(env, descP);
break;
+ case SOCKET_OPT_OTP_CTRL_PROC:
+ result = ngetopt_otp_ctrl_proc(env, descP);
+ break;
+
default:
result = esock_make_error(env, esock_atom_einval);
break;
}
SSDBG( descP,
- ("SOCKET", "ngetopt_opt -> done when"
+ ("SOCKET", "ngetopt_otp -> done when"
"\r\n result: %T"
"\r\n", result) );
@@ -8144,6 +8208,18 @@ ERL_NIF_TERM ngetopt_otp_iow(ErlNifEnv* env,
}
+/* ngetopt_otp_ctrl_proc - Handle the OTP (level) controlling_process options
+ */
+static
+ERL_NIF_TERM ngetopt_otp_ctrl_proc(ErlNifEnv* env,
+ SocketDescriptor* descP)
+{
+ ERL_NIF_TERM eVal = MKPID(env, &descP->ctrlPid);
+
+ return esock_make_ok2(env, eVal);
+}
+
+
/* The option has *not* been encoded. Instead it has been provided
* in "native mode" (option is provided as is). In this case it will have the
@@ -10752,6 +10828,8 @@ ERL_NIF_TERM ncancel_send(ErlNifEnv* env,
{
ERL_NIF_TERM res;
+ MLOCK(descP->writeMtx);
+
SSDBG( descP,
("SOCKET", "ncancel_send -> entry with"
"\r\n opRef: %T"
@@ -10759,8 +10837,6 @@ ERL_NIF_TERM ncancel_send(ErlNifEnv* env,
"\r\n", opRef,
((descP->currentWriterP == NULL) ? "without writer" : "with writer")) );
- MLOCK(descP->writeMtx);
-
if (descP->currentWriterP != NULL) {
if (COMPARE(opRef, descP->currentWriter.ref) == 0) {
res = ncancel_send_current(env, descP);
@@ -10859,14 +10935,116 @@ ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env,
/* *** ncancel_recv ***
*
- *
+ * Cancel a read operation.
+ * Its either the current reader or one of the waiting readers.
*/
static
ERL_NIF_TERM ncancel_recv(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM opRef)
{
- return esock_make_error(env, esock_atom_einval);
+ ERL_NIF_TERM res;
+
+ MLOCK(descP->readMtx);
+
+ SSDBG( descP,
+ ("SOCKET", "ncancel_recv -> entry with"
+ "\r\n opRef: %T"
+ "\r\n %s"
+ "\r\n", opRef,
+ ((descP->currentReaderP == NULL) ? "without reader" : "with reader")) );
+
+ if (descP->currentReaderP != NULL) {
+ if (COMPARE(opRef, descP->currentReader.ref) == 0) {
+ res = ncancel_recv_current(env, descP);
+ } else {
+ res = ncancel_recv_waiting(env, descP, opRef);
+ }
+ } else {
+ /* Or badarg? */
+ res = esock_make_error(env, esock_atom_einval);
+ }
+
+ MUNLOCK(descP->readMtx);
+
+ SSDBG( descP,
+ ("SOCKET", "ncancel_recv -> done with result:"
+ "\r\n %T"
+ "\r\n", res) );
+
+ return res;
+}
+
+
+/* The current reader process has an ongoing select we first must
+ * cancel. Then we must re-activate the "first" (the first
+ * in the reader queue).
+ */
+static
+ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env,
+ SocketDescriptor* descP)
+{
+ ERL_NIF_TERM res;
+
+ SSDBG( descP, ("SOCKET", "ncancel_recv_current -> entry\r\n") );
+
+ res = ncancel_read_select(env, descP, descP->currentReader.ref);
+
+ SSDBG( descP, ("SOCKET", "ncancel_recv_current -> cancel res: %T\r\n", res) );
+
+ if (reader_pop(env, descP,
+ &descP->currentReader.pid,
+ &descP->currentReader.mon,
+ &descP->currentReader.ref)) {
+
+ /* There was another one */
+
+ SSDBG( descP, ("SOCKET", "ncancel_recv_current -> new (active) reader: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentReader.pid,
+ descP->currentReader.ref) );
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, &descP->currentReader.pid, descP->currentReader.ref);
+
+ } else {
+ SSDBG( descP, ("SOCKET", "ncancel_recv_current -> no more readers\r\n") );
+ descP->currentReaderP = NULL;
+ }
+
+ SSDBG( descP, ("SOCKET", "ncancel_recv_current -> done with result:"
+ "\r\n %T"
+ "\r\n", res) );
+
+ return res;
+}
+
+
+/* These processes have not performed a select, so we can simply
+ * remove them from the reader queue.
+ */
+static
+ERL_NIF_TERM ncancel_recv_waiting(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ ErlNifPid caller;
+
+ if (enif_self(env, &caller) == NULL)
+ return esock_make_error(env, atom_exself);
+
+ /* unqueue request from (reader) queue */
+
+ if (reader_unqueue(env, descP, &caller)) {
+ return esock_atom_ok;
+ } else {
+ /* Race? */
+ return esock_make_error(env, esock_atom_not_found);
+ }
}
@@ -10958,7 +11136,8 @@ BOOLEAN_T send_check_writer(ErlNifEnv* env,
SSDBG( descP,
("SOCKET",
- "nsend -> queue (push) result: %T\r\n", checkResult) );
+ "send_check_writer -> queue (push) result: %T\r\n",
+ checkResult) );
return FALSE;
@@ -11005,7 +11184,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
cnt_inc(&descP->writePkgCnt, 1);
cnt_inc(&descP->writeByteCnt, written);
- DEMONP(env, descP, &descP->currentWriter.mon);
+ if (descP->currentWriterP != NULL)
+ DEMONP(env, descP, &descP->currentWriter.mon);
SSDBG( descP,
("SOCKET", "send_check_result -> "
@@ -11058,11 +11238,15 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
res = esock_make_error_errno(env, saveErrno);
- while (writer_pop(env, descP, &pid, &mon, &ref)) {
- SSDBG( descP,
- ("SOCKET", "send_check_result -> abort %T\r\n", pid) );
- send_msg_nif_abort(env, ref, res, &pid);
- DEMONP(env, descP, &mon);
+ if (descP->currentWriterP != NULL) {
+ DEMONP(env, descP, &descP->currentWriter.mon);
+
+ while (writer_pop(env, descP, &pid, &mon, &ref)) {
+ SSDBG( descP,
+ ("SOCKET", "send_check_result -> abort %T\r\n", pid) );
+ send_msg_nif_abort(env, ref, res, &pid);
+ DEMONP(env, descP, &mon);
+ }
}
return res;
@@ -11087,6 +11271,20 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
* so schedule the rest for later.
*/
+ if (descP->currentWriterP == NULL) {
+ ErlNifPid caller;
+
+ if (enif_self(env, &caller) == NULL)
+ return esock_make_error(env, atom_exself);
+ descP->currentWriter.pid = caller;
+ if (MONP(env, descP,
+ &descP->currentWriter.pid,
+ &descP->currentWriter.mon) > 0)
+ return esock_make_error(env, atom_exmon);
+ descP->currentWriter.ref = enif_make_copy(descP->env, sendRef);
+ descP->currentWriterP = &descP->currentWriter;
+ }
+
cnt_inc(&descP->writeWaits, 1);
SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE),
@@ -11100,6 +11298,167 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
}
+
+/* *** recv_check_reader ***
+ *
+ * Checks if we have a current reader and if that is us. If not, then we must
+ * be made to wait for our turn. This is done by pushing us unto the reader queue.
+ */
+static
+BOOLEAN_T recv_check_reader(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ ERL_NIF_TERM* checkResult)
+{
+ if (descP->currentReaderP != NULL) {
+ ErlNifPid caller;
+
+ if (enif_self(env, &caller) == NULL) {
+ *checkResult = esock_make_error(env, atom_exself);
+ return FALSE;
+ }
+
+ if (!compare_pids(env, &descP->currentReader.pid, &caller)) {
+ /* Not the "current reader", so (maybe) push onto queue */
+
+ SSDBG( descP,
+ ("SOCKET", "recv_check_reader -> not (current) reader\r\n") );
+
+ if (!reader_search4pid(env, descP, &caller))
+ *checkResult = reader_push(env, descP, caller, ref);
+ else
+ *checkResult = esock_make_error(env, esock_atom_eagain);
+
+ SSDBG( descP,
+ ("SOCKET",
+ "recv_check_reader -> queue (push) result: %T\r\n",
+ checkResult) );
+
+ return FALSE;
+
+ }
+
+ }
+
+ *checkResult = esock_atom_ok; // Does not actually matter in this case, but ...
+
+ return TRUE;
+}
+
+
+
+/* *** recv_init_current_reader ***
+ *
+ * Initiate (maybe) the currentReader structure of the descriptor.
+ * Including monitoring the calling process.
+ */
+static
+char* recv_init_current_reader(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM recvRef)
+{
+ if (descP->currentReaderP == NULL) {
+ ErlNifPid caller;
+
+ if (enif_self(env, &caller) == NULL)
+ return str_exself;
+
+ descP->currentReader.pid = caller;
+ if (MONP(env, descP,
+ &descP->currentReader.pid,
+ &descP->currentReader.mon) > 0) {
+ return str_exmon;
+ }
+ descP->currentReader.ref = enif_make_copy(descP->env, recvRef);
+ descP->currentReaderP = &descP->currentReader;
+ }
+
+ return NULL;
+}
+
+
+
+/* *** recv_update_current_reader ***
+ *
+ * Demonitors the current reader process and pop's the reader queue.
+ * If there is a waiting (reader) process, then it will be assigned
+ * as the new current reader and a new (read) select will be done.
+ */
+
+static
+ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
+ SocketDescriptor* descP)
+{
+ if (descP->currentReaderP != NULL) {
+
+ DEMONP(env, descP, &descP->currentReader.mon);
+
+ if (reader_pop(env, descP,
+ &descP->currentReader.pid,
+ &descP->currentReader.mon,
+ &descP->currentReader.ref)) {
+
+ /* There was another one */
+
+ SSDBG( descP,
+ ("SOCKET", "recv_update_current_reader -> new (active) reader: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentReader.pid,
+ descP->currentReader.ref) );
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP,
+ &descP->currentReader.pid,
+ descP->currentReader.ref);
+
+ } else {
+ descP->currentWriterP = NULL;
+ }
+ }
+
+ return esock_atom_ok;
+}
+
+
+
+/* *** recv_error_current_reader ***
+ *
+ * Process the current reader and any waiting readers
+ * when a read (fatal) error has occured.
+ * All waiting readers will be "aborted", that is a
+ * nif_abort message will be sent (with reaf and reason).
+ */
+static
+void recv_error_current_reader(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM reason)
+{
+ if (descP->currentReaderP != NULL) {
+ ErlNifPid pid;
+ ErlNifMonitor mon;
+ ERL_NIF_TERM ref;
+
+ DEMONP(env, descP, &descP->currentReader.mon);
+
+ while (reader_pop(env, descP, &pid, &mon, &ref)) {
+ SSDBG( descP,
+ ("SOCKET", "recv_error_current_reader -> abort %T\r\n", pid) );
+ send_msg_nif_abort(env, ref, reason, &pid);
+ DEMONP(env, descP, &mon);
+ }
+ }
+}
+
+
+
+/* *** recv_check_result ***
+ *
+ * Process the result of a call to recv.
+ */
static
ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -11109,6 +11468,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
ErlNifBinary* bufP,
ERL_NIF_TERM recvRef)
{
+ char* xres;
ERL_NIF_TERM data;
SSDBG( descP,
@@ -11129,15 +11489,20 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
*/
if ((read == 0) && (descP->type == SOCK_STREAM)) {
+ ERL_NIF_TERM res = esock_make_error(env, atom_closed);
/*
* When a stream socket peer has performed an orderly shutdown, the return
* value will be 0 (the traditional "end-of-file" return).
*
* *We* do never actually try to read 0 bytes from a stream socket!
+ *
+ * We must also notify any waiting readers!
*/
- return esock_make_error(env, atom_closed);
+ recv_error_current_reader(env, descP, res);
+
+ return res;
}
@@ -11173,6 +11538,11 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
* => We choose alt 1 for now.
*/
+ cnt_inc(&descP->readByteCnt, read);
+
+ if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL)
+ return esock_make_error_str(env, xres);
+
data = MKBIN(env, bufP);
SSDBG( descP,
@@ -11188,16 +11558,24 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
/* <KOLLA>
* WE NEED TO INFORM ANY WAITING READERS
+ *
+ * DEMONP of the current reader!
+ *
* </KOLLA>
*/
- data = MKBIN(env, bufP);
+ cnt_inc(&descP->readPkgCnt, 1);
+ cnt_inc(&descP->readByteCnt, read);
SSDBG( descP,
("SOCKET",
"recv_check_result -> [%d] "
"we got exactly what we could fit\r\n", toRead) );
+ recv_update_current_reader(env, descP);
+
+ data = MKBIN(env, bufP);
+
return esock_make_ok3(env, atom_true, data);
}
@@ -11207,6 +11585,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
/* +++ Error handling +++ */
if (saveErrno == ECONNRESET) {
+ ERL_NIF_TERM res = esock_make_error(env, atom_closed);
/* +++ Oups - closed +++ */
@@ -11231,12 +11610,14 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
descP->closeLocal = FALSE;
descP->state = SOCKET_STATE_CLOSING;
+ recv_error_current_reader(env, descP, res);
+
SELECT(env,
descP->sock,
(ERL_NIF_SELECT_STOP),
descP, NULL, recvRef);
- return esock_make_error(env, atom_closed);
+ return res;
} else if ((saveErrno == ERRNO_BLOCK) ||
(saveErrno == EAGAIN)) {
@@ -11248,9 +11629,14 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
return esock_make_error(env, esock_atom_eagain);
} else {
+ ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno);
+
SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] errno: %d\r\n",
toRead, saveErrno) );
- return esock_make_error_errno(env, saveErrno);
+
+ recv_error_current_reader(env, descP, res);
+
+ return res;
}
} else {
@@ -11265,19 +11651,24 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
if (toRead == 0) {
- /* +++ We got a chunk of data but +++
- * +++ since we did not fill the +++
- * +++ buffer, we must split it +++
- * +++ into a sub-binary. +++
+ /* +++ We got it all, but since we +++
+ * +++ did not fill the buffer, we +++
+ * +++ must split it into a sub-binary. +++
*/
SSDBG( descP, ("SOCKET",
"recv_check_result -> [%d] split buffer\r\n", toRead) );
+ cnt_inc(&descP->readPkgCnt, 1);
+ cnt_inc(&descP->readByteCnt, read);
+
+ recv_update_current_reader(env, descP);
+
data = MKBIN(env, bufP);
data = MKSBIN(env, data, 0, read);
- SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] done\r\n", toRead) );
+ SSDBG( descP,
+ ("SOCKET", "recv_check_result -> [%d] done\r\n", toRead) );
return esock_make_ok3(env, atom_true, data);
@@ -11289,6 +11680,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] "
"only part of message - expect more\r\n", toRead) );
+ cnt_inc(&descP->readByteCnt, read);
+
return esock_make_ok3(env, atom_false, MKBIN(env, bufP));
}
}
@@ -13907,10 +14300,10 @@ ERL_NIF_TERM writer_push(ErlNifEnv* env,
*/
static
BOOLEAN_T writer_pop(ErlNifEnv* env,
- SocketDescriptor* descP,
- ErlNifPid* pid,
- ErlNifMonitor* mon,
- ERL_NIF_TERM* ref)
+ SocketDescriptor* descP,
+ ErlNifPid* pid,
+ ErlNifMonitor* mon,
+ ERL_NIF_TERM* ref)
{
SocketRequestQueueElement* e = qpop(&descP->writersQ);
@@ -13921,7 +14314,7 @@ BOOLEAN_T writer_pop(ErlNifEnv* env,
FREE(e);
return TRUE;
} else {
- /* (acceptors) Queue was empty */
+ /* (writers) Queue was empty */
// *pid = NULL; we have no null value for pids
// *mon = NULL; we have no null value for monitors
*ref = esock_atom_undefined; // Just in case
@@ -13945,6 +14338,92 @@ BOOLEAN_T writer_unqueue(ErlNifEnv* env,
+/* *** reader search for pid ***
+ *
+ * Search for a pid in the reader queue.
+ */
+static
+BOOLEAN_T reader_search4pid(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid)
+{
+ return qsearch4pid(env, &descP->readersQ, pid);
+}
+
+
+/* *** reader push ***
+ *
+ * Push an reader onto the raeder queue.
+ * This happens when we already have atleast one current reader.
+ */
+static
+ERL_NIF_TERM reader_push(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid pid,
+ ERL_NIF_TERM ref)
+{
+ SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement));
+ SocketRequestor* reqP = &e->data;
+
+ reqP->pid = pid;
+ reqP->ref = enif_make_copy(descP->env, ref);
+
+ if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ FREE(reqP);
+ return esock_make_error(env, atom_exmon);
+ }
+
+ qpush(&descP->readersQ, e);
+
+ // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN
+ return esock_make_error(env, esock_atom_eagain);
+}
+
+
+/* *** reader pop ***
+ *
+ * Pop an writer from the reader queue.
+ */
+static
+BOOLEAN_T reader_pop(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid,
+ ErlNifMonitor* mon,
+ ERL_NIF_TERM* ref)
+{
+ SocketRequestQueueElement* e = qpop(&descP->readersQ);
+
+ if (e != NULL) {
+ *pid = e->data.pid;
+ *mon = e->data.mon;
+ *ref = e->data.ref; // At this point the ref has already been copied (env)
+ FREE(e);
+ return TRUE;
+ } else {
+ /* (readers) Queue was empty */
+ // *pid = NULL; we have no null value for pids
+ // *mon = NULL; we have no null value for monitors
+ *ref = esock_atom_undefined; // Just in case
+ return FALSE;
+ }
+
+}
+
+
+/* *** reader unqueue ***
+ *
+ * Remove an reader from the reader queue.
+ */
+static
+BOOLEAN_T reader_unqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid)
+{
+ return qunqueue(env, &descP->readersQ, pid);
+}
+
+
+
static
@@ -14324,77 +14803,243 @@ void socket_down(ErlNifEnv* env,
"\r\n pid: %T"
"\r\n", descP->sock, *pid) );
- /* Eventually we should go through the other queues also,
- * the process can be one of them...
- *
- * Currently only the accteptors actuallu use the queues.
- */
+
+ if (compare_pids(env, &descP->ctrlPid, pid)) {
+ /* We don't bother with the queue cleanup here -
+ * we leave it to the stop callback function.
+ */
- if (descP->currentAcceptorP != NULL) {
+ descP->state = SOCKET_STATE_CLOSING;
+ descP->closeLocal = TRUE;
+ descP->closeRef = MKREF(env);
+ enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP),
+ descP, NULL, descP->closeRef);
- /*
- * We have acceptor(s) (atleast one)
- *
- * Check first if its the current acceptor,
- * and if not check the queue.
- */
+ } else {
+
+ /* check all operation queue(s): acceptor, writer and reader. */
+
+ MLOCK(descP->accMtx);
+ if (descP->currentAcceptorP != NULL)
+ socket_down_acceptor(env, descP, pid);
+ MUNLOCK(descP->accMtx);
+
+ MLOCK(descP->writeMtx);
+ if (descP->currentWriterP != NULL)
+ socket_down_writer(env, descP, pid);
+ MUNLOCK(descP->writeMtx);
+
+ MLOCK(descP->readMtx);
+ if (descP->currentReaderP != NULL)
+ socket_down_reader(env, descP, pid);
+ MUNLOCK(descP->readMtx);
+
+ }
+
+ SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") );
+
+}
- if (compare_pids(env, &descP->currentAcceptor.pid, pid)) {
+
+/* *** socket_down_acceptor ***
+ *
+ * Check and then handle a downed acceptor process.
+ *
+ */
+static
+void socket_down_acceptor(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid)
+{
+ if (compare_pids(env, &descP->currentAcceptor.pid, pid)) {
+
+ SSDBG( descP, ("SOCKET",
+ "socket_down_acceptor -> "
+ "current acceptor - try pop the queue\r\n") );
+
+ if (acceptor_pop(env, descP,
+ &descP->currentAcceptor.pid,
+ &descP->currentAcceptor.mon,
+ &descP->currentAcceptor.ref)) {
+ int res;
+
+ /* There was another one, so we will still be in accepting state */
+
SSDBG( descP, ("SOCKET",
- "socket_down -> "
- "current acceptor - try pop the queue\r\n") );
+ "socket_down_acceptor -> new (active) acceptor: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref) );
- if (acceptor_pop(env, descP,
- &descP->currentAcceptor.pid,
- &descP->currentAcceptor.mon,
- &descP->currentAcceptor.ref)) {
- int res;
+ if ((res = enif_select(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP,
+ &descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref) < 0)) {
- /* There was another one, so we will still be in accepting state */
+ esock_warning_msg("Failed select (%d) for new acceptor "
+ "after current (%T) died\r\n",
+ res, *pid);
- SSDBG( descP, ("SOCKET", "socket_down -> new (active) acceptor: "
- "\r\n pid: %T"
- "\r\n ref: %T"
- "\r\n",
- descP->currentAcceptor.pid,
- descP->currentAcceptor.ref) );
+ }
+
+ } else {
+
+ SSDBG( descP, ("SOCKET",
+ "socket_down_acceptor -> no active acceptor\r\n") );
+
+ descP->currentAcceptorP = NULL;
+ descP->state = SOCKET_STATE_LISTENING;
+ }
+
+ } else {
+
+ /* Maybe unqueue one of the waiting acceptors */
+
+ SSDBG( descP, ("SOCKET",
+ "socket_down_acceptor -> "
+ "not current acceptor - maybe a waiting acceptor\r\n") );
+
+ acceptor_unqueue(env, descP, pid);
+ }
+}
+
+
+
+
+/* *** socket_down_writer ***
+ *
+ * Check and then handle a downed writer process.
+ *
+ */
+static
+void socket_down_writer(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid)
+{
+ if (compare_pids(env, &descP->currentWriter.pid, pid)) {
+
+ SSDBG( descP, ("SOCKET",
+ "socket_down_writer -> "
+ "current writer - try pop the queue\r\n") );
+
+ if (writer_pop(env, descP,
+ &descP->currentWriter.pid,
+ &descP->currentWriter.mon,
+ &descP->currentWriter.ref)) {
+ int res;
+
+ /* There was another one */
+
+ SSDBG( descP, ("SOCKET", "socket_down_writer -> new (current) writer: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentWriter.pid,
+ descP->currentWriter.ref) );
+
+ if ((res = enif_select(env,
+ descP->sock,
+ (ERL_NIF_SELECT_WRITE),
+ descP,
+ &descP->currentWriter.pid,
+ descP->currentWriter.ref) < 0)) {
- if ((res = enif_select(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP,
- &descP->currentAcceptor.pid,
- descP->currentAcceptor.ref) < 0)) {
-
- esock_warning_msg("Failed select (%d) for new acceptor "
- "after current (%T) died\r\n",
- res, *pid);
-
- }
+ esock_warning_msg("Failed select (%d) for new writer "
+ "after current (%T) died\r\n",
+ res, *pid);
- } else {
+ }
+
+ } else {
+
+ SSDBG( descP, ("SOCKET",
+ "socket_down_writer -> no active writer\r\n") );
+
+ descP->currentWriterP = NULL;
+ }
+
+ } else {
+
+ /* Maybe unqueue one of the waiting writer(s) */
+
+ SSDBG( descP, ("SOCKET",
+ "socket_down_writer -> "
+ "not current writer - maybe a waiting writer\r\n") );
+
+ writer_unqueue(env, descP, pid);
+ }
+}
+
+
+
+
+/* *** socket_down_reader ***
+ *
+ * Check and then handle a downed reader process.
+ *
+ */
+static
+void socket_down_reader(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid)
+{
+ if (compare_pids(env, &descP->currentReader.pid, pid)) {
+
+ SSDBG( descP, ("SOCKET",
+ "socket_down_reader -> "
+ "current reader - try pop the queue\r\n") );
+
+ if (reader_pop(env, descP,
+ &descP->currentReader.pid,
+ &descP->currentReader.mon,
+ &descP->currentReader.ref)) {
+ int res;
+
+ /* There was another one */
+
+ SSDBG( descP, ("SOCKET", "socket_down_reader -> new (current) reader: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentReader.pid,
+ descP->currentReader.ref) );
+
+ if ((res = enif_select(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP,
+ &descP->currentReader.pid,
+ descP->currentReader.ref) < 0)) {
- SSDBG( descP, ("SOCKET", "socket_down -> no active acceptor\r\n") );
+ esock_warning_msg("Failed select (%d) for new reader "
+ "after current (%T) died\r\n",
+ res, *pid);
- descP->currentAcceptorP = NULL;
- descP->state = SOCKET_STATE_LISTENING;
}
} else {
- /* Maybe unqueue one of the waiting acceptors */
-
SSDBG( descP, ("SOCKET",
- "socket_down -> "
- "not current acceptor - maybe a waiting acceptor\r\n") );
+ "socket_down_reader -> no active reader\r\n") );
- acceptor_unqueue(env, descP, pid);
+ descP->currentReaderP = NULL;
}
+
+ } else {
+
+ /* Maybe unqueue one of the waiting reader(s) */
+
+ SSDBG( descP, ("SOCKET",
+ "socket_down_reader -> "
+ "not current reader - maybe a waiting reader\r\n") );
+
+ reader_unqueue(env, descP, pid);
}
-
- SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") );
-
}
diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl
index 9142942428..45adffc5e6 100644
--- a/lib/kernel/test/socket_server.erl
+++ b/lib/kernel/test/socket_server.erl
@@ -922,8 +922,8 @@ send(#handler{socket = Sock, msg = true, type = dgram, stimeout = Timeout},
CMsgHdr = #{level => ip, type => tos, data => reliability},
CMsgHdrs = [CMsgHdr],
MsgHdr = #{addr => Dest,
- iov => [Msg],
- ctrl => CMsgHdrs},
+ ctrl => CMsgHdrs,
+ iov => [Msg]},
%% ok = socket:setopt(Sock, otp, debug, true),
Res = socket:sendmsg(Sock, MsgHdr, Timeout),
%% ok = socket:setopt(Sock, otp, debug, false),