aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/nifs/common/socket_int.h8
-rw-r--r--erts/emulator/nifs/common/socket_nif.c412
2 files changed, 329 insertions, 91 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h
index f9246856fa..d89970ecd6 100644
--- a/erts/emulator/nifs/common/socket_int.h
+++ b/erts/emulator/nifs/common/socket_int.h
@@ -216,8 +216,12 @@ extern ERL_NIF_TERM esock_atom_einval;
#define MLOCK(M) enif_mutex_lock((M))
#define MUNLOCK(M) enif_mutex_unlock((M))
-#define MONP(E,D,P,M) enif_monitor_process((E), (D), (P), (M))
-#define DEMONP(E,D,M) enif_demonitor_process((E), (D), (M))
+// #define MONP(S,E,D,P,M) enif_monitor_process((E), (D), (P), (M))
+// #define DEMONP(S,E,D,M) enif_demonitor_process((E), (D), (M))
+#define MONP(S,E,D,P,M) esock_monitor((S), (E), (D), (P), (M))
+#define DEMONP(S,E,D,M) esock_demonitor((S), (E), (D), (M))
+#define MON_INIT(M) esock_monitor_init((M))
+// #define MON_COMP(M1, M2) esock_monitor_compare((M1), (M2))
#define SELECT(E,FD,M,O,P,R) \
if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 3ec19c5a60..5d2cfbc10b 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -725,9 +725,16 @@ static unsigned long one_value = 1;
((sap)->in4.sin_port) : -1)
+typedef union {
+ ErlNifMonitor mon;
+ uint32_t raw[4];
+} ESockMonitor;
+
+
typedef struct {
ErlNifPid pid; // PID of the requesting process
- ErlNifMonitor mon; // Monitor to the requesting process
+ // ErlNifMonitor mon; Monitor to the requesting process
+ ESockMonitor mon; // Monitor to the requesting process
ERL_NIF_TERM ref; // The (unique) reference (ID) of the request
} SocketRequestor;
@@ -760,7 +767,8 @@ typedef struct {
/* +++ Controller (owner) process +++ */
ErlNifPid ctrlPid;
- ErlNifMonitor ctrlMon;
+ // ErlNifMonitor ctrlMon;
+ ESockMonitor ctrlMon;
/* +++ Write stuff +++ */
ErlNifMutex* writeMtx;
@@ -803,7 +811,8 @@ typedef struct {
/* +++ Close stuff +++ */
ErlNifMutex* closeMtx;
ErlNifPid closerPid;
- ErlNifMonitor closerMon;
+ // ErlNifMonitor closerMon;
+ ESockMonitor closerMon;
ERL_NIF_TERM closeRef;
BOOLEAN_T closeLocal;
@@ -2185,7 +2194,8 @@ static ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
static BOOLEAN_T acceptor_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref);
static BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2201,7 +2211,8 @@ static ERL_NIF_TERM writer_push(ErlNifEnv* env,
static BOOLEAN_T writer_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref);
static BOOLEAN_T writer_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2217,7 +2228,8 @@ static ERL_NIF_TERM reader_push(ErlNifEnv* env,
static BOOLEAN_T reader_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref);
static BOOLEAN_T reader_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2230,8 +2242,27 @@ static void qpush(SocketRequestQueue* q,
SocketRequestQueueElement* e);
static SocketRequestQueueElement* qpop(SocketRequestQueue* q);
static BOOLEAN_T qunqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const char* slogan,
SocketRequestQueue* q,
const ErlNifPid* pid);
+
+static int esock_monitor(const char* slogan,
+ ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid,
+ ESockMonitor* mon);
+static int esock_demonitor(const char* slogan,
+ ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ESockMonitor* monP);
+static void esock_monitor_init(ESockMonitor* mon);
+/*
+static int esock_monitor_compare(const ErlNifMonitor* mon1,
+ const ESockMonitor* mon2);
+*/
+
+
/*
#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE)
static size_t my_strnlen(const char *s, size_t maxlen);
@@ -2282,6 +2313,7 @@ static BOOLEAN_T extract_iow(ErlNifEnv* env,
static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info);
+
#if HAVE_IN6
# if ! defined(HAVE_IN6ADDR_ANY) || ! HAVE_IN6ADDR_ANY
# if HAVE_DECL_IN6ADDR_ANY_INIT
@@ -2786,7 +2818,6 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
res = enif_make_resource(env, descP);
enif_release_resource(descP); // We should really store a reference ...
-
/* Keep track of the creator
* This should not be a problem but just in case
* the *open* function is used with the wrong kind
@@ -2795,9 +2826,10 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
if (enif_self(env, &descP->ctrlPid) == NULL)
return esock_make_error(env, atom_exself);
- if (MONP(env, descP,
+ if (MONP("nopen -> ctrl",
+ env, descP,
&descP->ctrlPid,
- &descP->ctrlMon) > 0)
+ &descP->ctrlMon) != 0)
return esock_make_error(env, atom_exmon);
@@ -3396,9 +3428,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "naccept_listening -> would block\r\n") );
descP->currentAcceptor.pid = caller;
- if (MONP(env, descP,
+ if (MONP("naccept_listening -> current acceptor",
+ env, descP,
&descP->currentAcceptor.pid,
- &descP->currentAcceptor.mon) > 0)
+ &descP->currentAcceptor.mon) != 0)
return esock_make_error(env, atom_exmon);
descP->currentAcceptor.ref = enif_make_copy(descP->env, ref);
@@ -3473,9 +3506,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
enif_release_resource(accDescP); // We should really store a reference ...
accDescP->ctrlPid = caller;
- if (MONP(env, accDescP,
+ if (MONP("naccept_listening -> ctrl",
+ env, accDescP,
&accDescP->ctrlPid,
- &accDescP->ctrlMon) > 0) {
+ &accDescP->ctrlMon) != 0) {
sock_close(accSock);
return esock_make_error(env, atom_exmon);
}
@@ -3589,7 +3623,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "naccept_accepting -> accept success\r\n") );
- DEMONP(env, descP, &descP->currentAcceptor.mon);
+ DEMONP("naccept_accepting -> current acceptor",
+ env, descP, &descP->currentAcceptor.mon);
if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) {
save_errno = sock_errno();
@@ -3611,9 +3646,10 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
enif_release_resource(accDescP); // We should really store a reference ...
accDescP->ctrlPid = caller;
- if (MONP(env, accDescP,
+ if (MONP("naccept_accepting -> ctrl",
+ env, accDescP,
&accDescP->ctrlPid,
- &accDescP->ctrlMon) > 0) {
+ &accDescP->ctrlMon) != 0) {
sock_close(accSock);
return esock_make_error(env, atom_exmon);
}
@@ -4728,7 +4764,11 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
int type = descP->type;
int protocol = descP->protocol;
- SSDBG( descP, ("SOCKET", "nclose -> [%d] entry\r\n", descP->sock) );
+ SSDBG( descP, ("SOCKET", "nclose -> [%d] entry (0x%lX, 0x%lX, 0x%lX)\r\n",
+ descP->sock,
+ descP->currentWriterP,
+ descP->currentReaderP,
+ descP->currentAcceptorP) );
MLOCK(descP->closeMtx);
@@ -4753,11 +4793,18 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
/* Monitor the caller, since we should complete this operation even if
* the caller dies (for whatever reason).
+ *
+ * <KOLLA>
+ *
+ * Can we actiually use this for anything?
+ *
+ * </KOLLA>
*/
- if (MONP(env, descP,
- &descP->closerPid,
- &descP->closerMon) > 0) {
+ if (MONP("nclose -> closer",
+ env, descP,
+ &descP->closerPid,
+ &descP->closerMon) != 0) {
MUNLOCK(descP->closeMtx);
return esock_make_error(env, atom_exmon);
}
@@ -4778,6 +4825,7 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "nclose -> [%d] stop was called\r\n", descP->sock) );
dec_socket(domain, type, protocol);
+ DEMONP("nclose -> closer", env, descP, &descP->closerMon);
reply = esock_atom_ok;
} else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) {
/* The stop callback function has been *scheduled* which means that we
@@ -4803,6 +4851,9 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
* </KOLLA>
*/
+ // No point in having this?
+ DEMONP("nclose -> closer", env, descP, &descP->closerMon);
+
reason = MKT2(env, atom_select, MKI(env, selectRes));
reply = esock_make_error(env, reason);
}
@@ -5160,7 +5211,8 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env,
ERL_NIF_TERM eVal)
{
ErlNifPid caller, newCtrlPid;
- ErlNifMonitor newCtrlMon;
+ // ErlNifMonitor newCtrlMon;
+ ESockMonitor newCtrlMon;
int xres;
SSDBG( descP,
@@ -5183,20 +5235,22 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env,
return esock_make_error(env, esock_atom_einval);
}
- if ((xres = MONP(env, descP, &newCtrlPid, &newCtrlMon)) != 0) {
+ if ((xres = MONP("nsetopt_otp_ctrl_proc -> (new) ctrl",
+ env, descP, &newCtrlPid, &newCtrlMon)) != 0) {
esock_warning_msg("Failed monitor %d) (new) controlling process\r\n", xres);
return esock_make_error(env, esock_atom_einval);
}
- if ((xres = DEMONP(env, descP, &descP->ctrlMon)) != 0) {
+ if ((xres = DEMONP("nsetopt_otp_ctrl_proc -> (old) ctrl",
+ env, descP, &descP->ctrlMon)) != 0) {
esock_warning_msg("Failed demonitor (%d) "
"old controlling process %T (%T)\r\n",
xres, descP->ctrlPid, descP->ctrlMon);
}
descP->ctrlPid = newCtrlPid;
- descP->ctrlMon = newCtrlMon;
-
+ descP->ctrlMon = newCtrlMon;
+
SSDBG( descP, ("SOCKET", "nsetopt_otp_ctrl_proc -> done\r\n") );
return esock_atom_ok;
@@ -11137,6 +11191,8 @@ ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") );
+ DEMONP("ncancel_accept_current -> current acceptor",
+ env, descP, &descP->currentAcceptor.mon);
res = ncancel_read_select(env, descP, descP->currentAcceptor.ref);
SSDBG( descP, ("SOCKET", "ncancel_accept_current -> cancel res: %T\r\n", res) );
@@ -11255,6 +11311,8 @@ ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") );
+ DEMONP("ncancel_recv_current -> current writer",
+ env, descP, &descP->currentWriter.mon);
res = ncancel_write_select(env, descP, descP->currentWriter.ref);
SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) );
@@ -11371,6 +11429,8 @@ ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "ncancel_recv_current -> entry\r\n") );
+ DEMONP("ncancel_recv_current -> current reader",
+ env, descP, &descP->currentReader.mon);
res = ncancel_read_select(env, descP, descP->currentReader.ref);
SSDBG( descP, ("SOCKET", "ncancel_recv_current -> cancel res: %T\r\n", res) );
@@ -11568,7 +11628,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
cnt_inc(&descP->writePkgCnt, 1);
cnt_inc(&descP->writeByteCnt, written);
if (descP->currentWriterP != NULL)
- DEMONP(env, descP, &descP->currentWriter.mon);
+ DEMONP("send_check_result -> current writer",
+ env, descP, &descP->currentWriter.mon);
SSDBG( descP,
("SOCKET", "send_check_result -> "
@@ -11607,7 +11668,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) {
ErlNifPid pid;
- ErlNifMonitor mon;
+ // ErlNifMonitor mon;
+ ESockMonitor mon;
ERL_NIF_TERM ref, res;
/*
@@ -11622,13 +11684,15 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
res = esock_make_error_errno(env, saveErrno);
if (descP->currentWriterP != NULL) {
- DEMONP(env, descP, &descP->currentWriter.mon);
+ DEMONP("send_check_result -> current writer",
+ env, descP, &descP->currentWriter.mon);
while (writer_pop(env, descP, &pid, &mon, &ref)) {
SSDBG( descP,
("SOCKET", "send_check_result -> abort %T\r\n", pid) );
send_msg_nif_abort(env, ref, res, &pid);
- DEMONP(env, descP, &mon);
+ DEMONP("send_check_result -> pop'ed writer",
+ env, descP, &mon);
}
}
@@ -11660,9 +11724,10 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
if (enif_self(env, &caller) == NULL)
return esock_make_error(env, atom_exself);
descP->currentWriter.pid = caller;
- if (MONP(env, descP,
+ if (MONP("send_check_result -> current writer",
+ env, descP,
&descP->currentWriter.pid,
- &descP->currentWriter.mon) > 0)
+ &descP->currentWriter.mon) != 0)
return esock_make_error(env, atom_exmon);
descP->currentWriter.ref = enif_make_copy(descP->env, sendRef);
descP->currentWriterP = &descP->currentWriter;
@@ -11747,9 +11812,10 @@ char* recv_init_current_reader(ErlNifEnv* env,
return str_exself;
descP->currentReader.pid = caller;
- if (MONP(env, descP,
+ if (MONP("recv_init_current_reader -> current reader",
+ env, descP,
&descP->currentReader.pid,
- &descP->currentReader.mon) > 0) {
+ &descP->currentReader.mon) != 0) {
return str_exmon;
}
descP->currentReader.ref = enif_make_copy(descP->env, recvRef);
@@ -11774,7 +11840,8 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
{
if (descP->currentReaderP != NULL) {
- DEMONP(env, descP, &descP->currentReader.mon);
+ DEMONP("recv_update_current_reader -> current reader",
+ env, descP, &descP->currentReader.mon);
if (reader_pop(env, descP,
&descP->currentReader.pid,
@@ -11822,16 +11889,19 @@ void recv_error_current_reader(ErlNifEnv* env,
{
if (descP->currentReaderP != NULL) {
ErlNifPid pid;
- ErlNifMonitor mon;
+ // ErlNifMonitor mon;
+ ESockMonitor mon;
ERL_NIF_TERM ref;
- DEMONP(env, descP, &descP->currentReader.mon);
+ DEMONP("recv_error_current_reader -> current reader",
+ env, descP, &descP->currentReader.mon);
while (reader_pop(env, descP, &pid, &mon, &ref)) {
SSDBG( descP,
("SOCKET", "recv_error_current_reader -> abort %T\r\n", pid) );
send_msg_nif_abort(env, ref, reason, &pid);
- DEMONP(env, descP, &mon);
+ DEMONP("recv_error_current_reader -> pop'ed reader",
+ env, descP, &mon);
}
}
}
@@ -13975,6 +14045,11 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->sock = sock;
descP->event = event;
+ MON_INIT(&descP->currentWriter.mon);
+ MON_INIT(&descP->currentReader.mon);
+ MON_INIT(&descP->currentAcceptor.mon);
+ MON_INIT(&descP->ctrlMon);
+ MON_INIT(&descP->closerMon);
}
return descP;
@@ -14593,7 +14668,8 @@ ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
reqP->pid = pid;
reqP->ref = enif_make_copy(descP->env, ref);
- if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ if (MONP("acceptor_push -> acceptor request",
+ env, descP, &pid, &reqP->mon) != 0) {
FREE(reqP);
return esock_make_error(env, atom_exmon);
}
@@ -14613,7 +14689,8 @@ static
BOOLEAN_T acceptor_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref)
{
SocketRequestQueueElement* e = qpop(&descP->acceptorsQ);
@@ -14644,7 +14721,8 @@ BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid)
{
- return qunqueue(env, &descP->acceptorsQ, pid);
+ return qunqueue(env, descP, "qunqueue -> waiting acceptor",
+ &descP->acceptorsQ, pid);
}
@@ -14679,7 +14757,8 @@ ERL_NIF_TERM writer_push(ErlNifEnv* env,
reqP->pid = pid;
reqP->ref = enif_make_copy(descP->env, ref);
- if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ if (MONP("writer_push -> writer request",
+ env, descP, &pid, &reqP->mon) != 0) {
FREE(reqP);
return esock_make_error(env, atom_exmon);
}
@@ -14699,7 +14778,8 @@ static
BOOLEAN_T writer_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref)
{
SocketRequestQueueElement* e = qpop(&descP->writersQ);
@@ -14730,7 +14810,8 @@ BOOLEAN_T writer_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid)
{
- return qunqueue(env, &descP->writersQ, pid);
+ return qunqueue(env, descP, "qunqueue -> waiting writer",
+ &descP->writersQ, pid);
}
@@ -14765,7 +14846,8 @@ ERL_NIF_TERM reader_push(ErlNifEnv* env,
reqP->pid = pid;
reqP->ref = enif_make_copy(descP->env, ref);
- if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ if (MONP("reader_push -> reader request",
+ env, descP, &pid, &reqP->mon) != 0) {
FREE(reqP);
return esock_make_error(env, atom_exmon);
}
@@ -14785,7 +14867,8 @@ static
BOOLEAN_T reader_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref)
{
SocketRequestQueueElement* e = qpop(&descP->readersQ);
@@ -14816,7 +14899,8 @@ BOOLEAN_T reader_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid)
{
- return qunqueue(env, &descP->readersQ, pid);
+ return qunqueue(env, descP, "qunqueue -> waiting reader",
+ &descP->readersQ, pid);
}
@@ -14880,6 +14964,8 @@ SocketRequestQueueElement* qpop(SocketRequestQueue* q)
static
BOOLEAN_T qunqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const char* slogan,
SocketRequestQueue* q,
const ErlNifPid* pid)
{
@@ -14892,6 +14978,8 @@ BOOLEAN_T qunqueue(ErlNifEnv* env,
/* We have a match */
+ DEMONP(slogan, env, descP, &e->data.mon);
+
if (p != NULL) {
/* Not the first, but could be the last */
if (q->last == e) {
@@ -14966,6 +15054,97 @@ void cnt_dec(uint32_t* cnt, uint32_t dec)
/* ----------------------------------------------------------------------
+ * M o n i t o r W r a p p e r F u n c t i o n s
+ * ----------------------------------------------------------------------
+ */
+
+static
+int esock_monitor(const char* slogan,
+ ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid,
+ ESockMonitor* monP)
+{
+ int res;
+
+ SSDBG( descP, ("SOCKET", "[%d] %s: try monitor", descP->sock, slogan) );
+ // esock_dbg_printf("MONP", "[%d] %s\r\n", descP->sock, slogan);
+ res = enif_monitor_process(env, descP, pid, &monP->mon);
+
+ if (res != 0) {
+ SSDBG( descP, ("SOCKET", "[%d] monitor failed: %d", descP->sock, res) );
+ // esock_dbg_printf("MONP", "[%d] failed: %d\r\n", descP->sock, res);
+ } /* else {
+ esock_dbg_printf("MONP",
+ "[%d] success: "
+ "%u,%u,%u,%u\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ } */
+
+ return res;
+}
+
+
+static
+int esock_demonitor(const char* slogan,
+ ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ESockMonitor* monP)
+{
+ int res;
+
+ SSDBG( descP, ("SOCKET", "[%d] %s: try demonitor\r\n", descP->sock, slogan) );
+ /*
+ esock_dbg_printf("DEMONP", "[%d] %s: %u,%u,%u,%u\r\n",
+ descP->sock, slogan,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
+
+ res = enif_demonitor_process(env, descP, &monP->mon);
+
+ if (res == 0) {
+ esock_monitor_init(monP);
+ } else {
+ SSDBG( descP,
+ ("SOCKET", "[%d] demonitor failed: %d\r\n", descP->sock, res) );
+ /*
+ esock_dbg_printf("DEMONP", "[%d] failed: %d\r\n", descP->sock, res);
+ */
+ }
+
+ return res;
+}
+
+
+static
+void esock_monitor_init(ESockMonitor* monP)
+{
+ int i;
+
+ /*
+ * UGLY,
+ * but since we don't have a ERL_NIF_MONITOR_NULL,
+ * this will have to do for now...
+ */
+ for (i = 0; i < 4; i++)
+ monP->raw[i] = 0;
+
+}
+
+/*
+static
+int esock_monitor_compare(const ErlNifMonitor* mon1,
+ const ESockMonitor* mon2)
+{
+ return enif_compare_monitors(mon1, &mon2->mon);
+}
+*/
+
+
+/* ----------------------------------------------------------------------
* C a l l b a c k F u n c t i o n s
* ----------------------------------------------------------------------
*/
@@ -15015,13 +15194,15 @@ static
void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
{
SocketDescriptor* descP = (SocketDescriptor*) obj;
- int dres;
-
+
+ /*
+ esock_dbg_printf("STOP", "[%d] begin\r\n", descP->sock);
+ */
+
SSDBG( descP,
- ("SOCKET", "socket_stop -> entry when"
- "\r\n sock: %d (%d)"
- "\r\n Is Direct Call: %s"
- "\r\n", descP->sock, fd, B2S(is_direct_call)) );
+ ("SOCKET", "socket_stop -> entry when %s"
+ "\r\n sock: %d (%d)"
+ "\r\n", ((is_direct_call) ? "called" : "scheduled"), descP->sock, fd) );
MLOCK(descP->writeMtx);
MLOCK(descP->readMtx);
@@ -15033,8 +15214,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
descP->state = SOCKET_STATE_CLOSING; // Just in case...???
descP->isReadable = FALSE;
descP->isWritable = FALSE;
-
-
+
/* We should check that we actually have a monitor.
* This *should* be done with a "NULL" monitor value,
* which there currently is none...
@@ -15042,24 +15222,19 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* its no point in demonitor. Also, we not actually have
* a monitor in that case...
*/
- SSDBG( descP,
- ("SOCKET",
- "socket_stop -> demonitor (maybe) controlling process (%T)\r\n",
- descP->ctrlPid) );
- dres = DEMONP(env, descP, &descP->ctrlMon);
- SSDBG( descP, ("SOCKET", "socket_stop -> demonitor result: %d\r\n", dres) );
-
-
+ DEMONP("socket_stop -> ctrl", env, descP, &descP->ctrlMon);
+
+ /*
+ esock_dbg_printf("STOP", "[%d] maybe handle current writer (0x%lX)\r\n",
+ descP->sock, descP->currentReaderP);
+ */
if (descP->currentWriterP != NULL) {
/* We have a (current) writer and *may* therefor also have
* writers waiting.
*/
- SSDBG( descP,
- ("SOCKET",
- "socket_stop -> demonitor (maybe) current writer: %T, %T\r\n",
- descP->currentWriter.pid, descP->currentWriter.ref) );
- DEMONP(env, descP, &descP->currentWriter.mon);
+ DEMONP("socket_stop -> current writer",
+ env, descP, &descP->currentWriter.mon);
SSDBG( descP, ("SOCKET", "socket_stop -> handle current writer\r\n") );
if (!compare_pids(env,
@@ -15082,18 +15257,19 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting writer(s)\r\n") );
inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed);
}
-
+
+ /*
+ esock_dbg_printf("STOP", "[%d] maybe handle current reader (0x%lX)\r\n",
+ descP->sock, descP->currentReaderP);
+ */
if (descP->currentReaderP != NULL) {
/* We have a (current) reader and *may* therefor also have
* readers waiting.
*/
- SSDBG( descP,
- ("SOCKET",
- "socket_stop -> demonitor (maybe) current reader: %T, %T\r\n",
- descP->currentReader.pid, descP->currentReader.ref) );
- DEMONP(env, descP, &descP->currentReader.mon);
+ DEMONP("socket_stop -> current reader",
+ env, descP, &descP->currentReader.mon);
SSDBG( descP, ("SOCKET", "socket_stop -> handle current reader\r\n") );
if (!compare_pids(env,
@@ -15116,17 +15292,18 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting reader(s)\r\n") );
inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed);
}
-
+
+ /*
+ esock_dbg_printf("STOP", "[%d] maybe handle current acceptor (0x%lX)\r\n",
+ descP->sock, descP->currentReaderP);
+ */
if (descP->currentAcceptorP != NULL) {
/* We have a (current) acceptor and *may* therefor also have
* acceptors waiting.
*/
- SSDBG( descP,
- ("SOCKET",
- "socket_stop -> demonitor (maybe) current acceptor: %T, %T\r\n",
- descP->currentAcceptor.pid, descP->currentAcceptor.ref) );
- DEMONP(env, descP, &descP->currentAcceptor.mon);
+ DEMONP("socket_stop -> current acceptor",
+ env, descP, &descP->currentAcceptor.mon);
SSDBG( descP, ("SOCKET", "socket_stop -> handle current acceptor\r\n") );
if (!compare_pids(env,
@@ -15178,7 +15355,8 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
- DEMONP(env, descP, &descP->closerMon);
+ DEMONP("socket_stop -> closer",
+ env, descP, &descP->closerMon);
} else {
@@ -15194,6 +15372,10 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
if (!is_direct_call) {
+ if (descP->closeLocal) {
+ DEMONP("socket_stop -> closer",
+ env, descP, &descP->closerMon);
+ }
descP->sock = INVALID_SOCKET;
descP->event = INVALID_EVENT;
descP->state = SOCKET_STATE_CLOSED;
@@ -15206,6 +15388,10 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
MUNLOCK(descP->readMtx);
MUNLOCK(descP->writeMtx);
+ /*
+ esock_dbg_printf("STOP", "[%d] end\r\n", descP->sock);
+ */
+
SSDBG( descP,
("SOCKET", "socket_stop -> done (%d, %d)\r\n", descP->sock, fd) );
@@ -15245,7 +15431,8 @@ void inform_waiting_procs(ErlNifEnv* env,
currentP->data.ref,
reason,
&currentP->data.pid)) );
- DEMONP(env, descP, &currentP->data.mon);
+ DEMONP("inform_waiting_procs -> current 'request'",
+ env, descP, &currentP->data.mon);
nextP = currentP->nextP;
if (free) FREE(currentP);
currentP = nextP;
@@ -15270,16 +15457,43 @@ void socket_down(ErlNifEnv* env,
const ErlNifMonitor* mon)
{
SocketDescriptor* descP = (SocketDescriptor*) obj;
+ ESockMonitor* monP = (ESockMonitor*) mon;
SSDBG( descP, ("SOCKET", "socket_down -> entry with"
- "\r\n sock: %d"
- "\r\n pid: %T"
- "\r\n Closed: %s"
- "\r\n Closing: %s"
+ "\r\n sock: %d"
+ "\r\n pid: %T"
+ "\r\n Close: %s (%s)"
"\r\n",
descP->sock, *pid,
- B2S(IS_CLOSED(descP)), B2S(IS_CLOSING(descP))) );
+ B2S(IS_CLOSED(descP)),
+ B2S(IS_CLOSING(descP))) );
+
+ /*
+ esock_dbg_printf("DOWN",
+ "[%d] begin %u,%u,%u,%d\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
+ /*
+ if (MON_COMP(mon, &descP->ctrlMon) == 0) {
+ SSDBG( descP, ("SOCKET", "socket_down -> controlling process mon\r\n") );
+ } else if (MON_COMP(mon, &descP->closerMon) == 0) {
+ SSDBG( descP, ("SOCKET", "socket_down -> closer mon\r\n") );
+ } else if ((descP->currentWriterP != NULL) &&
+ (MON_COMP(mon, &descP->currentWriter.mon) == 0)) {
+ SSDBG( descP, ("SOCKET", "socket_down -> current writer mon\r\n") );
+ } else if ((descP->currentReaderP != NULL) &&
+ (MON_COMP(mon, &descP->currentReader.mon) == 0)) {
+ SSDBG( descP, ("SOCKET", "socket_down -> current reader mon\r\n") );
+ } else if ((descP->currentAcceptorP != NULL) &&
+ (MON_COMP(mon, &descP->currentAcceptor.mon) == 0)) {
+ SSDBG( descP, ("SOCKET", "socket_down -> current acceptor mon\r\n") );
+ } else {
+ SSDBG( descP, ("SOCKET", "socket_down -> OTHER mon\r\n") );
+ }
+ */
if (compare_pids(env, &descP->ctrlPid, pid)) {
int selectRes;
@@ -15288,14 +15502,23 @@ void socket_down(ErlNifEnv* env,
* we leave it to the stop callback function.
*/
- SSDBG( descP, ("SOCKET", "socket_down -> controlling process exit\r\n") );
+ SSDBG( descP,
+ ("SOCKET", "socket_down -> controlling process exit\r\n") );
descP->state = SOCKET_STATE_CLOSING;
descP->closeLocal = TRUE;
descP->closerPid = *pid;
- descP->closerMon = *mon;
+ descP->closerMon = (ESockMonitor) *mon;
descP->closeRef = MKREF(env); // Do we really need this in this case?
+ /*
+ esock_dbg_printf("DOWN",
+ "[%d] select stop %u,%u,%u,%d\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
+
selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP),
descP, NULL, descP->closeRef);
@@ -15374,7 +15597,10 @@ void socket_down(ErlNifEnv* env,
"\r\n Select Res: %d"
"\r\n Controlling Process: %T"
"\r\n Descriptor: %d"
- "\r\n", selectRes, pid, descP->sock);
+ "\r\n Monitor: %u.%u.%u.%u"
+ "\r\n", selectRes, pid, descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
}
@@ -15400,6 +15626,14 @@ void socket_down(ErlNifEnv* env,
MUNLOCK(descP->readMtx);
}
+
+ /*
+ esock_dbg_printf("DOWN",
+ "[%d] end %u,%u,%u,%d\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") );