aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-10-18 16:17:42 +0200
committerMicael Karlberg <[email protected]>2018-10-18 16:17:42 +0200
commit6a5013d89a02401b132483711325ca07b8357020 (patch)
treebe6ab64679b4d3e1d320bdd0b9182e261c639927
parentf945aa4a8067d745ee75fe695272104e220d7bc3 (diff)
downloadotp-6a5013d89a02401b132483711325ca07b8357020.tar.gz
otp-6a5013d89a02401b132483711325ca07b8357020.tar.bz2
otp-6a5013d89a02401b132483711325ca07b8357020.zip
[socket-nif] Socket close
Fixed a number of issues regarding monitor handling. Monitors are now wrapped in its own type (ESockMonitor). This is hopefully temporary! The reason for this is that there is no way to "initialize" a monitor after a demonitor. This could mean that after a successful demonitor, you could still get a down-callback, and thenh compare with, for instance, ctrlMOn field and get a match, even though it was just (successfully) demonitor'ed. To make debugging easier, the monitor and demonitor calls are now wrapped in their own functions, with debug printouts before and in case of failure, also after the actual calls Also, fixed a number of problems with cancel, where monitors where left still active after cleaning up current and active reader, writer and acceptor. OTP-14831
-rw-r--r--erts/emulator/nifs/common/socket_int.h8
-rw-r--r--erts/emulator/nifs/common/socket_nif.c412
2 files changed, 329 insertions, 91 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h
index f9246856fa..d89970ecd6 100644
--- a/erts/emulator/nifs/common/socket_int.h
+++ b/erts/emulator/nifs/common/socket_int.h
@@ -216,8 +216,12 @@ extern ERL_NIF_TERM esock_atom_einval;
#define MLOCK(M) enif_mutex_lock((M))
#define MUNLOCK(M) enif_mutex_unlock((M))
-#define MONP(E,D,P,M) enif_monitor_process((E), (D), (P), (M))
-#define DEMONP(E,D,M) enif_demonitor_process((E), (D), (M))
+// #define MONP(S,E,D,P,M) enif_monitor_process((E), (D), (P), (M))
+// #define DEMONP(S,E,D,M) enif_demonitor_process((E), (D), (M))
+#define MONP(S,E,D,P,M) esock_monitor((S), (E), (D), (P), (M))
+#define DEMONP(S,E,D,M) esock_demonitor((S), (E), (D), (M))
+#define MON_INIT(M) esock_monitor_init((M))
+// #define MON_COMP(M1, M2) esock_monitor_compare((M1), (M2))
#define SELECT(E,FD,M,O,P,R) \
if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 3ec19c5a60..5d2cfbc10b 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -725,9 +725,16 @@ static unsigned long one_value = 1;
((sap)->in4.sin_port) : -1)
+typedef union {
+ ErlNifMonitor mon;
+ uint32_t raw[4];
+} ESockMonitor;
+
+
typedef struct {
ErlNifPid pid; // PID of the requesting process
- ErlNifMonitor mon; // Monitor to the requesting process
+ // ErlNifMonitor mon; Monitor to the requesting process
+ ESockMonitor mon; // Monitor to the requesting process
ERL_NIF_TERM ref; // The (unique) reference (ID) of the request
} SocketRequestor;
@@ -760,7 +767,8 @@ typedef struct {
/* +++ Controller (owner) process +++ */
ErlNifPid ctrlPid;
- ErlNifMonitor ctrlMon;
+ // ErlNifMonitor ctrlMon;
+ ESockMonitor ctrlMon;
/* +++ Write stuff +++ */
ErlNifMutex* writeMtx;
@@ -803,7 +811,8 @@ typedef struct {
/* +++ Close stuff +++ */
ErlNifMutex* closeMtx;
ErlNifPid closerPid;
- ErlNifMonitor closerMon;
+ // ErlNifMonitor closerMon;
+ ESockMonitor closerMon;
ERL_NIF_TERM closeRef;
BOOLEAN_T closeLocal;
@@ -2185,7 +2194,8 @@ static ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
static BOOLEAN_T acceptor_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref);
static BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2201,7 +2211,8 @@ static ERL_NIF_TERM writer_push(ErlNifEnv* env,
static BOOLEAN_T writer_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref);
static BOOLEAN_T writer_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2217,7 +2228,8 @@ static ERL_NIF_TERM reader_push(ErlNifEnv* env,
static BOOLEAN_T reader_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref);
static BOOLEAN_T reader_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2230,8 +2242,27 @@ static void qpush(SocketRequestQueue* q,
SocketRequestQueueElement* e);
static SocketRequestQueueElement* qpop(SocketRequestQueue* q);
static BOOLEAN_T qunqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const char* slogan,
SocketRequestQueue* q,
const ErlNifPid* pid);
+
+static int esock_monitor(const char* slogan,
+ ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid,
+ ESockMonitor* mon);
+static int esock_demonitor(const char* slogan,
+ ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ESockMonitor* monP);
+static void esock_monitor_init(ESockMonitor* mon);
+/*
+static int esock_monitor_compare(const ErlNifMonitor* mon1,
+ const ESockMonitor* mon2);
+*/
+
+
/*
#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE)
static size_t my_strnlen(const char *s, size_t maxlen);
@@ -2282,6 +2313,7 @@ static BOOLEAN_T extract_iow(ErlNifEnv* env,
static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info);
+
#if HAVE_IN6
# if ! defined(HAVE_IN6ADDR_ANY) || ! HAVE_IN6ADDR_ANY
# if HAVE_DECL_IN6ADDR_ANY_INIT
@@ -2786,7 +2818,6 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
res = enif_make_resource(env, descP);
enif_release_resource(descP); // We should really store a reference ...
-
/* Keep track of the creator
* This should not be a problem but just in case
* the *open* function is used with the wrong kind
@@ -2795,9 +2826,10 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
if (enif_self(env, &descP->ctrlPid) == NULL)
return esock_make_error(env, atom_exself);
- if (MONP(env, descP,
+ if (MONP("nopen -> ctrl",
+ env, descP,
&descP->ctrlPid,
- &descP->ctrlMon) > 0)
+ &descP->ctrlMon) != 0)
return esock_make_error(env, atom_exmon);
@@ -3396,9 +3428,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "naccept_listening -> would block\r\n") );
descP->currentAcceptor.pid = caller;
- if (MONP(env, descP,
+ if (MONP("naccept_listening -> current acceptor",
+ env, descP,
&descP->currentAcceptor.pid,
- &descP->currentAcceptor.mon) > 0)
+ &descP->currentAcceptor.mon) != 0)
return esock_make_error(env, atom_exmon);
descP->currentAcceptor.ref = enif_make_copy(descP->env, ref);
@@ -3473,9 +3506,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
enif_release_resource(accDescP); // We should really store a reference ...
accDescP->ctrlPid = caller;
- if (MONP(env, accDescP,
+ if (MONP("naccept_listening -> ctrl",
+ env, accDescP,
&accDescP->ctrlPid,
- &accDescP->ctrlMon) > 0) {
+ &accDescP->ctrlMon) != 0) {
sock_close(accSock);
return esock_make_error(env, atom_exmon);
}
@@ -3589,7 +3623,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "naccept_accepting -> accept success\r\n") );
- DEMONP(env, descP, &descP->currentAcceptor.mon);
+ DEMONP("naccept_accepting -> current acceptor",
+ env, descP, &descP->currentAcceptor.mon);
if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) {
save_errno = sock_errno();
@@ -3611,9 +3646,10 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
enif_release_resource(accDescP); // We should really store a reference ...
accDescP->ctrlPid = caller;
- if (MONP(env, accDescP,
+ if (MONP("naccept_accepting -> ctrl",
+ env, accDescP,
&accDescP->ctrlPid,
- &accDescP->ctrlMon) > 0) {
+ &accDescP->ctrlMon) != 0) {
sock_close(accSock);
return esock_make_error(env, atom_exmon);
}
@@ -4728,7 +4764,11 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
int type = descP->type;
int protocol = descP->protocol;
- SSDBG( descP, ("SOCKET", "nclose -> [%d] entry\r\n", descP->sock) );
+ SSDBG( descP, ("SOCKET", "nclose -> [%d] entry (0x%lX, 0x%lX, 0x%lX)\r\n",
+ descP->sock,
+ descP->currentWriterP,
+ descP->currentReaderP,
+ descP->currentAcceptorP) );
MLOCK(descP->closeMtx);
@@ -4753,11 +4793,18 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
/* Monitor the caller, since we should complete this operation even if
* the caller dies (for whatever reason).
+ *
+ * <KOLLA>
+ *
+ * Can we actiually use this for anything?
+ *
+ * </KOLLA>
*/
- if (MONP(env, descP,
- &descP->closerPid,
- &descP->closerMon) > 0) {
+ if (MONP("nclose -> closer",
+ env, descP,
+ &descP->closerPid,
+ &descP->closerMon) != 0) {
MUNLOCK(descP->closeMtx);
return esock_make_error(env, atom_exmon);
}
@@ -4778,6 +4825,7 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "nclose -> [%d] stop was called\r\n", descP->sock) );
dec_socket(domain, type, protocol);
+ DEMONP("nclose -> closer", env, descP, &descP->closerMon);
reply = esock_atom_ok;
} else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) {
/* The stop callback function has been *scheduled* which means that we
@@ -4803,6 +4851,9 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
* </KOLLA>
*/
+ // No point in having this?
+ DEMONP("nclose -> closer", env, descP, &descP->closerMon);
+
reason = MKT2(env, atom_select, MKI(env, selectRes));
reply = esock_make_error(env, reason);
}
@@ -5160,7 +5211,8 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env,
ERL_NIF_TERM eVal)
{
ErlNifPid caller, newCtrlPid;
- ErlNifMonitor newCtrlMon;
+ // ErlNifMonitor newCtrlMon;
+ ESockMonitor newCtrlMon;
int xres;
SSDBG( descP,
@@ -5183,20 +5235,22 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env,
return esock_make_error(env, esock_atom_einval);
}
- if ((xres = MONP(env, descP, &newCtrlPid, &newCtrlMon)) != 0) {
+ if ((xres = MONP("nsetopt_otp_ctrl_proc -> (new) ctrl",
+ env, descP, &newCtrlPid, &newCtrlMon)) != 0) {
esock_warning_msg("Failed monitor %d) (new) controlling process\r\n", xres);
return esock_make_error(env, esock_atom_einval);
}
- if ((xres = DEMONP(env, descP, &descP->ctrlMon)) != 0) {
+ if ((xres = DEMONP("nsetopt_otp_ctrl_proc -> (old) ctrl",
+ env, descP, &descP->ctrlMon)) != 0) {
esock_warning_msg("Failed demonitor (%d) "
"old controlling process %T (%T)\r\n",
xres, descP->ctrlPid, descP->ctrlMon);
}
descP->ctrlPid = newCtrlPid;
- descP->ctrlMon = newCtrlMon;
-
+ descP->ctrlMon = newCtrlMon;
+
SSDBG( descP, ("SOCKET", "nsetopt_otp_ctrl_proc -> done\r\n") );
return esock_atom_ok;
@@ -11137,6 +11191,8 @@ ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") );
+ DEMONP("ncancel_accept_current -> current acceptor",
+ env, descP, &descP->currentAcceptor.mon);
res = ncancel_read_select(env, descP, descP->currentAcceptor.ref);
SSDBG( descP, ("SOCKET", "ncancel_accept_current -> cancel res: %T\r\n", res) );
@@ -11255,6 +11311,8 @@ ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") );
+ DEMONP("ncancel_recv_current -> current writer",
+ env, descP, &descP->currentWriter.mon);
res = ncancel_write_select(env, descP, descP->currentWriter.ref);
SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) );
@@ -11371,6 +11429,8 @@ ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "ncancel_recv_current -> entry\r\n") );
+ DEMONP("ncancel_recv_current -> current reader",
+ env, descP, &descP->currentReader.mon);
res = ncancel_read_select(env, descP, descP->currentReader.ref);
SSDBG( descP, ("SOCKET", "ncancel_recv_current -> cancel res: %T\r\n", res) );
@@ -11568,7 +11628,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
cnt_inc(&descP->writePkgCnt, 1);
cnt_inc(&descP->writeByteCnt, written);
if (descP->currentWriterP != NULL)
- DEMONP(env, descP, &descP->currentWriter.mon);
+ DEMONP("send_check_result -> current writer",
+ env, descP, &descP->currentWriter.mon);
SSDBG( descP,
("SOCKET", "send_check_result -> "
@@ -11607,7 +11668,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) {
ErlNifPid pid;
- ErlNifMonitor mon;
+ // ErlNifMonitor mon;
+ ESockMonitor mon;
ERL_NIF_TERM ref, res;
/*
@@ -11622,13 +11684,15 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
res = esock_make_error_errno(env, saveErrno);
if (descP->currentWriterP != NULL) {
- DEMONP(env, descP, &descP->currentWriter.mon);
+ DEMONP("send_check_result -> current writer",
+ env, descP, &descP->currentWriter.mon);
while (writer_pop(env, descP, &pid, &mon, &ref)) {
SSDBG( descP,
("SOCKET", "send_check_result -> abort %T\r\n", pid) );
send_msg_nif_abort(env, ref, res, &pid);
- DEMONP(env, descP, &mon);
+ DEMONP("send_check_result -> pop'ed writer",
+ env, descP, &mon);
}
}
@@ -11660,9 +11724,10 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
if (enif_self(env, &caller) == NULL)
return esock_make_error(env, atom_exself);
descP->currentWriter.pid = caller;
- if (MONP(env, descP,
+ if (MONP("send_check_result -> current writer",
+ env, descP,
&descP->currentWriter.pid,
- &descP->currentWriter.mon) > 0)
+ &descP->currentWriter.mon) != 0)
return esock_make_error(env, atom_exmon);
descP->currentWriter.ref = enif_make_copy(descP->env, sendRef);
descP->currentWriterP = &descP->currentWriter;
@@ -11747,9 +11812,10 @@ char* recv_init_current_reader(ErlNifEnv* env,
return str_exself;
descP->currentReader.pid = caller;
- if (MONP(env, descP,
+ if (MONP("recv_init_current_reader -> current reader",
+ env, descP,
&descP->currentReader.pid,
- &descP->currentReader.mon) > 0) {
+ &descP->currentReader.mon) != 0) {
return str_exmon;
}
descP->currentReader.ref = enif_make_copy(descP->env, recvRef);
@@ -11774,7 +11840,8 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
{
if (descP->currentReaderP != NULL) {
- DEMONP(env, descP, &descP->currentReader.mon);
+ DEMONP("recv_update_current_reader -> current reader",
+ env, descP, &descP->currentReader.mon);
if (reader_pop(env, descP,
&descP->currentReader.pid,
@@ -11822,16 +11889,19 @@ void recv_error_current_reader(ErlNifEnv* env,
{
if (descP->currentReaderP != NULL) {
ErlNifPid pid;
- ErlNifMonitor mon;
+ // ErlNifMonitor mon;
+ ESockMonitor mon;
ERL_NIF_TERM ref;
- DEMONP(env, descP, &descP->currentReader.mon);
+ DEMONP("recv_error_current_reader -> current reader",
+ env, descP, &descP->currentReader.mon);
while (reader_pop(env, descP, &pid, &mon, &ref)) {
SSDBG( descP,
("SOCKET", "recv_error_current_reader -> abort %T\r\n", pid) );
send_msg_nif_abort(env, ref, reason, &pid);
- DEMONP(env, descP, &mon);
+ DEMONP("recv_error_current_reader -> pop'ed reader",
+ env, descP, &mon);
}
}
}
@@ -13975,6 +14045,11 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->sock = sock;
descP->event = event;
+ MON_INIT(&descP->currentWriter.mon);
+ MON_INIT(&descP->currentReader.mon);
+ MON_INIT(&descP->currentAcceptor.mon);
+ MON_INIT(&descP->ctrlMon);
+ MON_INIT(&descP->closerMon);
}
return descP;
@@ -14593,7 +14668,8 @@ ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
reqP->pid = pid;
reqP->ref = enif_make_copy(descP->env, ref);
- if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ if (MONP("acceptor_push -> acceptor request",
+ env, descP, &pid, &reqP->mon) != 0) {
FREE(reqP);
return esock_make_error(env, atom_exmon);
}
@@ -14613,7 +14689,8 @@ static
BOOLEAN_T acceptor_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref)
{
SocketRequestQueueElement* e = qpop(&descP->acceptorsQ);
@@ -14644,7 +14721,8 @@ BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid)
{
- return qunqueue(env, &descP->acceptorsQ, pid);
+ return qunqueue(env, descP, "qunqueue -> waiting acceptor",
+ &descP->acceptorsQ, pid);
}
@@ -14679,7 +14757,8 @@ ERL_NIF_TERM writer_push(ErlNifEnv* env,
reqP->pid = pid;
reqP->ref = enif_make_copy(descP->env, ref);
- if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ if (MONP("writer_push -> writer request",
+ env, descP, &pid, &reqP->mon) != 0) {
FREE(reqP);
return esock_make_error(env, atom_exmon);
}
@@ -14699,7 +14778,8 @@ static
BOOLEAN_T writer_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref)
{
SocketRequestQueueElement* e = qpop(&descP->writersQ);
@@ -14730,7 +14810,8 @@ BOOLEAN_T writer_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid)
{
- return qunqueue(env, &descP->writersQ, pid);
+ return qunqueue(env, descP, "qunqueue -> waiting writer",
+ &descP->writersQ, pid);
}
@@ -14765,7 +14846,8 @@ ERL_NIF_TERM reader_push(ErlNifEnv* env,
reqP->pid = pid;
reqP->ref = enif_make_copy(descP->env, ref);
- if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ if (MONP("reader_push -> reader request",
+ env, descP, &pid, &reqP->mon) != 0) {
FREE(reqP);
return esock_make_error(env, atom_exmon);
}
@@ -14785,7 +14867,8 @@ static
BOOLEAN_T reader_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref)
{
SocketRequestQueueElement* e = qpop(&descP->readersQ);
@@ -14816,7 +14899,8 @@ BOOLEAN_T reader_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid)
{
- return qunqueue(env, &descP->readersQ, pid);
+ return qunqueue(env, descP, "qunqueue -> waiting reader",
+ &descP->readersQ, pid);
}
@@ -14880,6 +14964,8 @@ SocketRequestQueueElement* qpop(SocketRequestQueue* q)
static
BOOLEAN_T qunqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const char* slogan,
SocketRequestQueue* q,
const ErlNifPid* pid)
{
@@ -14892,6 +14978,8 @@ BOOLEAN_T qunqueue(ErlNifEnv* env,
/* We have a match */
+ DEMONP(slogan, env, descP, &e->data.mon);
+
if (p != NULL) {
/* Not the first, but could be the last */
if (q->last == e) {
@@ -14966,6 +15054,97 @@ void cnt_dec(uint32_t* cnt, uint32_t dec)
/* ----------------------------------------------------------------------
+ * M o n i t o r W r a p p e r F u n c t i o n s
+ * ----------------------------------------------------------------------
+ */
+
+static
+int esock_monitor(const char* slogan,
+ ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid,
+ ESockMonitor* monP)
+{
+ int res;
+
+ SSDBG( descP, ("SOCKET", "[%d] %s: try monitor", descP->sock, slogan) );
+ // esock_dbg_printf("MONP", "[%d] %s\r\n", descP->sock, slogan);
+ res = enif_monitor_process(env, descP, pid, &monP->mon);
+
+ if (res != 0) {
+ SSDBG( descP, ("SOCKET", "[%d] monitor failed: %d", descP->sock, res) );
+ // esock_dbg_printf("MONP", "[%d] failed: %d\r\n", descP->sock, res);
+ } /* else {
+ esock_dbg_printf("MONP",
+ "[%d] success: "
+ "%u,%u,%u,%u\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ } */
+
+ return res;
+}
+
+
+static
+int esock_demonitor(const char* slogan,
+ ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ESockMonitor* monP)
+{
+ int res;
+
+ SSDBG( descP, ("SOCKET", "[%d] %s: try demonitor\r\n", descP->sock, slogan) );
+ /*
+ esock_dbg_printf("DEMONP", "[%d] %s: %u,%u,%u,%u\r\n",
+ descP->sock, slogan,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
+
+ res = enif_demonitor_process(env, descP, &monP->mon);
+
+ if (res == 0) {
+ esock_monitor_init(monP);
+ } else {
+ SSDBG( descP,
+ ("SOCKET", "[%d] demonitor failed: %d\r\n", descP->sock, res) );
+ /*
+ esock_dbg_printf("DEMONP", "[%d] failed: %d\r\n", descP->sock, res);
+ */
+ }
+
+ return res;
+}
+
+
+static
+void esock_monitor_init(ESockMonitor* monP)
+{
+ int i;
+
+ /*
+ * UGLY,
+ * but since we don't have a ERL_NIF_MONITOR_NULL,
+ * this will have to do for now...
+ */
+ for (i = 0; i < 4; i++)
+ monP->raw[i] = 0;
+
+}
+
+/*
+static
+int esock_monitor_compare(const ErlNifMonitor* mon1,
+ const ESockMonitor* mon2)
+{
+ return enif_compare_monitors(mon1, &mon2->mon);
+}
+*/
+
+
+/* ----------------------------------------------------------------------
* C a l l b a c k F u n c t i o n s
* ----------------------------------------------------------------------
*/
@@ -15015,13 +15194,15 @@ static
void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
{
SocketDescriptor* descP = (SocketDescriptor*) obj;
- int dres;
-
+
+ /*
+ esock_dbg_printf("STOP", "[%d] begin\r\n", descP->sock);
+ */
+
SSDBG( descP,
- ("SOCKET", "socket_stop -> entry when"
- "\r\n sock: %d (%d)"
- "\r\n Is Direct Call: %s"
- "\r\n", descP->sock, fd, B2S(is_direct_call)) );
+ ("SOCKET", "socket_stop -> entry when %s"
+ "\r\n sock: %d (%d)"
+ "\r\n", ((is_direct_call) ? "called" : "scheduled"), descP->sock, fd) );
MLOCK(descP->writeMtx);
MLOCK(descP->readMtx);
@@ -15033,8 +15214,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
descP->state = SOCKET_STATE_CLOSING; // Just in case...???
descP->isReadable = FALSE;
descP->isWritable = FALSE;
-
-
+
/* We should check that we actually have a monitor.
* This *should* be done with a "NULL" monitor value,
* which there currently is none...
@@ -15042,24 +15222,19 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* its no point in demonitor. Also, we not actually have
* a monitor in that case...
*/
- SSDBG( descP,
- ("SOCKET",
- "socket_stop -> demonitor (maybe) controlling process (%T)\r\n",
- descP->ctrlPid) );
- dres = DEMONP(env, descP, &descP->ctrlMon);
- SSDBG( descP, ("SOCKET", "socket_stop -> demonitor result: %d\r\n", dres) );
-
-
+ DEMONP("socket_stop -> ctrl", env, descP, &descP->ctrlMon);
+
+ /*
+ esock_dbg_printf("STOP", "[%d] maybe handle current writer (0x%lX)\r\n",
+ descP->sock, descP->currentReaderP);
+ */
if (descP->currentWriterP != NULL) {
/* We have a (current) writer and *may* therefor also have
* writers waiting.
*/
- SSDBG( descP,
- ("SOCKET",
- "socket_stop -> demonitor (maybe) current writer: %T, %T\r\n",
- descP->currentWriter.pid, descP->currentWriter.ref) );
- DEMONP(env, descP, &descP->currentWriter.mon);
+ DEMONP("socket_stop -> current writer",
+ env, descP, &descP->currentWriter.mon);
SSDBG( descP, ("SOCKET", "socket_stop -> handle current writer\r\n") );
if (!compare_pids(env,
@@ -15082,18 +15257,19 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting writer(s)\r\n") );
inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed);
}
-
+
+ /*
+ esock_dbg_printf("STOP", "[%d] maybe handle current reader (0x%lX)\r\n",
+ descP->sock, descP->currentReaderP);
+ */
if (descP->currentReaderP != NULL) {
/* We have a (current) reader and *may* therefor also have
* readers waiting.
*/
- SSDBG( descP,
- ("SOCKET",
- "socket_stop -> demonitor (maybe) current reader: %T, %T\r\n",
- descP->currentReader.pid, descP->currentReader.ref) );
- DEMONP(env, descP, &descP->currentReader.mon);
+ DEMONP("socket_stop -> current reader",
+ env, descP, &descP->currentReader.mon);
SSDBG( descP, ("SOCKET", "socket_stop -> handle current reader\r\n") );
if (!compare_pids(env,
@@ -15116,17 +15292,18 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting reader(s)\r\n") );
inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed);
}
-
+
+ /*
+ esock_dbg_printf("STOP", "[%d] maybe handle current acceptor (0x%lX)\r\n",
+ descP->sock, descP->currentReaderP);
+ */
if (descP->currentAcceptorP != NULL) {
/* We have a (current) acceptor and *may* therefor also have
* acceptors waiting.
*/
- SSDBG( descP,
- ("SOCKET",
- "socket_stop -> demonitor (maybe) current acceptor: %T, %T\r\n",
- descP->currentAcceptor.pid, descP->currentAcceptor.ref) );
- DEMONP(env, descP, &descP->currentAcceptor.mon);
+ DEMONP("socket_stop -> current acceptor",
+ env, descP, &descP->currentAcceptor.mon);
SSDBG( descP, ("SOCKET", "socket_stop -> handle current acceptor\r\n") );
if (!compare_pids(env,
@@ -15178,7 +15355,8 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
- DEMONP(env, descP, &descP->closerMon);
+ DEMONP("socket_stop -> closer",
+ env, descP, &descP->closerMon);
} else {
@@ -15194,6 +15372,10 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
if (!is_direct_call) {
+ if (descP->closeLocal) {
+ DEMONP("socket_stop -> closer",
+ env, descP, &descP->closerMon);
+ }
descP->sock = INVALID_SOCKET;
descP->event = INVALID_EVENT;
descP->state = SOCKET_STATE_CLOSED;
@@ -15206,6 +15388,10 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
MUNLOCK(descP->readMtx);
MUNLOCK(descP->writeMtx);
+ /*
+ esock_dbg_printf("STOP", "[%d] end\r\n", descP->sock);
+ */
+
SSDBG( descP,
("SOCKET", "socket_stop -> done (%d, %d)\r\n", descP->sock, fd) );
@@ -15245,7 +15431,8 @@ void inform_waiting_procs(ErlNifEnv* env,
currentP->data.ref,
reason,
&currentP->data.pid)) );
- DEMONP(env, descP, &currentP->data.mon);
+ DEMONP("inform_waiting_procs -> current 'request'",
+ env, descP, &currentP->data.mon);
nextP = currentP->nextP;
if (free) FREE(currentP);
currentP = nextP;
@@ -15270,16 +15457,43 @@ void socket_down(ErlNifEnv* env,
const ErlNifMonitor* mon)
{
SocketDescriptor* descP = (SocketDescriptor*) obj;
+ ESockMonitor* monP = (ESockMonitor*) mon;
SSDBG( descP, ("SOCKET", "socket_down -> entry with"
- "\r\n sock: %d"
- "\r\n pid: %T"
- "\r\n Closed: %s"
- "\r\n Closing: %s"
+ "\r\n sock: %d"
+ "\r\n pid: %T"
+ "\r\n Close: %s (%s)"
"\r\n",
descP->sock, *pid,
- B2S(IS_CLOSED(descP)), B2S(IS_CLOSING(descP))) );
+ B2S(IS_CLOSED(descP)),
+ B2S(IS_CLOSING(descP))) );
+
+ /*
+ esock_dbg_printf("DOWN",
+ "[%d] begin %u,%u,%u,%d\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
+ /*
+ if (MON_COMP(mon, &descP->ctrlMon) == 0) {
+ SSDBG( descP, ("SOCKET", "socket_down -> controlling process mon\r\n") );
+ } else if (MON_COMP(mon, &descP->closerMon) == 0) {
+ SSDBG( descP, ("SOCKET", "socket_down -> closer mon\r\n") );
+ } else if ((descP->currentWriterP != NULL) &&
+ (MON_COMP(mon, &descP->currentWriter.mon) == 0)) {
+ SSDBG( descP, ("SOCKET", "socket_down -> current writer mon\r\n") );
+ } else if ((descP->currentReaderP != NULL) &&
+ (MON_COMP(mon, &descP->currentReader.mon) == 0)) {
+ SSDBG( descP, ("SOCKET", "socket_down -> current reader mon\r\n") );
+ } else if ((descP->currentAcceptorP != NULL) &&
+ (MON_COMP(mon, &descP->currentAcceptor.mon) == 0)) {
+ SSDBG( descP, ("SOCKET", "socket_down -> current acceptor mon\r\n") );
+ } else {
+ SSDBG( descP, ("SOCKET", "socket_down -> OTHER mon\r\n") );
+ }
+ */
if (compare_pids(env, &descP->ctrlPid, pid)) {
int selectRes;
@@ -15288,14 +15502,23 @@ void socket_down(ErlNifEnv* env,
* we leave it to the stop callback function.
*/
- SSDBG( descP, ("SOCKET", "socket_down -> controlling process exit\r\n") );
+ SSDBG( descP,
+ ("SOCKET", "socket_down -> controlling process exit\r\n") );
descP->state = SOCKET_STATE_CLOSING;
descP->closeLocal = TRUE;
descP->closerPid = *pid;
- descP->closerMon = *mon;
+ descP->closerMon = (ESockMonitor) *mon;
descP->closeRef = MKREF(env); // Do we really need this in this case?
+ /*
+ esock_dbg_printf("DOWN",
+ "[%d] select stop %u,%u,%u,%d\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
+
selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP),
descP, NULL, descP->closeRef);
@@ -15374,7 +15597,10 @@ void socket_down(ErlNifEnv* env,
"\r\n Select Res: %d"
"\r\n Controlling Process: %T"
"\r\n Descriptor: %d"
- "\r\n", selectRes, pid, descP->sock);
+ "\r\n Monitor: %u.%u.%u.%u"
+ "\r\n", selectRes, pid, descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
}
@@ -15400,6 +15626,14 @@ void socket_down(ErlNifEnv* env,
MUNLOCK(descP->readMtx);
}
+
+ /*
+ esock_dbg_printf("DOWN",
+ "[%d] end %u,%u,%u,%d\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") );