aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/nifs/common/socket_nif.c
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-10-18 17:24:19 +0200
committerMicael Karlberg <[email protected]>2018-10-18 17:24:19 +0200
commita9de37dd6d2d15aa5d569e952f90c16b5e6a3ff5 (patch)
tree7702b7a519fc8bdf6e3ce17c7e2bdd19e8743e2f /erts/emulator/nifs/common/socket_nif.c
parentff3d31f7d102d68a585f8ce636753bf48b9553ab (diff)
parent3c4c36587df5a847dd6f03011a4ecb76d0b70b40 (diff)
downloadotp-a9de37dd6d2d15aa5d569e952f90c16b5e6a3ff5.tar.gz
otp-a9de37dd6d2d15aa5d569e952f90c16b5e6a3ff5.tar.bz2
otp-a9de37dd6d2d15aa5d569e952f90c16b5e6a3ff5.zip
Merge branch 'bmk/20181004/nififying_inet_rework_test_suite/OTP-14831' into bmk/20180918/nififying_inet/OTP-14831
Diffstat (limited to 'erts/emulator/nifs/common/socket_nif.c')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c587
1 files changed, 515 insertions, 72 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 4544604f28..a137692d68 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -394,7 +394,18 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL;
#define SOCKET_STATE_ACCEPTING (SOCKET_STATE_LISTENING | SOCKET_FLAG_ACC)
#define SOCKET_STATE_CLOSING (SOCKET_FLAG_CLOSE)
-#define IS_OPEN(d) \
+#define IS_CLOSED(d) \
+ ((d)->state == SOCKET_STATE_CLOSED)
+
+/*
+#define IS_STATE(d, f) \
+ (((d)->state & (f)) == (f))
+*/
+
+#define IS_CLOSING(d) \
+ (((d)->state & SOCKET_STATE_CLOSING) == SOCKET_STATE_CLOSING)
+
+#define IS_OPEN(d) \
(((d)->state & SOCKET_FLAG_OPEN) == SOCKET_FLAG_OPEN)
#define IS_CONNECTED(d) \
@@ -723,9 +734,16 @@ static unsigned long one_value = 1;
((sap)->in4.sin_port) : -1)
+typedef union {
+ ErlNifMonitor mon;
+ uint32_t raw[4];
+} ESockMonitor;
+
+
typedef struct {
ErlNifPid pid; // PID of the requesting process
- ErlNifMonitor mon; // Monitor to the requesting process
+ // ErlNifMonitor mon; Monitor to the requesting process
+ ESockMonitor mon; // Monitor to the requesting process
ERL_NIF_TERM ref; // The (unique) reference (ID) of the request
} SocketRequestor;
@@ -758,7 +776,8 @@ typedef struct {
/* +++ Controller (owner) process +++ */
ErlNifPid ctrlPid;
- ErlNifMonitor ctrlMon;
+ // ErlNifMonitor ctrlMon;
+ ESockMonitor ctrlMon;
/* +++ Write stuff +++ */
ErlNifMutex* writeMtx;
@@ -801,7 +820,8 @@ typedef struct {
/* +++ Close stuff +++ */
ErlNifMutex* closeMtx;
ErlNifPid closerPid;
- ErlNifMonitor closerMon;
+ // ErlNifMonitor closerMon;
+ ESockMonitor closerMon;
ERL_NIF_TERM closeRef;
BOOLEAN_T closeLocal;
@@ -2196,7 +2216,8 @@ static ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
static BOOLEAN_T acceptor_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref);
static BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2212,7 +2233,8 @@ static ERL_NIF_TERM writer_push(ErlNifEnv* env,
static BOOLEAN_T writer_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref);
static BOOLEAN_T writer_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2228,7 +2250,8 @@ static ERL_NIF_TERM reader_push(ErlNifEnv* env,
static BOOLEAN_T reader_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref);
static BOOLEAN_T reader_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2241,8 +2264,27 @@ static void qpush(SocketRequestQueue* q,
SocketRequestQueueElement* e);
static SocketRequestQueueElement* qpop(SocketRequestQueue* q);
static BOOLEAN_T qunqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const char* slogan,
SocketRequestQueue* q,
const ErlNifPid* pid);
+
+static int esock_monitor(const char* slogan,
+ ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid,
+ ESockMonitor* mon);
+static int esock_demonitor(const char* slogan,
+ ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ESockMonitor* monP);
+static void esock_monitor_init(ESockMonitor* mon);
+/*
+static int esock_monitor_compare(const ErlNifMonitor* mon1,
+ const ESockMonitor* mon2);
+*/
+
+
/*
#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE)
static size_t my_strnlen(const char *s, size_t maxlen);
@@ -2293,6 +2335,7 @@ static BOOLEAN_T extract_iow(ErlNifEnv* env,
static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info);
+
#if HAVE_IN6
# if ! defined(HAVE_IN6ADDR_ANY) || ! HAVE_IN6ADDR_ANY
# if HAVE_DECL_IN6ADDR_ANY_INIT
@@ -4193,7 +4236,6 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
res = enif_make_resource(env, descP);
enif_release_resource(descP); // We should really store a reference ...
-
/* Keep track of the creator
* This should not be a problem but just in case
* the *open* function is used with the wrong kind
@@ -4202,9 +4244,10 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
if (enif_self(env, &descP->ctrlPid) == NULL)
return esock_make_error(env, atom_exself);
- if (MONP(env, descP,
+ if (MONP("nopen -> ctrl",
+ env, descP,
&descP->ctrlPid,
- &descP->ctrlMon) > 0)
+ &descP->ctrlMon) != 0)
return esock_make_error(env, atom_exmon);
@@ -4363,6 +4406,9 @@ ERL_NIF_TERM nif_bind(ErlNifEnv* env,
}
eSockAddr = argv[1];
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_bind -> args when sock = %d (0x%lX)"
"\r\n Socket: %T"
@@ -4452,6 +4498,9 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env,
}
eSockAddr = argv[1];
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_connect -> args when sock = %d:"
"\r\n Socket: %T"
@@ -4653,6 +4702,9 @@ ERL_NIF_TERM nif_listen(ErlNifEnv* env,
return enif_make_badarg(env);
}
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_listen -> args when sock = %d:"
"\r\n Socket: %T"
@@ -4714,6 +4766,9 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env,
}
ref = argv[1];
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_accept -> args when sock = %d:"
"\r\n Socket: %T"
@@ -4791,9 +4846,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "naccept_listening -> would block\r\n") );
descP->currentAcceptor.pid = caller;
- if (MONP(env, descP,
+ if (MONP("naccept_listening -> current acceptor",
+ env, descP,
&descP->currentAcceptor.pid,
- &descP->currentAcceptor.mon) > 0)
+ &descP->currentAcceptor.mon) != 0)
return esock_make_error(env, atom_exmon);
descP->currentAcceptor.ref = enif_make_copy(descP->env, ref);
@@ -4868,9 +4924,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
enif_release_resource(accDescP); // We should really store a reference ...
accDescP->ctrlPid = caller;
- if (MONP(env, accDescP,
+ if (MONP("naccept_listening -> ctrl",
+ env, accDescP,
&accDescP->ctrlPid,
- &accDescP->ctrlMon) > 0) {
+ &accDescP->ctrlMon) != 0) {
sock_close(accSock);
return esock_make_error(env, atom_exmon);
}
@@ -4984,7 +5041,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "naccept_accepting -> accept success\r\n") );
- DEMONP(env, descP, &descP->currentAcceptor.mon);
+ DEMONP("naccept_accepting -> current acceptor",
+ env, descP, &descP->currentAcceptor.mon);
if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) {
save_errno = sock_errno();
@@ -5006,9 +5064,10 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
enif_release_resource(accDescP); // We should really store a reference ...
accDescP->ctrlPid = caller;
- if (MONP(env, accDescP,
+ if (MONP("naccept_accepting -> ctrl",
+ env, accDescP,
&accDescP->ctrlPid,
- &accDescP->ctrlMon) > 0) {
+ &accDescP->ctrlMon) != 0) {
sock_close(accSock);
return esock_make_error(env, atom_exmon);
}
@@ -5094,6 +5153,9 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
}
sendRef = argv[1];
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_send -> args when sock = %d:"
"\r\n Socket: %T"
@@ -5218,6 +5280,9 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
sendRef = argv[1];
eSockAddr = argv[3];
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_sendto -> args when sock = %d:"
"\r\n Socket: %T"
@@ -5341,6 +5406,9 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,
sendRef = argv[1];
eMsgHdr = argv[2];
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_sendmsg -> args when sock = %d:"
"\r\n Socket: %T"
@@ -5630,6 +5698,9 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env,
}
recvRef = argv[1];
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
if (!IS_CONNECTED(descP))
return esock_make_error(env, atom_enotconn);
@@ -5769,6 +5840,9 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env,
}
recvRef = argv[1];
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_recvfrom -> args when sock = %d:"
"\r\n Socket: %T"
@@ -5929,6 +6003,9 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env,
}
recvRef = argv[1];
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_recvmsg -> args when sock = %d:"
"\r\n Socket: %T"
@@ -6087,6 +6164,9 @@ ERL_NIF_TERM nif_close(ErlNifEnv* env,
return enif_make_badarg(env);
}
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
return nclose(env, descP);
}
@@ -6102,7 +6182,11 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
int type = descP->type;
int protocol = descP->protocol;
- SSDBG( descP, ("SOCKET", "nclose -> [%d] entry\r\n", descP->sock) );
+ SSDBG( descP, ("SOCKET", "nclose -> [%d] entry (0x%lX, 0x%lX, 0x%lX)\r\n",
+ descP->sock,
+ descP->currentWriterP,
+ descP->currentReaderP,
+ descP->currentAcceptorP) );
MLOCK(descP->closeMtx);
@@ -6127,11 +6211,18 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
/* Monitor the caller, since we should complete this operation even if
* the caller dies (for whatever reason).
+ *
+ * <KOLLA>
+ *
+ * Can we actiually use this for anything?
+ *
+ * </KOLLA>
*/
- if (MONP(env, descP,
- &descP->closerPid,
- &descP->closerMon) > 0) {
+ if (MONP("nclose -> closer",
+ env, descP,
+ &descP->closerPid,
+ &descP->closerMon) != 0) {
MUNLOCK(descP->closeMtx);
return esock_make_error(env, atom_exmon);
}
@@ -6150,17 +6241,24 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
if (selectRes & ERL_NIF_SELECT_STOP_CALLED) {
/* Prep done - inform the caller it can finalize (close) directly */
SSDBG( descP,
- ("SOCKET", "nclose -> [%d] stop called\r\n", descP->sock) );
+ ("SOCKET", "nclose -> [%d] stop was called\r\n", descP->sock) );
dec_socket(domain, type, protocol);
+ DEMONP("nclose -> closer", env, descP, &descP->closerMon);
reply = esock_atom_ok;
} else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) {
/* The stop callback function has been *scheduled* which means that we
* have to wait for it to complete. */
SSDBG( descP,
- ("SOCKET", "nclose -> [%d] stop scheduled\r\n", descP->sock) );
+ ("SOCKET", "nclose -> [%d] stop was scheduled\r\n",
+ descP->sock) );
dec_socket(domain, type, protocol); // SHALL WE DO THIS AT finalize?
reply = esock_make_ok2(env, descP->closeRef);
} else {
+
+ SSDBG( descP,
+ ("SOCKET", "nclose -> [%d] stop failed: %d\r\n",
+ descP->sock, selectRes) );
+
/* <KOLLA>
*
* WE SHOULD REALLY HAVE A WAY TO CLOBBER THE SOCKET,
@@ -6170,6 +6268,10 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
*
* </KOLLA>
*/
+
+ // No point in having this?
+ DEMONP("nclose -> closer", env, descP, &descP->closerMon);
+
reason = MKT2(env, atom_select, MKI(env, selectRes));
reply = esock_make_error(env, reason);
}
@@ -6225,10 +6327,10 @@ ERL_NIF_TERM nfinalize_close(ErlNifEnv* env,
{
ERL_NIF_TERM reply;
- if (descP->state == SOCKET_STATE_CLOSED)
+ if (IS_CLOSED(descP))
return esock_atom_ok;
- if (descP->state != SOCKET_STATE_CLOSING)
+ if (!IS_CLOSING(descP))
return esock_make_error(env, atom_enotclosing);
/* This nif is executed in a dirty scheduler just so that
@@ -6291,6 +6393,9 @@ ERL_NIF_TERM nif_shutdown(ErlNifEnv* env,
return enif_make_badarg(env);
}
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
if (!ehow2how(ehow, &how))
return enif_make_badarg(env);
@@ -6375,6 +6480,9 @@ ERL_NIF_TERM nif_setopt(ErlNifEnv* env,
eIsEncoded = argv[1];
eVal = argv[4];
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
isEncoded = esock_decode_bool(eIsEncoded);
/* SGDBG( ("SOCKET", "nif_setopt -> eIsDecoded (%T) decoded: %d\r\n", */
@@ -6521,7 +6629,8 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env,
ERL_NIF_TERM eVal)
{
ErlNifPid caller, newCtrlPid;
- ErlNifMonitor newCtrlMon;
+ // ErlNifMonitor newCtrlMon;
+ ESockMonitor newCtrlMon;
int xres;
SSDBG( descP,
@@ -6544,20 +6653,22 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env,
return esock_make_error(env, esock_atom_einval);
}
- if ((xres = MONP(env, descP, &newCtrlPid, &newCtrlMon)) != 0) {
+ if ((xres = MONP("nsetopt_otp_ctrl_proc -> (new) ctrl",
+ env, descP, &newCtrlPid, &newCtrlMon)) != 0) {
esock_warning_msg("Failed monitor %d) (new) controlling process\r\n", xres);
return esock_make_error(env, esock_atom_einval);
}
- if ((xres = DEMONP(env, descP, &descP->ctrlMon)) != 0) {
+ if ((xres = DEMONP("nsetopt_otp_ctrl_proc -> (old) ctrl",
+ env, descP, &descP->ctrlMon)) != 0) {
esock_warning_msg("Failed demonitor (%d) "
"old controlling process %T (%T)\r\n",
xres, descP->ctrlPid, descP->ctrlMon);
}
descP->ctrlPid = newCtrlPid;
- descP->ctrlMon = newCtrlMon;
-
+ descP->ctrlMon = newCtrlMon;
+
SSDBG( descP, ("SOCKET", "nsetopt_otp_ctrl_proc -> done\r\n") );
return esock_atom_ok;
@@ -9640,6 +9751,9 @@ ERL_NIF_TERM nif_getopt(ErlNifEnv* env,
eIsEncoded = argv[1];
eOpt = argv[3]; // Is "normally" an int, but if raw mode: {Int, ValueSz}
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_getopt -> args when sock = %d:"
"\r\n Socket: %T"
@@ -12237,6 +12351,9 @@ ERL_NIF_TERM nif_sockname(ErlNifEnv* env,
return enif_make_badarg(env);
}
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_sockname -> args when sock = %d:"
"\r\n Socket: %T"
@@ -12302,6 +12419,9 @@ ERL_NIF_TERM nif_peername(ErlNifEnv* env,
return enif_make_badarg(env);
}
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_peername -> args when sock = %d:"
"\r\n Socket: %T"
@@ -12370,6 +12490,9 @@ ERL_NIF_TERM nif_cancel(ErlNifEnv* env,
op = argv[1];
opRef = argv[2];
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
SSDBG( descP,
("SOCKET", "nif_cancel -> args when sock = %d:"
"\r\n op: %T"
@@ -12498,6 +12621,8 @@ ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") );
+ DEMONP("ncancel_accept_current -> current acceptor",
+ env, descP, &descP->currentAcceptor.mon);
res = ncancel_read_select(env, descP, descP->currentAcceptor.ref);
SSDBG( descP, ("SOCKET", "ncancel_accept_current -> cancel res: %T\r\n", res) );
@@ -12616,6 +12741,8 @@ ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") );
+ DEMONP("ncancel_recv_current -> current writer",
+ env, descP, &descP->currentWriter.mon);
res = ncancel_write_select(env, descP, descP->currentWriter.ref);
SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) );
@@ -12732,6 +12859,8 @@ ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "ncancel_recv_current -> entry\r\n") );
+ DEMONP("ncancel_recv_current -> current reader",
+ env, descP, &descP->currentReader.mon);
res = ncancel_read_select(env, descP, descP->currentReader.ref);
SSDBG( descP, ("SOCKET", "ncancel_recv_current -> cancel res: %T\r\n", res) );
@@ -12929,7 +13058,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
cnt_inc(&descP->writePkgCnt, 1);
cnt_inc(&descP->writeByteCnt, written);
if (descP->currentWriterP != NULL)
- DEMONP(env, descP, &descP->currentWriter.mon);
+ DEMONP("send_check_result -> current writer",
+ env, descP, &descP->currentWriter.mon);
SSDBG( descP,
("SOCKET", "send_check_result -> "
@@ -12968,7 +13098,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) {
ErlNifPid pid;
- ErlNifMonitor mon;
+ // ErlNifMonitor mon;
+ ESockMonitor mon;
ERL_NIF_TERM ref, res;
/*
@@ -12983,13 +13114,15 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
res = esock_make_error_errno(env, saveErrno);
if (descP->currentWriterP != NULL) {
- DEMONP(env, descP, &descP->currentWriter.mon);
+ DEMONP("send_check_result -> current writer",
+ env, descP, &descP->currentWriter.mon);
while (writer_pop(env, descP, &pid, &mon, &ref)) {
SSDBG( descP,
("SOCKET", "send_check_result -> abort %T\r\n", pid) );
send_msg_nif_abort(env, ref, res, &pid);
- DEMONP(env, descP, &mon);
+ DEMONP("send_check_result -> pop'ed writer",
+ env, descP, &mon);
}
}
@@ -13021,9 +13154,10 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
if (enif_self(env, &caller) == NULL)
return esock_make_error(env, atom_exself);
descP->currentWriter.pid = caller;
- if (MONP(env, descP,
+ if (MONP("send_check_result -> current writer",
+ env, descP,
&descP->currentWriter.pid,
- &descP->currentWriter.mon) > 0)
+ &descP->currentWriter.mon) != 0)
return esock_make_error(env, atom_exmon);
descP->currentWriter.ref = enif_make_copy(descP->env, sendRef);
descP->currentWriterP = &descP->currentWriter;
@@ -13108,9 +13242,10 @@ char* recv_init_current_reader(ErlNifEnv* env,
return str_exself;
descP->currentReader.pid = caller;
- if (MONP(env, descP,
+ if (MONP("recv_init_current_reader -> current reader",
+ env, descP,
&descP->currentReader.pid,
- &descP->currentReader.mon) > 0) {
+ &descP->currentReader.mon) != 0) {
return str_exmon;
}
descP->currentReader.ref = enif_make_copy(descP->env, recvRef);
@@ -13135,7 +13270,8 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
{
if (descP->currentReaderP != NULL) {
- DEMONP(env, descP, &descP->currentReader.mon);
+ DEMONP("recv_update_current_reader -> current reader",
+ env, descP, &descP->currentReader.mon);
if (reader_pop(env, descP,
&descP->currentReader.pid,
@@ -13183,16 +13319,19 @@ void recv_error_current_reader(ErlNifEnv* env,
{
if (descP->currentReaderP != NULL) {
ErlNifPid pid;
- ErlNifMonitor mon;
+ // ErlNifMonitor mon;
+ ESockMonitor mon;
ERL_NIF_TERM ref;
- DEMONP(env, descP, &descP->currentReader.mon);
+ DEMONP("recv_error_current_reader -> current reader",
+ env, descP, &descP->currentReader.mon);
while (reader_pop(env, descP, &pid, &mon, &ref)) {
SSDBG( descP,
("SOCKET", "recv_error_current_reader -> abort %T\r\n", pid) );
send_msg_nif_abort(env, ref, reason, &pid);
- DEMONP(env, descP, &mon);
+ DEMONP("recv_error_current_reader -> pop'ed reader",
+ env, descP, &mon);
}
}
}
@@ -13368,6 +13507,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SSDBG( descP, ("SOCKET",
"recv_check_result -> [%d] eagain\r\n", toRead) );
+ if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL)
+ return esock_make_error_str(env, xres);
+
SELECT(env, descP->sock, (ERL_NIF_SELECT_READ),
descP, NULL, recvRef);
@@ -13622,9 +13764,13 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
} else if ((saveErrno == ERRNO_BLOCK) ||
(saveErrno == EAGAIN)) {
+ char* xres;
SSDBG( descP, ("SOCKET", "recvmsg_check_result -> eagain\r\n") );
+ if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL)
+ return esock_make_error_str(env, xres);
+
SELECT(env, descP->sock, (ERL_NIF_SELECT_READ),
descP, NULL, recvRef);
@@ -15353,6 +15499,11 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->sock = sock;
descP->event = event;
+ MON_INIT(&descP->currentWriter.mon);
+ MON_INIT(&descP->currentReader.mon);
+ MON_INIT(&descP->currentAcceptor.mon);
+ MON_INIT(&descP->ctrlMon);
+ MON_INIT(&descP->closerMon);
}
return descP;
@@ -15971,7 +16122,8 @@ ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
reqP->pid = pid;
reqP->ref = enif_make_copy(descP->env, ref);
- if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ if (MONP("acceptor_push -> acceptor request",
+ env, descP, &pid, &reqP->mon) != 0) {
FREE(reqP);
return esock_make_error(env, atom_exmon);
}
@@ -15991,7 +16143,8 @@ static
BOOLEAN_T acceptor_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref)
{
SocketRequestQueueElement* e = qpop(&descP->acceptorsQ);
@@ -16022,7 +16175,8 @@ BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid)
{
- return qunqueue(env, &descP->acceptorsQ, pid);
+ return qunqueue(env, descP, "qunqueue -> waiting acceptor",
+ &descP->acceptorsQ, pid);
}
@@ -16057,7 +16211,8 @@ ERL_NIF_TERM writer_push(ErlNifEnv* env,
reqP->pid = pid;
reqP->ref = enif_make_copy(descP->env, ref);
- if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ if (MONP("writer_push -> writer request",
+ env, descP, &pid, &reqP->mon) != 0) {
FREE(reqP);
return esock_make_error(env, atom_exmon);
}
@@ -16077,7 +16232,8 @@ static
BOOLEAN_T writer_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref)
{
SocketRequestQueueElement* e = qpop(&descP->writersQ);
@@ -16108,7 +16264,8 @@ BOOLEAN_T writer_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid)
{
- return qunqueue(env, &descP->writersQ, pid);
+ return qunqueue(env, descP, "qunqueue -> waiting writer",
+ &descP->writersQ, pid);
}
@@ -16143,7 +16300,8 @@ ERL_NIF_TERM reader_push(ErlNifEnv* env,
reqP->pid = pid;
reqP->ref = enif_make_copy(descP->env, ref);
- if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ if (MONP("reader_push -> reader request",
+ env, descP, &pid, &reqP->mon) != 0) {
FREE(reqP);
return esock_make_error(env, atom_exmon);
}
@@ -16163,7 +16321,8 @@ static
BOOLEAN_T reader_pop(ErlNifEnv* env,
SocketDescriptor* descP,
ErlNifPid* pid,
- ErlNifMonitor* mon,
+ // ErlNifMonitor* mon,
+ ESockMonitor* mon,
ERL_NIF_TERM* ref)
{
SocketRequestQueueElement* e = qpop(&descP->readersQ);
@@ -16194,7 +16353,8 @@ BOOLEAN_T reader_unqueue(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid)
{
- return qunqueue(env, &descP->readersQ, pid);
+ return qunqueue(env, descP, "qunqueue -> waiting reader",
+ &descP->readersQ, pid);
}
@@ -16258,6 +16418,8 @@ SocketRequestQueueElement* qpop(SocketRequestQueue* q)
static
BOOLEAN_T qunqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const char* slogan,
SocketRequestQueue* q,
const ErlNifPid* pid)
{
@@ -16270,6 +16432,8 @@ BOOLEAN_T qunqueue(ErlNifEnv* env,
/* We have a match */
+ DEMONP(slogan, env, descP, &e->data.mon);
+
if (p != NULL) {
/* Not the first, but could be the last */
if (q->last == e) {
@@ -16344,6 +16508,97 @@ void cnt_dec(uint32_t* cnt, uint32_t dec)
/* ----------------------------------------------------------------------
+ * M o n i t o r W r a p p e r F u n c t i o n s
+ * ----------------------------------------------------------------------
+ */
+
+static
+int esock_monitor(const char* slogan,
+ ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid,
+ ESockMonitor* monP)
+{
+ int res;
+
+ SSDBG( descP, ("SOCKET", "[%d] %s: try monitor", descP->sock, slogan) );
+ // esock_dbg_printf("MONP", "[%d] %s\r\n", descP->sock, slogan);
+ res = enif_monitor_process(env, descP, pid, &monP->mon);
+
+ if (res != 0) {
+ SSDBG( descP, ("SOCKET", "[%d] monitor failed: %d", descP->sock, res) );
+ // esock_dbg_printf("MONP", "[%d] failed: %d\r\n", descP->sock, res);
+ } /* else {
+ esock_dbg_printf("MONP",
+ "[%d] success: "
+ "%u,%u,%u,%u\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ } */
+
+ return res;
+}
+
+
+static
+int esock_demonitor(const char* slogan,
+ ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ESockMonitor* monP)
+{
+ int res;
+
+ SSDBG( descP, ("SOCKET", "[%d] %s: try demonitor\r\n", descP->sock, slogan) );
+ /*
+ esock_dbg_printf("DEMONP", "[%d] %s: %u,%u,%u,%u\r\n",
+ descP->sock, slogan,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
+
+ res = enif_demonitor_process(env, descP, &monP->mon);
+
+ if (res == 0) {
+ esock_monitor_init(monP);
+ } else {
+ SSDBG( descP,
+ ("SOCKET", "[%d] demonitor failed: %d\r\n", descP->sock, res) );
+ /*
+ esock_dbg_printf("DEMONP", "[%d] failed: %d\r\n", descP->sock, res);
+ */
+ }
+
+ return res;
+}
+
+
+static
+void esock_monitor_init(ESockMonitor* monP)
+{
+ int i;
+
+ /*
+ * UGLY,
+ * but since we don't have a ERL_NIF_MONITOR_NULL,
+ * this will have to do for now...
+ */
+ for (i = 0; i < 4; i++)
+ monP->raw[i] = 0;
+
+}
+
+/*
+static
+int esock_monitor_compare(const ErlNifMonitor* mon1,
+ const ESockMonitor* mon2)
+{
+ return enif_compare_monitors(mon1, &mon2->mon);
+}
+*/
+
+
+/* ----------------------------------------------------------------------
* C a l l b a c k F u n c t i o n s
* ----------------------------------------------------------------------
*/
@@ -16393,35 +16648,49 @@ static
void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
{
SocketDescriptor* descP = (SocketDescriptor*) obj;
-
+
+ /*
+ esock_dbg_printf("STOP", "[%d] begin\r\n", descP->sock);
+ */
+
SSDBG( descP,
- ("SOCKET", "socket_stop -> entry when"
- "\r\n sock: %d (%d)"
- "\r\n Is Direct Call: %s"
- "\r\n", descP->sock, fd, B2S(is_direct_call)) );
+ ("SOCKET", "socket_stop -> entry when %s"
+ "\r\n sock: %d (%d)"
+ "\r\n", ((is_direct_call) ? "called" : "scheduled"), descP->sock, fd) );
MLOCK(descP->writeMtx);
MLOCK(descP->readMtx);
MLOCK(descP->accMtx);
MLOCK(descP->closeMtx);
+ SSDBG( descP, ("SOCKET", "socket_stop -> all mutex(s) locked\r\n") );
descP->state = SOCKET_STATE_CLOSING; // Just in case...???
descP->isReadable = FALSE;
descP->isWritable = FALSE;
-
-
+
/* We should check that we actually have a monitor.
* This *should* be done with a "NULL" monitor value,
* which there currently is none...
+ * If we got here because the controlling process died,
+ * its no point in demonitor. Also, we not actually have
+ * a monitor in that case...
*/
- DEMONP(env, descP, &descP->ctrlMon);
-
+ DEMONP("socket_stop -> ctrl", env, descP, &descP->ctrlMon);
+
+ /*
+ esock_dbg_printf("STOP", "[%d] maybe handle current writer (0x%lX)\r\n",
+ descP->sock, descP->currentReaderP);
+ */
if (descP->currentWriterP != NULL) {
/* We have a (current) writer and *may* therefor also have
* writers waiting.
*/
+ DEMONP("socket_stop -> current writer",
+ env, descP, &descP->currentWriter.mon);
+
+ SSDBG( descP, ("SOCKET", "socket_stop -> handle current writer\r\n") );
if (!compare_pids(env,
&descP->closerPid,
&descP->currentWriter.pid) &&
@@ -16439,15 +16708,24 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
}
/* And also deal with the waiting writers (in the same way) */
+ SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting writer(s)\r\n") );
inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed);
}
-
+
+ /*
+ esock_dbg_printf("STOP", "[%d] maybe handle current reader (0x%lX)\r\n",
+ descP->sock, descP->currentReaderP);
+ */
if (descP->currentReaderP != NULL) {
/* We have a (current) reader and *may* therefor also have
* readers waiting.
*/
+ DEMONP("socket_stop -> current reader",
+ env, descP, &descP->currentReader.mon);
+
+ SSDBG( descP, ("SOCKET", "socket_stop -> handle current reader\r\n") );
if (!compare_pids(env,
&descP->closerPid,
&descP->currentReader.pid) &&
@@ -16465,14 +16743,23 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
}
/* And also deal with the waiting readers (in the same way) */
+ SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting reader(s)\r\n") );
inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed);
}
-
+
+ /*
+ esock_dbg_printf("STOP", "[%d] maybe handle current acceptor (0x%lX)\r\n",
+ descP->sock, descP->currentReaderP);
+ */
if (descP->currentAcceptorP != NULL) {
/* We have a (current) acceptor and *may* therefor also have
* acceptors waiting.
*/
+ DEMONP("socket_stop -> current acceptor",
+ env, descP, &descP->currentAcceptor.mon);
+
+ SSDBG( descP, ("SOCKET", "socket_stop -> handle current acceptor\r\n") );
if (!compare_pids(env,
&descP->closerPid,
&descP->currentAcceptor.pid) &&
@@ -16490,6 +16777,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
}
/* And also deal with the waiting acceptors (in the same way) */
+ SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting acceptor(s)\r\n") );
inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, atom_closed);
}
@@ -16521,7 +16809,8 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
- DEMONP(env, descP, &descP->closerMon);
+ DEMONP("socket_stop -> closer",
+ env, descP, &descP->closerMon);
} else {
@@ -16535,12 +16824,28 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
}
}
-
+
+ if (!is_direct_call) {
+ if (descP->closeLocal) {
+ DEMONP("socket_stop -> closer",
+ env, descP, &descP->closerMon);
+ }
+ descP->sock = INVALID_SOCKET;
+ descP->event = INVALID_EVENT;
+ descP->state = SOCKET_STATE_CLOSED;
+ }
+
+ SSDBG( descP, ("SOCKET", "socket_stop -> unlock all mutex(s)\r\n") );
+
MUNLOCK(descP->closeMtx);
MUNLOCK(descP->accMtx);
MUNLOCK(descP->readMtx);
MUNLOCK(descP->writeMtx);
+ /*
+ esock_dbg_printf("STOP", "[%d] end\r\n", descP->sock);
+ */
+
SSDBG( descP,
("SOCKET", "socket_stop -> done (%d, %d)\r\n", descP->sock, fd) );
@@ -16580,7 +16885,8 @@ void inform_waiting_procs(ErlNifEnv* env,
currentP->data.ref,
reason,
&currentP->data.pid)) );
- DEMONP(env, descP, &currentP->data.mon);
+ DEMONP("inform_waiting_procs -> current 'request'",
+ env, descP, &currentP->data.mon);
nextP = currentP->nextP;
if (free) FREE(currentP);
currentP = nextP;
@@ -16605,30 +16911,159 @@ void socket_down(ErlNifEnv* env,
const ErlNifMonitor* mon)
{
SocketDescriptor* descP = (SocketDescriptor*) obj;
+ ESockMonitor* monP = (ESockMonitor*) mon;
SSDBG( descP, ("SOCKET", "socket_down -> entry with"
- "\r\n sock: %d"
- "\r\n pid: %T"
- "\r\n", descP->sock, *pid) );
+ "\r\n sock: %d"
+ "\r\n pid: %T"
+ "\r\n Close: %s (%s)"
+ "\r\n",
+ descP->sock, *pid,
+ B2S(IS_CLOSED(descP)),
+ B2S(IS_CLOSING(descP))) );
+ /*
+ esock_dbg_printf("DOWN",
+ "[%d] begin %u,%u,%u,%d\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
+
+ /*
+ if (MON_COMP(mon, &descP->ctrlMon) == 0) {
+ SSDBG( descP, ("SOCKET", "socket_down -> controlling process mon\r\n") );
+ } else if (MON_COMP(mon, &descP->closerMon) == 0) {
+ SSDBG( descP, ("SOCKET", "socket_down -> closer mon\r\n") );
+ } else if ((descP->currentWriterP != NULL) &&
+ (MON_COMP(mon, &descP->currentWriter.mon) == 0)) {
+ SSDBG( descP, ("SOCKET", "socket_down -> current writer mon\r\n") );
+ } else if ((descP->currentReaderP != NULL) &&
+ (MON_COMP(mon, &descP->currentReader.mon) == 0)) {
+ SSDBG( descP, ("SOCKET", "socket_down -> current reader mon\r\n") );
+ } else if ((descP->currentAcceptorP != NULL) &&
+ (MON_COMP(mon, &descP->currentAcceptor.mon) == 0)) {
+ SSDBG( descP, ("SOCKET", "socket_down -> current acceptor mon\r\n") );
+ } else {
+ SSDBG( descP, ("SOCKET", "socket_down -> OTHER mon\r\n") );
+ }
+ */
if (compare_pids(env, &descP->ctrlPid, pid)) {
+ int selectRes;
+
/* We don't bother with the queue cleanup here -
* we leave it to the stop callback function.
*/
+ SSDBG( descP,
+ ("SOCKET", "socket_down -> controlling process exit\r\n") );
+
descP->state = SOCKET_STATE_CLOSING;
descP->closeLocal = TRUE;
descP->closerPid = *pid;
- descP->closerMon = *mon;
- descP->closeRef = MKREF(env);
- enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP),
- descP, NULL, descP->closeRef);
+ descP->closerMon = (ESockMonitor) *mon;
+ descP->closeRef = MKREF(env); // Do we really need this in this case?
+
+ /*
+ esock_dbg_printf("DOWN",
+ "[%d] select stop %u,%u,%u,%d\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
+
+ selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP),
+ descP, NULL, descP->closeRef);
+
+ if (selectRes & ERL_NIF_SELECT_STOP_CALLED) {
+ /* We are done - wwe can finalize (socket close) directly */
+ SSDBG( descP,
+ ("SOCKET", "socket_down -> [%d] stop called\r\n", descP->sock) );
+ dec_socket(descP->domain, descP->type, descP->protocol);
+ descP->state = SOCKET_STATE_CLOSED;
+
+ /* And finally close the socket.
+ * Since we close the socket because of an exiting owner,
+ * we do not need to wait for buffers to sync (linger).
+ * If the owner wish to ensure the buffer are written,
+ * it should have closed teh socket explicitly...
+ */
+ if (sock_close(descP->sock) != 0) {
+ int save_errno = sock_errno();
+
+ esock_warning_msg("Failed closing socket for terminating "
+ "controlling process: "
+ "\r\n Controlling Process: %T"
+ "\r\n Descriptor: %d"
+ "\r\n Errno: %d"
+ "\r\n", pid, descP->sock, save_errno);
+ }
+ sock_close_event(descP->event);
+
+ descP->sock = INVALID_SOCKET;
+ descP->event = INVALID_EVENT;
+
+ descP->state = SOCKET_STATE_CLOSED;
+
+ } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) {
+ /* The stop callback function has been *scheduled* which means that
+ * "should" wait for it to complete. But since we are in a callback
+ * (down) function, we cannot...
+ * So, we must close the socket
+ */
+ SSDBG( descP,
+ ("SOCKET",
+ "socket_down -> [%d] stop scheduled\r\n", descP->sock) );
+ dec_socket(descP->domain, descP->type, descP->protocol);
+
+ /* And now what? We can't wait for the stop function here...
+ * So, we simply close it here and leave the rest of the "close"
+ * for later (when the stop function actually gets called...
+ */
+
+ if (sock_close(descP->sock) != 0) {
+ int save_errno = sock_errno();
+
+ esock_warning_msg("Failed closing socket for terminating "
+ "controlling process: "
+ "\r\n Controlling Process: %T"
+ "\r\n Descriptor: %d"
+ "\r\n Errno: %d"
+ "\r\n", pid, descP->sock, save_errno);
+ }
+ sock_close_event(descP->event);
+
+ } else {
+
+ /*
+ * <KOLLA>
+ *
+ * WE SHOULD REALLY HAVE A WAY TO CLOBBER THE SOCKET,
+ * SO WE DON'T LET STUFF LEAK.
+ * NOW, BECAUSE WE FAILED TO SELECT, WE CANNOT FINISH
+ * THE CLOSE, WHAT TO DO? ABORT?
+ *
+ * </KOLLA>
+ */
+ esock_warning_msg("Failed selecting stop when handling down "
+ "of controlling process: "
+ "\r\n Select Res: %d"
+ "\r\n Controlling Process: %T"
+ "\r\n Descriptor: %d"
+ "\r\n Monitor: %u.%u.%u.%u"
+ "\r\n", selectRes, pid, descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ }
+
} else {
/* check all operation queue(s): acceptor, writer and reader. */
+ SSDBG( descP, ("SOCKET", "socket_down -> other process term\r\n") );
+
MLOCK(descP->accMtx);
if (descP->currentAcceptorP != NULL)
socket_down_acceptor(env, descP, pid);
@@ -16645,6 +17080,14 @@ void socket_down(ErlNifEnv* env,
MUNLOCK(descP->readMtx);
}
+
+ /*
+ esock_dbg_printf("DOWN",
+ "[%d] end %u,%u,%u,%d\r\n",
+ descP->sock,
+ monP->raw[0], monP->raw[1],
+ monP->raw[2], monP->raw[3]);
+ */
SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") );