aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/nifs/common/socket_nif.c
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2019-03-14 17:22:48 +0100
committerMicael Karlberg <[email protected]>2019-04-17 16:56:33 +0200
commit1854d05fb73fb7a787a37e9ada9e4cafdc7a7e84 (patch)
tree9d1fa8b5ee97d205bc1b0c4880aa7f2aa59b6336 /erts/emulator/nifs/common/socket_nif.c
parent6a5eda1dc70bbfa8f53b2e2c3e79db1748a01bfd (diff)
downloadotp-1854d05fb73fb7a787a37e9ada9e4cafdc7a7e84.tar.gz
otp-1854d05fb73fb7a787a37e9ada9e4cafdc7a7e84.tar.bz2
otp-1854d05fb73fb7a787a37e9ada9e4cafdc7a7e84.zip
[socket] Make use of the new select (read|write) functions
Make use of the new select functions; enif_select_[read|write], for read and write select. These functions allows us to construct the select message ourseves: {'$socket', Socket, select, Ref} This is in preparations for when we introduce the 'nowait' (or something similar) value for the timeout argument (in accept, read and write funcions). It also solves (we hope) the term leakage problems (it was difficult to free the environment when there was only one/socket). OTP-15496
Diffstat (limited to 'erts/emulator/nifs/common/socket_nif.c')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c880
1 files changed, 512 insertions, 368 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 3faa9ac96d..ac1beba344 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -654,7 +654,9 @@ typedef union {
* *
* =================================================================== */
+/* Global socket debug */
#define SGDBG( proto ) ESOCK_DBG_PRINTF( data.dbg , proto )
+/* Socket specific debug */
#define SSDBG( __D__ , proto ) ESOCK_DBG_PRINTF( (__D__)->dbg , proto )
@@ -759,14 +761,21 @@ static unsigned long one_value = 1;
typedef struct {
- int is_active;
ErlNifMonitor mon;
+ BOOLEAN_T isActive;
} ESockMonitor;
typedef struct {
- ErlNifPid pid; // PID of the requesting process
- ESockMonitor mon; // Monitor to the requesting process
- ERL_NIF_TERM ref; // The (unique) reference (ID) of the request
+ ErlNifPid pid; // PID of the requesting process
+ ESockMonitor mon; // Monitor to the requesting process
+
+ /* We need an environment for the copy of the ref we store here.
+ * We will also use this environment for any messages we send
+ * (with the ref in it). Such as the select message (used in the
+ * select call) or the abort message.
+ */
+ ErlNifEnv* env;
+ ERL_NIF_TERM ref; // The (unique) reference (ID) of the request
} ESockRequestor;
typedef struct esock_request_queue_element {
@@ -794,8 +803,6 @@ typedef struct {
ESockAddress remote;
unsigned int addrLen;
- ErlNifEnv* env;
-
/* +++ Controller (owner) process +++ */
ErlNifPid ctrlPid;
ESockMonitor ctrlMon;
@@ -985,7 +992,8 @@ static ERL_NIF_TERM nbind(ErlNifEnv* env,
ESockAddress* sockAddrP,
unsigned int addrLen);
static ERL_NIF_TERM nconnect(ErlNifEnv* env,
- ESockDescriptor* descP);
+ ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef);
static ERL_NIF_TERM nlisten(ErlNifEnv* env,
ESockDescriptor* descP,
int backlog);
@@ -995,10 +1003,12 @@ static ERL_NIF_TERM naccept(ErlNifEnv* env,
ERL_NIF_TERM ref);
static ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM ref);
static ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env,
ESockDescriptor* descP,
- ERL_NIF_TERM ref,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM accRef,
ErlNifPid caller,
int save_errno);
static ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env,
@@ -1030,7 +1040,8 @@ static ERL_NIF_TERM naccept_accepting_other(ErlNifEnv* env,
ErlNifPid caller);
static ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env,
ESockDescriptor* descP,
- ERL_NIF_TERM ref,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM accRef,
ErlNifPid* pid,
unsigned int nextState);
static BOOLEAN_T naccept_accepted(ErlNifEnv* env,
@@ -2084,6 +2095,7 @@ static ERL_NIF_TERM send_check_fail(ErlNifEnv* env,
static ERL_NIF_TERM send_check_retry(ErlNifEnv* env,
ESockDescriptor* descP,
ssize_t written,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef);
static BOOLEAN_T recv_check_reader(ErlNifEnv* env,
ESockDescriptor* descP,
@@ -2153,9 +2165,11 @@ static ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env,
ESockDescriptor* descP,
int read,
ErlNifBinary* bufP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef);
static ERL_NIF_TERM recv_check_retry(ErlNifEnv* env,
ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef);
static ERL_NIF_TERM recv_check_fail_gen(ErlNifEnv* env,
ESockDescriptor* descP,
@@ -2323,9 +2337,15 @@ static BOOLEAN_T decode_native_get_opt(ErlNifEnv* env,
// static void encode_bool(BOOLEAN_T val, ERL_NIF_TERM* eVal);
static ERL_NIF_TERM encode_ip_tos(ErlNifEnv* env, int val);
+static void socket_stop_handle_current(ErlNifEnv* env,
+ const char* role,
+ ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef,
+ ESockRequestor* reqP);
static void inform_waiting_procs(ErlNifEnv* env,
- char* role,
+ const char* role,
ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ESockRequestQueue* q,
BOOLEAN_T free,
ERL_NIF_TERM reason);
@@ -2389,12 +2409,6 @@ static void dec_socket(int domain, int type, int protocol);
ACTIVATE_NEXT_FUNCS_DEFS
#undef ACTIVATE_NEXT_FUNC_DEF
-static BOOLEAN_T activate_next(ErlNifEnv* env,
- ESockDescriptor* descP,
- ESockRequestor* reqP,
- ESockRequestQueue* q,
- ERL_NIF_TERM sockRef);
-
/* *** acceptor_search4pid | writer_search4pid | reader_search4pid ***
* *** acceptor_push | writer_push | reader_push ***
* *** acceptor_pop | writer_pop | reader_pop ***
@@ -2487,37 +2501,48 @@ static void socket_down_reader(ErlNifEnv* env,
const ErlNifPid* pid);
static char* esock_send_close_msg(ErlNifEnv* env,
- ESockDescriptor* descP);
-
+ ESockDescriptor* descP,
+ ErlNifPid* pid);
static char* esock_send_abort_msg(ErlNifEnv* env,
ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef,
+ ErlNifEnv* msgEnv,
ERL_NIF_TERM reason,
ErlNifPid* pid);
-static char* esock_send_socket_msg(ErlNifEnv* env,
- ERL_NIF_TERM sockRef,
- ERL_NIF_TERM tag,
- ERL_NIF_TERM info,
- ErlNifPid* pid,
- ErlNifEnv* msg_env);
static char* esock_send_msg(ErlNifEnv* env,
- ERL_NIF_TERM msg,
ErlNifPid* pid,
- ErlNifEnv* msg_env);
-
-static ERL_NIF_TERM make_socket_record(ErlNifEnv* env,
- ERL_NIF_TERM sockRef);
+ ERL_NIF_TERM msg,
+ ErlNifEnv* msgEnv);
+
+static ERL_NIF_TERM mk_abort_msg(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM opRef,
+ ERL_NIF_TERM reason);
+static ERL_NIF_TERM mk_close_msg(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM closeRef);
+static ERL_NIF_TERM mk_select_msg(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM selectRef);
+static ERL_NIF_TERM mk_socket_msg(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM tag,
+ ERL_NIF_TERM info);
+static ERL_NIF_TERM mk_socket(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef);
static int esock_select_read(ErlNifEnv* env,
ErlNifEvent event,
void* obj,
const ErlNifPid* pid,
- ERL_NIF_TERM ref);
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM selectRef);
static int esock_select_write(ErlNifEnv* env,
ErlNifEvent event,
void* obj,
const ErlNifPid* pid,
- ERL_NIF_TERM ref);
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM selectRef);
static int esock_select_stop(ErlNifEnv* env,
ErlNifEvent event,
void* obj);
@@ -2869,6 +2894,27 @@ static ErlNifResourceTypeInit socketInit = {
static ESockData data;
+/* These two (inline) functions are primarily intended for debugging,
+ * that is, to make it easy to add debug printouts.
+ */
+static inline void esock_free_env(const char* slogan, ErlNifEnv* env)
+{
+ SGDBG( ("SOCKET", "env free - %s: 0x%lX\r\n", slogan, env) );
+
+ if (env != NULL) enif_free_env(env);
+}
+
+
+static inline ErlNifEnv* esock_alloc_env(const char* slogan)
+{
+ ErlNifEnv* env = enif_alloc_env();
+
+ SGDBG( ("SOCKET", "env alloc - %s: 0x%lX\r\n", slogan, env) );
+
+ return env;
+}
+
+
/* ----------------------------------------------------------------------
* N I F F u n c t i o n s
* ----------------------------------------------------------------------
@@ -4684,15 +4730,16 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env,
return enif_raise_exception(env, MKA(env, "notsup"));
#else
ESockDescriptor* descP;
- ERL_NIF_TERM res, eSockAddr;
+ ERL_NIF_TERM res, eSockAddr, sockRef;
char* xres;
SGDBG( ("SOCKET", "nif_connect -> entry with argc: %d\r\n", argc) );
/* Extract arguments and perform preliminary validation */
+ sockRef = argv[0];
if ((argc != 2) ||
- !enif_get_resource(env, argv[0], sockets, (void**) &descP)) {
+ !enif_get_resource(env, sockRef, sockets, (void**) &descP)) {
return enif_make_badarg(env);
}
eSockAddr = argv[1];
@@ -4704,16 +4751,24 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env,
"\r\n", descP->sock, argv[0], eSockAddr) );
if ((xres = esock_decode_sockaddr(env, eSockAddr,
- &descP->remote, &descP->addrLen)) != NULL) {
+ &descP->remote,
+ &descP->addrLen)) != NULL) {
return esock_make_error_str(env, xres);
}
+ /* Only a *!%&$*# would send an opened but non-connected socket
+ * somewhere (before its actually usable), but just to be on the
+ * safe side we do the best we can to avoid complications...
+ */
+
MLOCK(descP->readMtx);
MLOCK(descP->writeMtx);
+ MLOCK(descP->cfgMtx);
- res = nconnect(env, descP);
+ res = nconnect(env, descP, sockRef);
+ MUNLOCK(descP->cfgMtx);
MUNLOCK(descP->writeMtx);
MUNLOCK(descP->readMtx);
@@ -4726,7 +4781,8 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env,
#if !defined(__WIN32__)
static
ERL_NIF_TERM nconnect(ErlNifEnv* env,
- ESockDescriptor* descP)
+ ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef)
{
ERL_NIF_TERM res, ref;
int code, sres, save_errno = 0;
@@ -4771,7 +4827,8 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
(save_errno == EINPROGRESS))) { /* Unix & OSE!! */
ref = MKREF(env);
descP->state = SOCKET_STATE_CONNECTING;
- if ((sres = esock_select_write(env, descP->sock, descP, NULL, ref)) < 0) {
+ if ((sres = esock_select_write(env, descP->sock, descP, NULL,
+ sockRef, ref)) < 0) {
res = esock_make_error(env,
MKT2(env,
esock_atom_select_failed,
@@ -4801,6 +4858,9 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
*
* Description:
* Make socket ready for input and output.
+ * This function is called if we where made to wait when we called the
+ * nif_connect function (we made a select, and the select message has
+ * now been received).
*
* Arguments:
* Socket (ref) - Points to the socket descriptor.
@@ -5056,7 +5116,7 @@ ERL_NIF_TERM naccept(ErlNifEnv* env,
switch (descP->state) {
case SOCKET_STATE_LISTENING:
- res = naccept_listening(env, descP, ref);
+ res = naccept_listening(env, descP, sockRef, ref);
break;
case SOCKET_STATE_ACCEPTING:
@@ -5074,13 +5134,15 @@ ERL_NIF_TERM naccept(ErlNifEnv* env,
/* *** naccept_listening ***
- * We have no active acceptor (and no acceptors in queue).
+ *
+ * We have no active acceptor (and therefor no acceptors in queue).
*/
#if !defined(__WIN32__)
static
ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
ESockDescriptor* descP,
- ERL_NIF_TERM ref)
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM accRef)
{
ESockAddress remote;
unsigned int n;
@@ -5106,7 +5168,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
("SOCKET",
"naccept_listening -> accept failed (%d)\r\n", save_errno) );
- res = naccept_listening_error(env, descP, ref, caller, save_errno);
+ res = naccept_listening_error(env, descP, sockRef, accRef,
+ caller, save_errno);
} else {
@@ -5125,6 +5188,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
/* *** naccept_listening_error ***
+ *
* The accept call resultet in an error - handle it.
* There are only two cases:
* 1) BLOCK => Attempt a "retry"
@@ -5133,7 +5197,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
static
ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env,
ESockDescriptor* descP,
- ERL_NIF_TERM ref,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM accRef,
ErlNifPid caller,
int save_errno)
{
@@ -5142,21 +5207,26 @@ ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env,
if (save_errno == ERRNO_BLOCK) {
/* *** Try again later *** */
- SSDBG( descP, ("SOCKET", "naccept_listening_error -> would block\r\n") );
+
+ SSDBG( descP,
+ ("SOCKET", "naccept_listening_error -> would block\r\n") );
descP->currentAcceptor.pid = caller;
if (MONP("naccept_listening -> current acceptor",
env, descP,
&descP->currentAcceptor.pid,
- &descP->currentAcceptor.mon) != 0)
- return esock_make_error(env, atom_exmon);
-
- descP->currentAcceptor.ref = enif_make_copy(descP->env, ref);
- descP->currentAcceptorP = &descP->currentAcceptor;
-
- res = naccept_busy_retry(env, descP, ref, NULL, SOCKET_STATE_ACCEPTING);
-
-
+ &descP->currentAcceptor.mon) != 0) {
+ enif_set_pid_undefined(&descP->currentAcceptor.pid);
+ res = esock_make_error(env, atom_exmon);
+ } else {
+ descP->currentAcceptor.env = esock_alloc_env("current acceptor");
+ descP->currentAcceptor.ref = CP_TERM(descP->currentAcceptor.env,
+ accRef);
+ descP->currentAcceptorP = &descP->currentAcceptor;
+ res = naccept_busy_retry(env, descP,
+ sockRef, descP->currentAcceptor.ref,
+ NULL, SOCKET_STATE_ACCEPTING);
+ }
} else {
SSDBG( descP,
("SOCKET",
@@ -5169,6 +5239,7 @@ ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env,
/* *** naccept_listening_accept ***
+ *
* The accept call was successful (accepted) - handle the new connection.
*/
static
@@ -5189,6 +5260,7 @@ ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env,
/* *** naccept_accepting ***
+ *
* We have an active acceptor and possibly acceptors waiting in queue.
* If the pid of the calling process is not the pid of the "current process",
* push the requester onto the (acceptor) queue.
@@ -5298,14 +5370,6 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env,
if (naccept_accepted(env, descP, accSock,
descP->currentAcceptor.pid, remote, &res)) {
- /* We should really go through the queue until we succeed to activate
- * a waiting acceptor. For now we just pop once and hope for the best...
- * This will leave any remaining acceptors *hanging*...
- *
- * We need a "activate-next" function.
- *
- */
-
if (!activate_next_acceptor(env, descP, sockRef)) {
SSDBG( descP,
@@ -5318,7 +5382,11 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env,
descP->currentAcceptorP = NULL;
descP->currentAcceptor.ref = esock_atom_undefined;
enif_set_pid_undefined(&descP->currentAcceptor.pid);
- esock_monitor_init(&descP->currentAcceptor.mon);
+ esock_free_env("naccept_accepting_current_accept - "
+ "current-accept-env",
+ descP->currentAcceptor.env);
+ descP->currentAcceptor.env = NULL;
+ MON_INIT(&descP->currentAcceptor.mon);
}
}
@@ -5354,7 +5422,8 @@ ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env,
"naccept_accepting_current_error -> "
"would block: try again\r\n") );
- res = naccept_busy_retry(env, descP, opRef, &descP->currentAcceptor.pid,
+ res = naccept_busy_retry(env, descP, sockRef, opRef,
+ &descP->currentAcceptor.pid,
/* No state change */
descP->state);
@@ -5367,7 +5436,8 @@ ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "naccept_accepting_current_error -> abort %T\r\n",
req.pid) );
- esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid);
+ esock_send_abort_msg(env, sockRef, req.ref, req.env,
+ reason, &req.pid);
DEMONP("naccept_accepting_current_error -> pop'ed writer",
env, descP, &req.mon);
}
@@ -5403,25 +5473,28 @@ ERL_NIF_TERM naccept_accepting_other(ErlNifEnv* env,
/* *** naccept_busy_retry ***
+ *
* Perform a retry select. If successful, set nextState.
*/
#if !defined(__WIN32__)
static
ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env,
ESockDescriptor* descP,
- ERL_NIF_TERM ref,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM accRef, // Not needed
ErlNifPid* pid,
unsigned int nextState)
{
int sres;
ERL_NIF_TERM res, reason;
- if ((sres = esock_select_read(env, descP->sock, descP, pid, ref)) < 0) {
+ if ((sres = esock_select_read(env, descP->sock, descP, pid,
+ sockRef, accRef)) < 0) {
reason = MKT2(env, esock_atom_select_failed, MKI(env, sres));
res = esock_make_error(env, reason);
} else {
descP->state = nextState;
- res = esock_make_error(env, esock_atom_eagain);
+ res = esock_make_error(env, esock_atom_eagain); // OK!!
}
return res;
@@ -5430,6 +5503,7 @@ ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env,
/* *** naccept_accepted ***
+ *
* Generic function handling a successful accept.
*/
static
@@ -5469,7 +5543,7 @@ BOOLEAN_T naccept_accepted(ErlNifEnv* env,
accDescP->rBufSz = descP->rBufSz; // Inherit buffer size
accDescP->rNum = descP->rNum; // Inherit buffer uses
accDescP->rNumCnt = 0;
- accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer siez
+ accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer size
accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size
accRef = enif_make_resource(env, accDescP);
@@ -5481,6 +5555,7 @@ BOOLEAN_T naccept_accepted(ErlNifEnv* env,
&accDescP->ctrlPid,
&accDescP->ctrlMon) != 0) {
sock_close(accSock);
+ enif_set_pid_undefined(&descP->ctrlPid);
*result = esock_make_error(env, atom_exmon);
return FALSE;
}
@@ -6668,8 +6743,8 @@ BOOLEAN_T nclose_check(ErlNifEnv* env,
} else {
- /* Monitor the caller, since we should complete this operation even if
- * the caller dies (for whatever reason).
+ /* Monitor the caller, since we should complete this
+ * operation even if the caller dies (for whatever reason).
*
* <KOLLA>
*
@@ -6719,8 +6794,9 @@ ERL_NIF_TERM nclose_do(ErlNifEnv* env,
int sres;
ERL_NIF_TERM reply, reason;
- descP->closeEnv = enif_alloc_env();
+ descP->closeEnv = esock_alloc_env("nclose-do - close-env");
descP->closeRef = MKREF(descP->closeEnv);
+
sres = esock_select_stop(env, descP->sock, descP);
if (sres & ERL_NIF_SELECT_STOP_CALLED) {
@@ -13883,7 +13959,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") );
- res = send_check_retry(env, descP, written, sendRef);
+ res = send_check_retry(env, descP, written, sockRef, sendRef);
}
@@ -13896,7 +13972,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
"not entire package written (%d of %d)\r\n",
written, dataSize) );
- res = send_check_retry(env, descP, written, sendRef);
+ res = send_check_retry(env, descP, written, sockRef, sendRef);
}
@@ -13920,18 +13996,23 @@ ERL_NIF_TERM send_check_ok(ErlNifEnv* env,
cnt_inc(&descP->writePkgCnt, 1);
cnt_inc(&descP->writeByteCnt, written);
- if (descP->currentWriterP != NULL)
+ if (descP->currentWriterP != NULL) {
DEMONP("send_check_ok -> current writer",
env, descP, &descP->currentWriter.mon);
+ esock_free_env("send_check_ok", descP->currentWriter.env);
+ }
SSDBG( descP,
("SOCKET", "send_check_ok -> "
"everything written (%d,%d) - done\r\n", dataSize, written) );
- /* Ok, this write is done maybe activate the next (if any) */
+ /*
+ * Ok, this write is done maybe activate the next (if any)
+ */
if (!activate_next_writer(env, descP, sockRef)) {
descP->currentWriterP = NULL;
+ descP->currentWriter.env = NULL;
descP->currentWriter.ref = esock_atom_undefined;
enif_set_pid_undefined(&descP->currentWriter.pid);
esock_monitor_init(&descP->currentWriter.mon);
@@ -13970,7 +14051,8 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env,
while (writer_pop(env, descP, &req)) {
SSDBG( descP,
("SOCKET", "send_check_fail -> abort %T\r\n", req.pid) );
- esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid);
+ esock_send_abort_msg(env, sockRef, req.ref, req.env,
+ reason, &req.pid);
DEMONP("send_check_fail -> pop'ed writer", env, descP, &req.mon);
}
}
@@ -13992,6 +14074,7 @@ static
ERL_NIF_TERM send_check_retry(ErlNifEnv* env,
ESockDescriptor* descP,
ssize_t written,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef)
{
int sres;
@@ -14007,17 +14090,19 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env,
if (MONP("send_check_retry -> current writer",
env, descP,
&descP->currentWriter.pid,
- &descP->currentWriter.mon) != 0)
+ &descP->currentWriter.mon) != 0) {
+ enif_set_pid_undefined(&descP->currentWriter.pid);
return esock_make_error(env, atom_exmon);
-
- descP->currentWriter.ref = enif_make_copy(descP->env, sendRef);
- descP->currentWriterP = &descP->currentWriter;
-
+ } else {
+ descP->currentWriter.env = esock_alloc_env("current-writer");
+ descP->currentWriter.ref = CP_TERM(descP->currentWriter.env, sendRef);
+ descP->currentWriterP = &descP->currentWriter;
+ }
}
cnt_inc(&descP->writeWaits, 1);
- sres = esock_select_write(env, descP->sock, descP, NULL, sendRef);
+ sres = esock_select_write(env, descP->sock, descP, NULL, sockRef, sendRef);
if (written >= 0) {
@@ -14062,6 +14147,9 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env,
* Checks if we have a current reader and if that is us. If not,
* then we must be made to wait for our turn. This is done by pushing
* us unto the reader queue.
+ * Note that we do *not* actually initiate the currentReader structure
+ * here, since we do not actually know yet if we need to! We do that in
+ * the [recv|recvfrom|recvmsg]_check_result function.
*/
static
BOOLEAN_T recv_check_reader(ErlNifEnv* env,
@@ -14132,10 +14220,26 @@ char* recv_init_current_reader(ErlNifEnv* env,
env, descP,
&descP->currentReader.pid,
&descP->currentReader.mon) != 0) {
+ enif_set_pid_undefined(&descP->currentReader.pid);
return str_exmon;
+ } else {
+
+ descP->currentReader.env = esock_alloc_env("current-reader");
+ descP->currentReader.ref = CP_TERM(descP->currentReader.env,
+ recvRef);
+ descP->currentReaderP = &descP->currentReader;
}
- descP->currentReader.ref = enif_make_copy(descP->env, recvRef);
- descP->currentReaderP = &descP->currentReader;
+ } else {
+
+ /*
+ * This is a retry:
+ * We have done, for instance, recv(Sock, X), but only received Y < X.
+ * We then call recv again with size = X-Y. So, we then get a new ref.
+ *
+ * Make use of the existing environment
+ */
+
+ descP->currentReader.ref = CP_TERM(descP->currentReader.env, recvRef);
}
return NULL;
@@ -14159,9 +14263,13 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
if (descP->currentReaderP != NULL) {
- DEMONP("recv_update_current_reader -> current reader",
+ DEMONP("recv_update_current_reader",
env, descP, &descP->currentReader.mon);
-
+
+ esock_free_env("recv_update_current_reader - current-read-env",
+ descP->currentReader.env);
+ descP->currentReader.env = NULL;
+
if (!activate_next_reader(env, descP, sockRef)) {
SSDBG( descP,
@@ -14205,7 +14313,8 @@ void recv_error_current_reader(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "recv_error_current_reader -> abort %T\r\n",
req.pid) );
- esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid);
+ esock_send_abort_msg(env, sockRef, req.ref, req.env,
+ reason, &req.pid);
DEMONP("recv_error_current_reader -> pop'ed reader",
env, descP, &req.mon);
}
@@ -14317,7 +14426,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
* This function is called if we filled the allocated buffer.
* But are we done yet?
*
- * toRead = 0 means: Give me everything you have
+ * toRead = 0 means: Give me everything you have => maybe
+ * toRead > 0 means: Yes
*/
static
ERL_NIF_TERM recv_check_full(ErlNifEnv* env,
@@ -14442,7 +14552,7 @@ ERL_NIF_TERM recv_check_full_maybe_done(ErlNifEnv* env,
/* *** recv_check_full_done ***
*
- * A successful and full (that is, the buffer was filled) recv.
+ * A successful recv and we filled the buffer.
*/
static
ERL_NIF_TERM recv_check_full_done(ErlNifEnv* env,
@@ -14498,7 +14608,7 @@ ERL_NIF_TERM recv_check_fail(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "recv_check_fail -> eagain\r\n") );
- res = recv_check_retry(env, descP, recvRef);
+ res = recv_check_retry(env, descP, sockRef, recvRef);
} else {
@@ -14565,11 +14675,12 @@ ERL_NIF_TERM recv_check_fail_closed(ErlNifEnv* env,
static
ERL_NIF_TERM recv_check_retry(ErlNifEnv* env,
ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef)
{
int sres;
char* xres;
- ERL_NIF_TERM res;
+ ERL_NIF_TERM reason;
descP->rNumCnt = 0;
if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL)
@@ -14577,16 +14688,17 @@ ERL_NIF_TERM recv_check_retry(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "recv_check_retry -> SELECT for more\r\n") );
- if ((sres = esock_select_read(env, descP->sock, descP,
- NULL, recvRef)) < 0) {
- res = esock_make_error(env,
- MKT2(env,
- esock_atom_select_failed, MKI(env, sres)));
+ if ((sres = esock_select_read(env, descP->sock, descP, NULL,
+ sockRef, descP->currentReader.ref)) < 0) {
+ /* Ouch
+ * Now what? We have copied ref into *its own* environment!
+ */
+ reason = MKT2(env, esock_atom_select_failed, MKI(env, sres));
} else {
- res = esock_make_error(env, esock_atom_eagain);
+ reason = esock_atom_eagain;
}
- return res;
+ return esock_make_error(env, reason);
}
@@ -14644,7 +14756,7 @@ ERL_NIF_TERM recv_check_partial(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "recv_check_partial -> [%d] "
"only part of message - expect more\r\n", toRead) );
- res = recv_check_partial_part(env, descP, read, bufP, recvRef);
+ res = recv_check_partial_part(env, descP, read, bufP, sockRef, recvRef);
}
return res;
@@ -14695,6 +14807,7 @@ ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env,
ESockDescriptor* descP,
int read,
ErlNifBinary* bufP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef)
{
ERL_NIF_TERM res, reason, data;
@@ -14713,7 +14826,8 @@ ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env,
/* SELECT for more data */
- sres = esock_select_read(env, descP->sock, descP, NULL, recvRef);
+ sres = esock_select_read(env, descP->sock, descP, NULL,
+ sockRef, descP->currentReader.ref);
if (sres < 0) {
/* Result: {error, Reason}
* Reason: {select_failed, sres, data}
@@ -16591,10 +16705,14 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
char buf[64]; /* Buffer used for building the mutex name */
// This needs to be released when the socket is closed!
- descP->env = enif_alloc_env();
+ // descP->env = enif_alloc_env();
sprintf(buf, "esock[w,%d]", sock);
descP->writeMtx = MCREATE(buf);
+ enif_set_pid_undefined(&descP->currentWriter.pid);
+ MON_INIT(&descP->currentWriter.mon);
+ descP->currentWriter.env = NULL;
+ descP->currentWriter.ref = esock_atom_undefined;
descP->currentWriterP = NULL; // currentWriter not used
descP->writersQ.first = NULL;
descP->writersQ.last = NULL;
@@ -16607,6 +16725,10 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
sprintf(buf, "esock[r,%d]", sock);
descP->readMtx = MCREATE(buf);
+ enif_set_pid_undefined(&descP->currentReader.pid);
+ MON_INIT(&descP->currentReader.mon);
+ descP->currentReader.env = NULL;
+ descP->currentReader.ref = esock_atom_undefined;
descP->currentReaderP = NULL; // currentReader not used
descP->readersQ.first = NULL;
descP->readersQ.last = NULL;
@@ -16618,6 +16740,10 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
sprintf(buf, "esock[acc,%d]", sock);
descP->accMtx = MCREATE(buf);
+ enif_set_pid_undefined(&descP->currentAcceptor.pid);
+ MON_INIT(&descP->currentAcceptor.mon);
+ descP->currentAcceptor.env = NULL;
+ descP->currentAcceptor.ref = esock_atom_undefined;
descP->currentAcceptorP = NULL; // currentAcceptor not used
descP->acceptorsQ.first = NULL;
descP->acceptorsQ.last = NULL;
@@ -16626,6 +16752,8 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->closeMtx = MCREATE(buf);
descP->closeEnv = NULL;
descP->closeRef = esock_atom_undefined;
+ enif_set_pid_undefined(&descP->closerPid);
+ MON_INIT(&descP->closerMon);
sprintf(buf, "esock[cfg,%d]", sock);
descP->cfgMtx = MCREATE(buf);
@@ -16640,11 +16768,9 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->sock = sock;
descP->event = event;
- MON_INIT(&descP->currentWriter.mon);
- MON_INIT(&descP->currentReader.mon);
- MON_INIT(&descP->currentAcceptor.mon);
+ enif_set_pid_undefined(&descP->ctrlPid);
MON_INIT(&descP->ctrlMon);
- MON_INIT(&descP->closerMon);
+
}
return descP;
@@ -17166,21 +17292,23 @@ size_t my_strnlen(const char *s, size_t maxlen)
*/
static
char* esock_send_close_msg(ErlNifEnv* env,
- ESockDescriptor* descP)
+ ESockDescriptor* descP,
+ ErlNifPid* pid)
{
- ERL_NIF_TERM sr = ((descP->closeEnv != NULL) ?
- enif_make_copy(descP->closeEnv, sockRef) :
- sockRef);
- char* res = esock_send_socket_msg(env,
- sr,
- esock_atom_close,
- descP->closeRef,
- &descP->closerPid,
- descP->closeEnv);
-
- descP->closeEnv = NULL;
-
- return res;
+ ERL_NIF_TERM sockRef, msg;
+ ErlNifEnv* menv;
+
+ if (descP->closeEnv != NULL) {
+ sockRef = enif_make_resource(descP->closeEnv, descP);
+ msg = mk_close_msg(descP->closeEnv, sockRef, descP->closeRef);
+ menv = descP->closeEnv;
+ } else {
+ sockRef = enif_make_resource(env, descP);
+ msg = mk_close_msg(env, sockRef, descP->closeRef);
+ menv = NULL; // This has the effect that the message will be copied
+ }
+
+ return esock_send_msg(env, pid, msg, menv);
}
@@ -17195,93 +17323,126 @@ char* esock_send_close_msg(ErlNifEnv* env,
static
char* esock_send_abort_msg(ErlNifEnv* env,
ERL_NIF_TERM sockRef,
- ERL_NIF_TERM recvRef,
+ ERL_NIF_TERM opRef,
+ ErlNifEnv* msgEnv,
ERL_NIF_TERM reason,
ErlNifPid* pid)
{
- ErlNifEnv* msg_env = enif_alloc_env();
- ERL_NIF_TERM info = MKT2(msg_env,
- enif_make_copy(msg_env, recvRef),
- enif_make_copy(msg_env, reason));
+ ERL_NIF_TERM msg = mk_abort_msg(msgEnv,
+ /* sockRef not in msgEnv so copy */
+ CP_TERM(msgEnv, sockRef),
+ opRef, reason);
+
+ return esock_send_msg(env, pid, msg, msgEnv);
+}
+
+
+/* Send a message to the specified process.
+ */
+static
+char* esock_send_msg(ErlNifEnv* env,
+ ErlNifPid* pid,
+ ERL_NIF_TERM msg,
+ ErlNifEnv* msgEnv)
+{
+ int res = enif_send(env, pid, msgEnv, msg);
+ if (msgEnv)
+ esock_free_env("esock_msg_send - msg-env", msgEnv);
- return esock_send_socket_msg(env, sockRef, esock_atom_abort, info, pid,
- msg_env);
+ if (!res)
+ return str_exsend;
+ else
+ return NULL;
}
+#endif // #if defined(__WIN32__)
+
-/* *** esock_send_socket_msg ***
+/* *** mk_abort_msg ***
*
- * This function sends a general purpose socket message to an erlang
- * process. A general 'socket' message has the ("erlang") form:
+ * Create the abort message, which has the following form:
*
- * {'$socket', Socket, Tag, Info}
+ * {'$socket', Socket, abort, {OpRef, Reason}}
*
- * Where
+ * This message is for processes that are waiting in the
+ * erlang API functions for a select (or this) message.
+ */
+static
+ERL_NIF_TERM mk_abort_msg(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM opRef,
+ ERL_NIF_TERM reason)
+{
+ ERL_NIF_TERM info = MKT2(env, opRef, reason);
+
+ return mk_socket_msg(env, sockRef, esock_atom_abort, info);
+}
+
+
+/* *** mk_close_msg ***
*
- * Socket: #socket{ref = SockRef} ({socket, SockRef})
- * SockRef: reference()
- * Tag: atom()
- * Info: term()
+ * Construct a close (socket) message. It has the form:
+ *
+ * {'$socket', Socket, close, closeRef}
*
*/
-
static
-char* esock_send_socket_msg(ErlNifEnv* env,
- ERL_NIF_TERM sockRef,
- ERL_NIF_TERM tag,
- ERL_NIF_TERM info,
- ErlNifPid* pid,
- ErlNifEnv* msgEnv)
+ERL_NIF_TERM mk_close_msg(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM closeRef)
{
- ErlNifEnv* menv;
- ERL_NIF_TERM msg, msock, mtag, minfo;
+ return mk_socket_msg(env, sockRef, esock_atom_close, closeRef);
+}
- if (msgEnv == NULL) {
- menv = enif_alloc_env();
- msock = make_socket_record(menv, enif_make_copy(menv, sockRef));
- mtag = enif_make_copy(menv, tag);
- minfo = enif_make_copy(menv, info);
- } else {
- menv = msgEnv;
- msock = make_socket_record(menv, sockRef);
- mtag = tag;
- minfo = info;
- }
- msg = MKT4(menv, esock_atom_socket_tag, msock, mtag, minfo);
- return esock_send_msg(env, msg, pid, menv);
+/* *** mk_select_msg ***
+ *
+ * Construct a select (socket) message. It has the form:
+ *
+ * {'$socket', Socket, select, selectRef}
+ *
+ */
+static
+ERL_NIF_TERM mk_select_msg(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM selectRef)
+{
+ return mk_socket_msg(env, sockRef, atom_select, selectRef);
}
-/* Send a message to the specified process.
+/* *** mk_socket_msg ***
+ *
+ * Construct the socket message:
+ *
+ * {'$socket', Socket, Tag, Info}
+ *
+ * Socket :: socket() (#socket{})
+ * Tag :: atom()
+ * Info :: term()
+ *
*/
static
-char* esock_send_msg(ErlNifEnv* env,
- ERL_NIF_TERM msg,
- ErlNifPid* pid,
- ErlNifEnv* msg_env)
+ERL_NIF_TERM mk_socket_msg(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM tag,
+ ERL_NIF_TERM info)
{
- int res = enif_send(env, pid, msg_env, msg);
- if (msg_env)
- enif_free_env(msg_env);
+ ERL_NIF_TERM socket = mk_socket(env, sockRef);
- if (!res)
- return str_exsend;
- else
- return NULL;
+ return MKT4(env, esock_atom_socket_tag, socket, tag, info);
}
-#endif // #if defined(__WIN32__)
-/* *** make_socket_record ***
+/* *** mk_socket ***
*
* Simple utility function that construct the socket resord:
*
* #socket{ref = SockRef} => {socket, SockRef :: reference()}
*/
static
-ERL_NIF_TERM make_socket_record(ErlNifEnv* env,
- ERL_NIF_TERM sockRef)
+ERL_NIF_TERM mk_socket(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef)
{
return MKT2(env, esock_atom_socket, sockRef);
}
@@ -17293,25 +17454,53 @@ ERL_NIF_TERM make_socket_record(ErlNifEnv* env,
* ----------------------------------------------------------------------
*/
+/* *** esock_select_read ***
+ *
+ * Perform a read select. When the select is triggered, a 'select'
+ * message (see mk_select_msg) will be sent.
+ *
+ * There are two ways to handle the select message:
+ * 1) Create "your own" environment and create the message using it
+ * and then pass it on to the select function.
+ * 2) Or, to create the message using any available environment,
+ * and then pass a NULL pointer to the select function.
+ * This will have the effect that the select function will
+ * create its own environment and then copy the message to it.
+ * We choose the second alternative.
+ */
static
int esock_select_read(ErlNifEnv* env,
- ErlNifEvent event,
- void* obj,
- const ErlNifPid* pid,
- ERL_NIF_TERM ref)
+ ErlNifEvent event, // The file descriptor
+ void* obj, // The socket descriptor object
+ const ErlNifPid* pid, // Destination
+ ERL_NIF_TERM sockRef, // Socket
+ ERL_NIF_TERM selectRef) // "ID" of the operation
{
- return enif_select(env, event, (ERL_NIF_SELECT_READ), obj, pid, ref);
+ ERL_NIF_TERM selectMsg = mk_select_msg(env, sockRef, selectRef);
+
+ return enif_select_read(env, event, obj, pid, selectMsg, NULL);
+
}
+/* *** esock_select_write ***
+ *
+ * Perform a write select. When the select is triggered, a 'select'
+ * message (see mk_select_msg) will be sent.
+ * The sockRef is copied to the msgEnv when the socket message is created,
+ * so no need to do that here, but the selectRef needs to be copied.
+ */
static
int esock_select_write(ErlNifEnv* env,
- ErlNifEvent event,
- void* obj,
- const ErlNifPid* pid,
- ERL_NIF_TERM ref)
+ ErlNifEvent event, // The file descriptor
+ void* obj, // The socket descriptor
+ const ErlNifPid* pid, // Destination
+ ERL_NIF_TERM sockRef, // Socket
+ ERL_NIF_TERM selectRef) // "ID" of the operation
{
- return enif_select(env, event, (ERL_NIF_SELECT_WRITE), obj, pid, ref);
+ ERL_NIF_TERM selectMsg = mk_select_msg(env, sockRef, selectRef);
+
+ return enif_select_write(env, event, obj, pid, selectMsg, NULL);
}
@@ -17354,84 +17543,78 @@ int esock_select_cancel(ErlNifEnv* env,
ACTIVATE_NEXT_FUNC_DECL(writer, write, currentWriter, writersQ) \
ACTIVATE_NEXT_FUNC_DECL(reader, read, currentReader, readersQ)
-#define ACTIVATE_NEXT_FUNC_DECL(F, R, Q) \
- static \
- BOOLEAN_T activate_next_##F(ErlNifEnv* env, \
- ESockDescriptor* descP, \
- ERL_NIF_TERM sockRef) \
- { \
- return activate_next(env, descP, \
- &descP->R, &descP->Q, \
- sockRef); \
+#define ACTIVATE_NEXT_FUNC_DECL(F, S, R, Q) \
+ static \
+ BOOLEAN_T activate_next_##F(ErlNifEnv* env, \
+ ESockDescriptor* descP, \
+ ERL_NIF_TERM sockRef) \
+ { \
+ BOOLEAN_T popped, activated; \
+ int sres; \
+ ERL_NIF_TERM reason; \
+ ESockRequestor* reqP = &descP->R; \
+ ESockRequestQueue* q = &descP->Q; \
+ \
+ popped = FALSE; \
+ do { \
+ \
+ if (requestor_pop(q, reqP)) { \
+ \
+ /* There was another one */ \
+ \
+ SSDBG( descP, \
+ ("SOCKET", \
+ "activate_next_" #F " -> new (active) requestor: " \
+ "\r\n pid: %T" \
+ "\r\n ref: %T" \
+ "\r\n", reqP->pid, reqP->ref) ); \
+ \
+ if ((sres = esock_select_##S(env, descP->sock, descP, \
+ &reqP->pid, sockRef, \
+ reqP->ref)) < 0) { \
+ \
+ /* We need to inform this process, reqP->pid, */ \
+ /* that we failed to select, so we don't leave */ \
+ /* it hanging. */ \
+ /* => send abort */ \
+ \
+ reason = MKT2(env, \
+ esock_atom_select_failed, \
+ MKI(env, sres)); \
+ esock_send_abort_msg(env, sockRef, \
+ reqP->ref, reqP->env, \
+ reason, &reqP->pid); \
+ \
+ } else { \
+ \
+ /* Success: New requestor selected */ \
+ popped = TRUE; \
+ activated = FALSE; \
+ \
+ } \
+ \
+ } else { \
+ \
+ SSDBG( descP, \
+ ("SOCKET", \
+ "activate_next_" #F " -> no more requestors\r\n") ); \
+ \
+ popped = TRUE; \
+ activated = FALSE; \
+ } \
+ \
+ } while (!popped); \
+ \
+ SSDBG( descP, \
+ ("SOCKET", "activate_next_" #F " -> " \
+ "done with %s\r\n", B2S(activated)) ); \
+ \
+ return activated; \
}
ACTIVATE_NEXT_FUNCS
#undef ACTIVATE_NEXT_FUNC_DECL
-/* *** activate_next ***
- *
- * This functions pops the requestor queue and then selects until it
- * manages to successfully activate a new requestor or the queue is empty.
- * Return value indicates if a new requestor was activated or not.
- */
-
-static
-BOOLEAN_T activate_next(ErlNifEnv* env,
- ESockDescriptor* descP,
- ESockRequestor* reqP,
- ESockRequestQueue* q,
- ERL_NIF_TERM sockRef)
-{
- BOOLEAN_T popped, activated;
- int sres;
-
- popped = FALSE;
- do {
-
- if (requestor_pop(q, reqP)) {
-
- /* There was another one */
-
- SSDBG( descP,
- ("SOCKET", "activate_next -> new (active) requestor: "
- "\r\n pid: %T"
- "\r\n ref: %T"
- "\r\n", reqP->pid, reqP->ref) );
-
- if ((sres = esock_select_read(env, descP->sock, descP,
- &reqP->pid, reqP->ref)) < 0) {
- /* We need to inform this process, reqP->pid, that we
- * failed to select, so we don't leave it hanging.
- * => send abort
- */
-
- esock_send_abort_msg(env, sockRef, reqP->ref, sres, &reqP->pid);
-
- } else {
-
- /* Success: New requestor selected */
- popped = TRUE;
- activated = FALSE;
-
- }
-
- } else {
-
- SSDBG( descP,
- ("SOCKET", "send_activate_next -> no more requestors\r\n") );
-
- popped = TRUE;
- activated = FALSE;
- }
-
- } while (!popped);
-
- SSDBG( descP,
- ("SOCKET", "activate_next -> "
- "done with %s\r\n", B2S(activated)) );
-
- return activated;
-}
@@ -17497,13 +17680,13 @@ REQ_SEARCH4PID_FUNCS
ESockRequestor* reqP = &e->data; \
\
reqP->pid = pid; \
- reqP->ref = enif_make_copy(descP->env, ref); \
- \
if (MONP("reader_push -> " #F " request", \
env, descP, &pid, &reqP->mon) != 0) { \
FREE(reqP); \
return esock_make_error(env, atom_exmon); \
} \
+ reqP->env = esock_alloc_env(#F "_push"); \
+ reqP->ref = enif_make_copy(reqP->env, ref); \
\
qpush(&descP->Q, e); \
\
@@ -17579,13 +17762,15 @@ BOOLEAN_T requestor_pop(ESockRequestQueue* q,
if (e != NULL) {
reqP->pid = e->data.pid;
reqP->mon = e->data.mon;
+ reqP->env = e->data.env;
reqP->ref = e->data.ref;
FREE(e);
return TRUE;
} else {
- /* (writers) Queue was empty */
+ /* Queue was empty */
enif_set_pid_undefined(&reqP->pid);
- // *reqP->mon = NULL; we have no null value for monitors
+ MON_INIT(&reqP->mon);
+ reqP->env = NULL;
reqP->ref = esock_atom_undefined; // Just in case
return FALSE;
}
@@ -17763,10 +17948,10 @@ int esock_monitor(const char* slogan,
res = enif_monitor_process(env, descP, pid, &monP->mon);
if (res != 0) {
- monP->is_active = 0;
+ monP->isActive = FALSE;
SSDBG( descP, ("SOCKET", "[%d] monitor failed: %d\r\n", descP->sock, res) );
} else {
- monP->is_active = 1;
+ monP->isActive = TRUE;
}
return res;
@@ -17781,7 +17966,7 @@ int esock_demonitor(const char* slogan,
{
int res;
- if (!monP->is_active)
+ if (!monP->isActive)
return 1;
SSDBG( descP, ("SOCKET", "[%d] %s: try demonitor\r\n", descP->sock, slogan) );
@@ -17802,7 +17987,7 @@ int esock_demonitor(const char* slogan,
static
void esock_monitor_init(ESockMonitor* monP)
{
- monP->is_active = 0;
+ monP->isActive = FALSE;
}
#endif // if !defined(__WIN32__)
@@ -17824,10 +18009,6 @@ void socket_dtor(ErlNifEnv* env, void* obj)
#if !defined(__WIN32__)
ESockDescriptor* descP = (ESockDescriptor*) obj;
- enif_clear_env(descP->env);
- enif_free_env(descP->env);
- descP->env = NULL;
-
MDESTROY(descP->writeMtx);
MDESTROY(descP->readMtx);
MDESTROY(descP->accMtx);
@@ -17920,37 +18101,19 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
*/
if (descP->currentWriterP != NULL) {
+
/* We have a (current) writer and *may* therefor also have
* writers waiting.
*/
- DEMONP("socket_stop -> current writer",
- env, descP, &descP->currentWriter.mon);
+ socket_stop_handle_current(env,
+ "writer",
+ descP, sockRef, &descP->currentWriter);
- SSDBG( descP, ("SOCKET", "socket_stop -> handle current writer\r\n") );
- if (COMPARE_PIDS(&descP->closerPid, &descP->currentWriter.pid) != 0) {
- SSDBG( descP, ("SOCKET", "socket_stop -> "
- "send abort message to current writer %T\r\n",
- descP->currentWriter.pid) );
- if (esock_send_abort_msg(env,
- sockRef,
- descP->currentWriter.ref,
- atom_closed,
- &descP->currentWriter.pid) != NULL) {
- /* Shall we really do this?
- * This happens if the controlling process has been killed!
- */
- esock_warning_msg("Failed sending abort (%T) message to "
- "current writer %T\r\n",
- descP->currentWriter.ref,
- descP->currentWriter.pid);
- }
- }
-
/* And also deal with the waiting writers (in the same way) */
SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting writer(s)\r\n") );
inform_waiting_procs(env, "writer",
- descP, &descP->writersQ, TRUE, atom_closed);
+ descP, sockRef, &descP->writersQ, TRUE, atom_closed);
}
@@ -17967,38 +18130,14 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* readers waiting.
*/
- DEMONP("socket_stop -> current reader",
- env, descP, &descP->currentReader.mon);
+ socket_stop_handle_current(env,
+ "reader",
+ descP, sockRef, &descP->currentReader);
- SSDBG( descP, ("SOCKET", "socket_stop -> handle current reader\r\n") );
- if (COMPARE_PIDS(&descP->closerPid, &descP->currentReader.pid) != 0) {
- SSDBG( descP, ("SOCKET", "socket_stop -> "
- "send abort message to current reader %T\r\n",
- descP->currentReader.pid) );
- /*
- esock_dbg_printf("SOCKET", "socket_stop -> "
- "send abort message to current reader %T\r\n",
- descP->currentReader.pid);
- */
- if (esock_send_abort_msg(env,
- sockRef,
- descP->currentReader.ref,
- atom_closed,
- &descP->currentReader.pid) != NULL) {
- /* Shall we really do this?
- * This happens if the controlling process has been killed!
- */
- esock_warning_msg("Failed sending abort (%T) message to "
- "current reader %T\r\n",
- descP->currentReader.ref,
- descP->currentReader.pid);
- }
- }
-
/* And also deal with the waiting readers (in the same way) */
SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting reader(s)\r\n") );
inform_waiting_procs(env, "reader",
- descP, &descP->readersQ, TRUE, atom_closed);
+ descP, sockRef, &descP->readersQ, TRUE, atom_closed);
}
@@ -18011,37 +18150,19 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
*/
if (descP->currentAcceptorP != NULL) {
+
/* We have a (current) acceptor and *may* therefor also have
* acceptors waiting.
*/
-
- DEMONP("socket_stop -> current acceptor",
- env, descP, &descP->currentAcceptor.mon);
-
- SSDBG( descP, ("SOCKET", "socket_stop -> handle current acceptor\r\n") );
- if (COMPARE_PIDS(&descP->closerPid, &descP->currentAcceptor.pid) != 0) {
- SSDBG( descP, ("SOCKET", "socket_stop -> "
- "send abort message to current acceptor %T\r\n",
- descP->currentAcceptor.pid) );
- if (esock_send_abort_msg(env,
- sockRef,
- descP->currentAcceptor.ref,
- atom_closed,
- &descP->currentAcceptor.pid) != NULL) {
- /* Shall we really do this?
- * This happens if the controlling process has been killed!
- */
- esock_warning_msg("Failed sending abort (%T) message to "
- "current acceptor %T\r\n",
- descP->currentAcceptor.ref,
- descP->currentAcceptor.pid);
- }
- }
-
+
+ socket_stop_handle_current(env,
+ "acceptor",
+ descP, sockRef, &descP->currentAcceptor);
+
/* And also deal with the waiting acceptors (in the same way) */
SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting acceptor(s)\r\n") );
inform_waiting_procs(env, "acceptor",
- descP, &descP->acceptorsQ, TRUE, atom_closed);
+ descP, sockRef, &descP->acceptorsQ, TRUE, atom_closed);
}
@@ -18054,15 +18175,15 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
*/
if (descP->sock != INVALID_SOCKET) {
-
+
if (descP->closeLocal) {
if (!is_direct_call) {
/* +++ send close message to the waiting process +++ */
- esock_send_close_msg(env, descP, sockRef);
-
+ esock_send_close_msg(env, descP, &descP->closerPid);
+
DEMONP("socket_stop -> closer", env, descP, &descP->closerMon);
} else {
@@ -18071,11 +18192,8 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* since the message send takes care of it if scheduled.
*/
- if (descP->closeEnv != NULL) {
- enif_clear_env(descP->closeEnv);
- enif_free_env(descP->closeEnv);
- descP->closeEnv = NULL;
- }
+ if (descP->closeEnv != NULL)
+ esock_free_env("socket_stop - close-env", descP->closeEnv);
}
}
@@ -18098,29 +18216,59 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
+/* *** socket_stop_handle_current ***
+ *
+ * Handle current requestor (reader, writer or acceptor) during
+ * socket stop.
+ */
+static
+void socket_stop_handle_current(ErlNifEnv* env,
+ const char* role,
+ ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef,
+ ESockRequestor* reqP)
+{
+ SSDBG( descP, ("SOCKET", "socket_stop -> handle current %s\r\n", role) );
+
+ DEMONP("socket_stop_handle_current", env, descP, &reqP->mon);
+
+ if (COMPARE_PIDS(&descP->closerPid, &reqP->pid) != 0) {
+
+ SSDBG( descP, ("SOCKET", "socket_stop_handle_current -> "
+ "send abort message to current %s %T\r\n",
+ role, reqP->pid) );
+
+ if (esock_send_abort_msg(env, sockRef, reqP->ref, reqP->env,
+ atom_closed, &reqP->pid) != NULL) {
+
+ esock_warning_msg("Failed sending abort (%T) message to "
+ "current %s %T\r\n",
+ reqP->ref, role, reqP->pid);
+ }
+ }
+}
+
+
+
/* This function traverse the queue and sends the specified
* nif_abort message with the specified reason to each member,
* and if the 'free' argument is TRUE, the queue will be emptied.
*/
#if !defined(__WIN32__)
static void inform_waiting_procs(ErlNifEnv* env,
- char* role,
+ const char* role,
ESockDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ESockRequestQueue* q,
BOOLEAN_T free,
ERL_NIF_TERM reason)
{
ESockRequestQueueElement* currentP = q->first;
ESockRequestQueueElement* nextP;
- ERL_NIF_TERM sockRef = enif_make_resource(env, descP);
- /*
- esock_dbg_printf("SOCKET", "inform_waiting_procs -> entry with: "
- "\r\n role: %s"
- "\r\n free: %s"
- "\r\n reason: %T"
- "\r\n", role, B2S(free), reason);
- */
+ SSDBG( descP,
+ ("SOCKET",
+ "inform_waiting_procs -> handle waiting %s(s)\r\n", role) );
while (currentP != NULL) {
@@ -18134,18 +18282,14 @@ static void inform_waiting_procs(ErlNifEnv* env,
*/
SSDBG( descP,
- ("SOCKET", "inform_waiting_procs -> abort request %T (from %T)\r\n",
+ ("SOCKET",
+ "inform_waiting_procs -> abort request %T (from %T)\r\n",
currentP->data.ref, currentP->data.pid) );
- /*
- esock_dbg_printf("SOCKET", "inform_waiting_procs -> "
- "try sending abort to %s %T "
- "\r\n", role, currentP->data.pid);
- */
-
if (esock_send_abort_msg(env,
sockRef,
currentP->data.ref,
+ currentP->data.env,
reason,
&currentP->data.pid) != NULL) {
@@ -18211,7 +18355,7 @@ void socket_down(ErlNifEnv* env,
descP->closeLocal = TRUE;
descP->closerPid = *pid;
MON_INIT(&descP->closerMon);
-
+
sres = esock_select_stop(env, descP->sock, descP);
if (sres & ERL_NIF_SELECT_STOP_CALLED) {