aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/nifs/common/socket_int.h5
-rw-r--r--erts/emulator/nifs/common/socket_nif.c659
-rw-r--r--erts/preloaded/ebin/socket.beambin70288 -> 70312 bytes
-rw-r--r--erts/preloaded/src/socket.erl22
4 files changed, 390 insertions, 296 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h
index 0f973855ae..a9e83adc21 100644
--- a/erts/emulator/nifs/common/socket_int.h
+++ b/erts/emulator/nifs/common/socket_int.h
@@ -258,6 +258,7 @@ extern ERL_NIF_TERM esock_atom_rxq_ovfl;
extern ERL_NIF_TERM esock_atom_scope_id;
extern ERL_NIF_TERM esock_atom_sctp;
extern ERL_NIF_TERM esock_atom_sec;
+extern ERL_NIF_TERM esock_atom_select_failed;
extern ERL_NIF_TERM esock_atom_select_sent;
extern ERL_NIF_TERM esock_atom_send;
extern ERL_NIF_TERM esock_atom_sendmsg;
@@ -349,10 +350,6 @@ extern ERL_NIF_TERM esock_atom_einval;
#define MON_INIT(M) esock_monitor_init((M))
// #define MON_COMP(M1, M2) esock_monitor_compare((M1), (M2))
-#define SELECT(E,FD,M,O,P,R) \
- if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \
- return enif_make_badarg((E));
-
#define COMPARE(A, B) enif_compare((A), (B))
#define IS_ATOM(E, TE) enif_is_atom((E), (TE))
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index c0a37450f9..d860cb4965 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -834,6 +834,7 @@ typedef struct {
ErlNifPid closerPid;
// ErlNifMonitor closerMon;
ESockMonitor closerMon;
+ ErlNifEnv* closeEnv;
ERL_NIF_TERM closeRef;
BOOLEAN_T closeLocal;
@@ -2335,9 +2336,8 @@ static void socket_down_reader(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid);
-static char* esock_send_close_msg(ErlNifEnv* env,
- ERL_NIF_TERM closeRef,
- ErlNifPid* pid);
+static char* esock_send_close_msg(ErlNifEnv* env,
+ SocketDescriptor* descP);
static char* esock_send_abort_msg(ErlNifEnv* env,
ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef,
@@ -2354,6 +2354,24 @@ static char* esock_send_msg(ErlNifEnv* env,
ErlNifPid* pid,
ErlNifEnv* msg_env);
+static int esock_select_read(ErlNifEnv* env,
+ ErlNifEvent event,
+ void* obj,
+ const ErlNifPid* pid,
+ ERL_NIF_TERM ref);
+static int esock_select_write(ErlNifEnv* env,
+ ErlNifEvent event,
+ void* obj,
+ const ErlNifPid* pid,
+ ERL_NIF_TERM ref);
+static int esock_select_stop(ErlNifEnv* env,
+ ErlNifEvent event,
+ void* obj);
+static int esock_select_cancel(ErlNifEnv* env,
+ ErlNifEvent event,
+ enum ErlNifSelectFlags mode,
+ void* obj);
+
static BOOLEAN_T extract_debug(ErlNifEnv* env,
ERL_NIF_TERM map);
static BOOLEAN_T extract_iow(ErlNifEnv* env,
@@ -2618,6 +2636,7 @@ ERL_NIF_TERM esock_atom_rxq_ovfl;
ERL_NIF_TERM esock_atom_scope_id;
ERL_NIF_TERM esock_atom_sctp;
ERL_NIF_TERM esock_atom_sec;
+ERL_NIF_TERM esock_atom_select_failed;
ERL_NIF_TERM esock_atom_select_sent;
ERL_NIF_TERM esock_atom_send;
ERL_NIF_TERM esock_atom_sendmsg;
@@ -4582,7 +4601,17 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env,
return esock_make_error_str(env, xres);
}
+ /*
+ * <KOLLA>
+ *
+ * We should lock both the read and write mutex:es...
+ *
+ * </KOLLA>
+ *
+ */
+
return nconnect(env, descP);
+
#endif // if !defined(__WIN32__)
}
@@ -4592,16 +4621,12 @@ static
ERL_NIF_TERM nconnect(ErlNifEnv* env,
SocketDescriptor* descP)
{
- int code, save_errno = 0;
+ ERL_NIF_TERM res;
+ int code, sres, save_errno = 0;
/*
- * <KOLLA>
- *
- * We should look both the read and write mutex:es...
- *
- * </KOLLA>
- *
- * Verify that we are where in the proper state */
+ * Verify that we are where in the proper state
+ */
if (!IS_OPEN(descP)) {
SSDBG( descP, ("SOCKET", "nif_connect -> not open\r\n") );
@@ -4629,13 +4654,17 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
if (IS_SOCKET_ERROR(code) &&
((save_errno == ERRNO_BLOCK) || /* Winsock2 */
(save_errno == EINPROGRESS))) { /* Unix & OSE!! */
+ /* THIS DOES NOT WORK!! WE NEED A "PERISTENT" ENV!! */
ERL_NIF_TERM ref = MKREF(env);
descP->state = SOCKET_STATE_CONNECTING;
- SELECT(env,
- descP->sock,
- (ERL_NIF_SELECT_WRITE),
- descP, NULL, ref);
- return esock_make_ok2(env, ref);
+ if ((sres = esock_select_write(env, descP->sock, descP, NULL, ref)) < 0) {
+ res = esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ } else {
+ res = esock_make_ok2(env, ref);
+ }
} else if (code == 0) { /* ok we are connected */
descP->state = SOCKET_STATE_CONNECTED;
descP->isReadable = TRUE;
@@ -4643,11 +4672,13 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
/* Do we need to do somthing for "active" mode?
* Is there even such a thing *here*?
*/
- return esock_atom_ok;
+ res = esock_atom_ok;
} else {
- return esock_make_error_errno(env, save_errno);
+ res = esock_make_error_errno(env, save_errno);
}
+ return res;
+
}
#endif // if !defined(__WIN32__)
@@ -4925,7 +4956,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
unsigned int n;
SOCKET accSock;
HANDLE accEvent;
- int save_errno;
+ int sres, save_errno;
ErlNifPid caller;
SSDBG( descP, ("SOCKET", "naccept_listening -> get caller\r\n") );
@@ -4960,37 +4991,41 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
descP->currentAcceptor.ref = enif_make_copy(descP->env, ref);
descP->currentAcceptorP = &descP->currentAcceptor;
- SELECT(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP, NULL, ref);
-
- /* Shall we really change state?
- * The ready event is sent directly to the calling
- * process, which simply calls this function again.
- * Basically, state accepting means that we have
- * an "outstanding" accept.
- * Shall we store the pid of the calling process?
- * And if someone else calls accept, return with ebusy?
- * Can any process call accept or just the controlling
- * process?
- * We also need a monitor it case the calling process is
- * called before we are done!
- *
- * Change state (to accepting) and store pid of the acceptor
- * (current process). Only accept calls from the acceptor
- * process (ebusy) and once we have a successful accept,
- * change state back to listening. If cancel is called instead
- * (only accepted from the acceptor process), we reset
- * state to listening and also resets the pid to "null"
- * (is there such a value?).
- * Need a mutex to secure that we don't test and change the
- * pid at the same time.
- */
+ if ((sres = esock_select_read(env, descP->sock, descP,
+ NULL, ref)) < 0) {
+ return esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ } else {
- descP->state = SOCKET_STATE_ACCEPTING;
+ /* Shall we really change state?
+ * The ready event is sent directly to the calling
+ * process, which simply calls this function again.
+ * Basically, state accepting means that we have
+ * an "outstanding" accept.
+ * Shall we store the pid of the calling process?
+ * And if someone else calls accept, return with ebusy?
+ * Can any process call accept or just the controlling
+ * process?
+ * We also need a monitor it case the calling process is
+ * called before we are done!
+ *
+ * Change state (to accepting) and store pid of the acceptor
+ * (current process). Only accept calls from the acceptor
+ * process (ebusy) and once we have a successful accept,
+ * change state back to listening. If cancel is called instead
+ * (only accepted from the acceptor process), we reset
+ * state to listening and also resets the pid to "null"
+ * (is there such a value?).
+ * Need a mutex to secure that we don't test and change the
+ * pid at the same time.
+ */
- return esock_make_error(env, esock_atom_eagain);
+ descP->state = SOCKET_STATE_ACCEPTING;
+
+ return esock_make_error(env, esock_atom_eagain);
+ }
} else {
SSDBG( descP,
@@ -5071,7 +5106,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
SOCKET accSock;
HANDLE accEvent;
ErlNifPid caller;
- int save_errno;
+ int save_errno, sres;
ERL_NIF_TERM result;
SSDBG( descP, ("SOCKET", "naccept_accepting -> get caller\r\n") );
@@ -5125,12 +5160,18 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
("SOCKET",
"naccept_accepting -> would block: try again\r\n") );
- SELECT(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP, NULL, ref);
+ if ((sres = esock_select_read(env, descP->sock, descP,
+ NULL, ref)) < 0) {
+ result = esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ } else {
+ result = esock_make_error(env, esock_atom_eagain);
+ }
+
+ return result;
- return esock_make_error(env, esock_atom_eagain);
} else {
SSDBG( descP,
("SOCKET",
@@ -5201,10 +5242,14 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
descP->currentAcceptor.pid,
descP->currentAcceptor.ref) );
- SELECT(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref);
+ if ((sres = esock_select_read(env, descP->sock, descP,
+ &descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref)) < 0) {
+ return esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ }
} else {
descP->currentAcceptorP = NULL;
descP->state = SOCKET_STATE_LISTENING;
@@ -6379,12 +6424,10 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
doClose = TRUE;
}
- MUNLOCK(descP->closeMtx);
-
if (doClose) {
- descP->closeRef = MKREF(env);
- selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP),
- descP, NULL, descP->closeRef);
+ descP->closeEnv = enif_alloc_env();
+ descP->closeRef = MKREF(descP->closeEnv);
+ selectRes = esock_select_stop(env, descP->sock, descP);
if (selectRes & ERL_NIF_SELECT_STOP_CALLED) {
/* Prep done - inform the caller it can finalize (close) directly */
SSDBG( descP,
@@ -6421,10 +6464,13 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
reason = MKT2(env, atom_select, MKI(env, selectRes));
reply = esock_make_error(env, reason);
}
+
} else {
reply = esock_make_error(env, reason);
}
+ MUNLOCK(descP->closeMtx);
+
SSDBG( descP,
("SOCKET", "nclose -> [%d] done when: "
"\r\n state: 0x%lX"
@@ -13065,6 +13111,7 @@ static
ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
SocketDescriptor* descP)
{
+ int sres;
ERL_NIF_TERM res;
SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") );
@@ -13089,11 +13136,14 @@ ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
descP->currentAcceptor.pid,
descP->currentAcceptor.ref) );
- SELECT(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref);
-
+ if ((sres = esock_select_read(env, descP->sock, descP,
+ &descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref)) < 0) {
+ return esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ }
} else {
SSDBG( descP, ("SOCKET", "ncancel_accept_current -> no more acceptors\r\n") );
descP->currentAcceptorP = NULL;
@@ -13185,6 +13235,7 @@ static
ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env,
SocketDescriptor* descP)
{
+ int sres;
ERL_NIF_TERM res;
SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") );
@@ -13209,11 +13260,14 @@ ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env,
descP->currentWriter.pid,
descP->currentWriter.ref) );
- SELECT(env,
- descP->sock,
- (ERL_NIF_SELECT_WRITE),
- descP, &descP->currentWriter.pid, descP->currentWriter.ref);
-
+ if ((sres = esock_select_write(env, descP->sock, descP,
+ &descP->currentWriter.pid,
+ descP->currentWriter.ref)) < 0) {
+ return esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ }
} else {
SSDBG( descP, ("SOCKET", "ncancel_send_current -> no more writers\r\n") );
descP->currentWriterP = NULL;
@@ -13303,6 +13357,7 @@ static
ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env,
SocketDescriptor* descP)
{
+ int sres;
ERL_NIF_TERM res;
SSDBG( descP, ("SOCKET", "ncancel_recv_current -> entry\r\n") );
@@ -13327,11 +13382,14 @@ ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env,
descP->currentReader.pid,
descP->currentReader.ref) );
- SELECT(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP, &descP->currentReader.pid, descP->currentReader.ref);
-
+ if ((sres = esock_select_read(env, descP->sock, descP,
+ &descP->currentReader.pid,
+ descP->currentReader.ref)) < 0) {
+ return esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ }
} else {
SSDBG( descP, ("SOCKET", "ncancel_recv_current -> no more readers\r\n") );
descP->currentReaderP = NULL;
@@ -13399,9 +13457,7 @@ ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env,
int smode,
int rmode)
{
- int selectRes = enif_select(env, descP->sock,
- (ERL_NIF_SELECT_CANCEL | smode),
- descP, NULL, opRef);
+ int selectRes = esock_select_cancel(env, descP->sock, smode, descP);
if (selectRes & rmode) {
/* Was cancelled */
@@ -13498,6 +13554,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef)
{
+ int sres;
+
SSDBG( descP,
("SOCKET", "send_check_result -> entry with"
"\r\n written: %d"
@@ -13533,11 +13591,14 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
descP->currentWriter.pid,
descP->currentWriter.ref) );
- SELECT(env,
- descP->sock,
- (ERL_NIF_SELECT_WRITE),
- descP, &descP->currentWriter.pid, descP->currentWriter.ref);
-
+ if ((sres = esock_select_write(env, descP->sock, descP,
+ &descP->currentWriter.pid,
+ descP->currentWriter.ref)) < 0) {
+ return esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ }
} else {
descP->currentWriterP = NULL;
}
@@ -13615,12 +13676,34 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
cnt_inc(&descP->writeWaits, 1);
- SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), descP, NULL, sendRef);
-
- if (written >= 0)
- return esock_make_ok2(env, MKI(env, written));
- else
- return esock_make_error(env, esock_atom_eagain);
+ sres = esock_select_write(env, descP->sock, descP, NULL, sendRef);
+
+ if (written >= 0) {
+ if (sres < 0) {
+ /* Returned: {error, Reason}
+ * Reason: {select_failed, sres, written}
+ */
+ return esock_make_error(env,
+ MKT3(env,
+ esock_atom_select_failed,
+ MKI(env, sres),
+ MKI(env, written)));
+ } else {
+ return esock_make_ok2(env, MKI(env, written));
+ }
+ } else {
+ if (sres < 0) {
+ /* Returned: {error, Reason}
+ * Reason: {select_failed, sres}
+ */
+ return esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ } else {
+ return esock_make_error(env, esock_atom_eagain);
+ }
+ }
}
@@ -13719,6 +13802,9 @@ static
ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
SocketDescriptor* descP)
{
+ int sres;
+ ERL_NIF_TERM res = esock_atom_ok;
+
if (descP->currentReaderP != NULL) {
DEMONP("recv_update_current_reader -> current reader",
@@ -13739,19 +13825,20 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
descP->currentReader.pid,
descP->currentReader.ref) );
- SELECT(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP,
- &descP->currentReader.pid,
- descP->currentReader.ref);
-
+ if ((sres = esock_select_read(env, descP->sock, descP,
+ &descP->currentReader.pid,
+ descP->currentReader.ref)) < 0) {
+ res = esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ }
} else {
descP->currentReaderP = NULL;
}
}
- return esock_atom_ok;
+ return res;
}
@@ -13805,7 +13892,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
ERL_NIF_TERM recvRef)
{
char* xres;
- ERL_NIF_TERM data;
+ int sres;
+ ERL_NIF_TERM res, data;
SSDBG( descP,
("SOCKET", "recv_check_result -> entry with"
@@ -13825,7 +13913,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
*/
if ((read == 0) && (descP->type == SOCK_STREAM)) {
- ERL_NIF_TERM res = esock_make_error(env, atom_closed);
+
+ res = esock_make_error(env, atom_closed);
/*
* When a stream socket peer has performed an orderly shutdown, the return
@@ -13961,7 +14050,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
FREE_BIN(bufP);
if (saveErrno == ECONNRESET) {
- ERL_NIF_TERM res = esock_make_error(env, atom_closed);
+
+ res = esock_make_error(env, atom_closed);
/* +++ Oups - closed +++ */
@@ -13988,16 +14078,16 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
recv_error_current_reader(env, descP, sockRef, res);
- SELECT(env,
- descP->sock,
- (ERL_NIF_SELECT_STOP),
- descP, NULL, recvRef);
+ if ((sres = esock_select_stop(env, descP->sock, descP)) < 0) {
+ esock_warning_msg("Failed stop select (closed) "
+ "for current reader (%T): %d\r\n",
+ recvRef, sres);
+ }
return res;
} else if ((saveErrno == ERRNO_BLOCK) ||
(saveErrno == EAGAIN)) {
- int sres;
SSDBG( descP, ("SOCKET",
"recv_check_result -> [%d] eagain\r\n", toRead) );
@@ -14008,13 +14098,17 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT for more\r\n") );
- sres = enif_select(env, descP->sock, (ERL_NIF_SELECT_READ),
- descP, NULL, recvRef);
+ if ((sres = esock_select_read(env, descP->sock, descP,
+ NULL, recvRef)) < 0) {
+ res = esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ } else {
+ res = esock_make_error(env, esock_atom_eagain);
+ }
- SSDBG( descP,
- ("SOCKET", "recv_check_result -> SELECT res: %d\r\n", sres) );
-
- return esock_make_error(env, esock_atom_eagain);
+ return res;
} else {
ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno);
@@ -14077,20 +14171,31 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
return esock_make_error_str(env, xres);
}
- /* SELECT for more data */
+ data = MKBIN(env, bufP);
+ data = MKSBIN(env, data, 0, read);
- SELECT(env, descP->sock, (ERL_NIF_SELECT_READ),
- descP, NULL, recvRef);
-
cnt_inc(&descP->readByteCnt, read);
+ /* SELECT for more data */
+
+ if ((sres = esock_select_read(env, descP->sock, descP,
+ NULL, recvRef)) < 0) {
+ /* Result: {error, Reason}
+ * Reason: {select_failed, sres, data}
+ */
+ res = esock_make_error(env,
+ MKT3(env,
+ esock_atom_select_failed,
+ MKI(env, sres),
+ data));
+ } else {
+ res = esock_make_ok3(env, atom_false, data);
+ }
+
/* This transfers "ownership" of the *allocated* binary to an
* erlang term (no need for an explicit free).
*/
- data = MKBIN(env, bufP);
- data = MKSBIN(env, data, 0, read);
-
- return esock_make_ok3(env, atom_false, data);
+ return res;
}
}
}
@@ -14113,7 +14218,8 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
ERL_NIF_TERM recvRef)
{
char* xres;
- ERL_NIF_TERM data;
+ int sres;
+ ERL_NIF_TERM data, res;
SSDBG( descP,
("SOCKET", "recvfrom_check_result -> entry with"
@@ -14133,7 +14239,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
/* +++ Error handling +++ */
if (saveErrno == ECONNRESET) {
- ERL_NIF_TERM res = esock_make_error(env, atom_closed);
+ res = esock_make_error(env, atom_closed);
/* +++ Oups - closed +++ */
@@ -14153,10 +14259,11 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
recv_error_current_reader(env, descP, sockRef, res);
- SELECT(env,
- descP->sock,
- (ERL_NIF_SELECT_STOP),
- descP, NULL, recvRef);
+ if ((sres = esock_select_stop(env, descP->sock, descP)) < 0) {
+ esock_warning_msg("Failed stop select (closed) "
+ "for current reader (%T): %d\r\n",
+ recvRef, sres);
+ }
FREE_BIN(bufP);
@@ -14166,19 +14273,26 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
(saveErrno == EAGAIN)) {
SSDBG( descP, ("SOCKET", "recvfrom_check_result -> eagain\r\n") );
-
- if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL)
- return esock_make_error_str(env, xres);
-
- SELECT(env, descP->sock, (ERL_NIF_SELECT_READ),
- descP, NULL, recvRef);
FREE_BIN(bufP);
- return esock_make_error(env, esock_atom_eagain);
+ if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL)
+ return esock_make_error_str(env, xres);
+
+ if ((sres = esock_select_read(env, descP->sock, descP,
+ NULL, recvRef)) < 0) {
+ res = esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ } else {
+ res = esock_make_error(env, esock_atom_eagain);
+ }
+ return res;
} else {
- ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno);
+
+ res = esock_make_error_errno(env, saveErrno);
SSDBG( descP,
("SOCKET",
@@ -14240,6 +14354,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef)
{
+ int sres;
+ ERL_NIF_TERM res;
SSDBG( descP,
("SOCKET", "recvmsg_check_result -> entry with"
@@ -14284,7 +14400,6 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
/* +++ Error handling +++ */
if (saveErrno == ECONNRESET) {
- ERL_NIF_TERM res = esock_make_error(env, atom_closed);
/* +++ Oups - closed +++ */
@@ -14299,19 +14414,21 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
* </KOLLA>
*/
+ res = esock_make_error(env, atom_closed);
descP->closeLocal = FALSE;
descP->state = SOCKET_STATE_CLOSING;
recv_error_current_reader(env, descP, sockRef, res);
- SELECT(env,
- descP->sock,
- (ERL_NIF_SELECT_STOP),
- descP, NULL, recvRef);
+ if ((sres = esock_select_stop(env, descP->sock, descP)) < 0) {
+ esock_warning_msg("Failed stop select (closed) "
+ "for current reader (%T): %d\r\n",
+ recvRef, sres);
+ }
FREE_BIN(dataBufP); FREE_BIN(ctrlBufP);
- return res;
+ return res;;
} else if ((saveErrno == ERRNO_BLOCK) ||
(saveErrno == EAGAIN)) {
@@ -14319,18 +14436,26 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "recvmsg_check_result -> eagain\r\n") );
+ FREE_BIN(dataBufP); FREE_BIN(ctrlBufP);
+
if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL)
return esock_make_error_str(env, xres);
- SELECT(env, descP->sock, (ERL_NIF_SELECT_READ),
- descP, NULL, recvRef);
-
- FREE_BIN(dataBufP); FREE_BIN(ctrlBufP);
+ if ((sres = esock_select_read(env, descP->sock, descP,
+ NULL, recvRef)) < 0) {
+ res = esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ } else {
+ res = esock_make_error(env, esock_atom_eagain);
+ }
- return esock_make_error(env, esock_atom_eagain);
+ return res;
} else {
- ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno);
+
+ res = esock_make_error_errno(env, saveErrno);
SSDBG( descP,
("SOCKET",
@@ -16069,6 +16194,8 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
sprintf(buf, "socket[close,%d]", sock);
descP->closeMtx = MCREATE(buf);
+ descP->closeEnv = NULL;
+ descP->closeRef = esock_atom_undefined;
descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT;
descP->rNum = 0;
@@ -16647,13 +16774,20 @@ char* send_msg_error(ErlNifEnv* env,
* (actually that the 'stop' callback function has been called).
*/
static
-char* esock_send_close_msg(ErlNifEnv* env,
- ERL_NIF_TERM closeRef,
- ErlNifPid* pid)
+char* esock_send_close_msg(ErlNifEnv* env,
+ SocketDescriptor* descP)
{
- return esock_send_socket_msg(env,
- esock_atom_undefined,
- esock_atom_close, closeRef, pid, NULL);
+ ERL_NIF_TERM sockRef = enif_make_resource(descP->closeEnv, descP);
+ char* res = esock_send_socket_msg(env,
+ sockRef,
+ esock_atom_close,
+ descP->closeRef,
+ &descP->closerPid,
+ descP->closeEnv);
+
+ descP->closeEnv = NULL;
+
+ return res;
}
@@ -16673,7 +16807,8 @@ char* esock_send_abort_msg(ErlNifEnv* env,
ErlNifPid* pid)
{
ErlNifEnv* msg_env = enif_alloc_env();
- ERL_NIF_TERM info = MKT2(msg_env, enif_make_copy(msg_env, recvRef),
+ ERL_NIF_TERM info = MKT2(msg_env,
+ enif_make_copy(msg_env, recvRef),
enif_make_copy(msg_env, reason));
return esock_send_socket_msg(env, sockRef, esock_atom_abort, info, pid,
@@ -16684,10 +16819,16 @@ char* esock_send_abort_msg(ErlNifEnv* env,
/* *** esock_send_socket_msg ***
*
* This function sends a general purpose socket message to an erlang
- * process. A general 'socket' message has the form:
+ * process. A general 'socket' message has the ("erlang") form:
*
* {'$socket', SockRef, Tag, Info}
*
+ * Where
+ *
+ * SockRef: reference()
+ * Tag: atom()
+ * Info: term()
+ *
*/
static
@@ -16696,7 +16837,7 @@ char* esock_send_socket_msg(ErlNifEnv* env,
ERL_NIF_TERM tag,
ERL_NIF_TERM info,
ErlNifPid* pid,
- ErlNifEnv* msg_env)
+ ErlNifEnv* msg_env)
{
ERL_NIF_TERM msg;
if (!msg_env) {
@@ -16731,6 +16872,52 @@ char* esock_send_msg(ErlNifEnv* env,
#endif // #if defined(__WIN32__)
+/* ----------------------------------------------------------------------
+ * S e l e c t W r a p p e r F u n c t i o n s
+ * ----------------------------------------------------------------------
+ */
+
+static
+int esock_select_read(ErlNifEnv* env,
+ ErlNifEvent event,
+ void* obj,
+ const ErlNifPid* pid,
+ ERL_NIF_TERM ref)
+{
+ return enif_select(env, event, (ERL_NIF_SELECT_READ), obj, pid, ref);
+}
+
+
+static
+int esock_select_write(ErlNifEnv* env,
+ ErlNifEvent event,
+ void* obj,
+ const ErlNifPid* pid,
+ ERL_NIF_TERM ref)
+{
+ return enif_select(env, event, (ERL_NIF_SELECT_WRITE), obj, pid, ref);
+}
+
+
+static
+int esock_select_stop(ErlNifEnv* env,
+ ErlNifEvent event,
+ void* obj)
+{
+ return enif_select(env, event, (ERL_NIF_SELECT_STOP), obj, NULL,
+ esock_atom_undefined);
+}
+
+static
+int esock_select_cancel(ErlNifEnv* env,
+ ErlNifEvent event,
+ enum ErlNifSelectFlags mode,
+ void* obj)
+{
+ return enif_select(env, event, (ERL_NIF_SELECT_CANCEL | mode), obj, NULL,
+ esock_atom_undefined);
+}
+
/* ----------------------------------------------------------------------
* R e q u e s t Q u e u e F u n c t i o n s
@@ -17180,20 +17367,13 @@ int esock_monitor(const char* slogan,
int res;
SSDBG( descP, ("SOCKET", "[%d] %s: try monitor\r\n", descP->sock, slogan) );
- /* esock_dbg_printf("MONP", "[%d] %s\r\n", descP->sock, slogan); */
res = enif_monitor_process(env, descP, pid, &monP->mon);
if (res != 0) {
monP->is_active = 0;
SSDBG( descP, ("SOCKET", "[%d] monitor failed: %d\r\n", descP->sock, res) );
- // esock_dbg_printf("MONP", "[%d] failed: %d\r\n", descP->sock, res);
} else {
monP->is_active = 1;
- /*esock_dbg_printf("MONP",
- "[%d] success: "
- "%T\r\n",
- descP->sock,
- my_make_monitor_term(env, &monP->mon));*/
}
return res;
@@ -17294,16 +17474,6 @@ void socket_dtor(ErlNifEnv* env, void* obj)
* (in case it is, for instance, both controlling process
* and a writer).
*
- * <KOLLA>
- *
- * We do not handle linger-issues yet! So anything in the out
- * buffers will be left for the OS to solve...
- * Do we need a special "close"-thread? Dirty scheduler?
- *
- * What happens if we are "stopped" for another reason then 'close'?
- * For instance, down?
- *
- * </KOLLA>
*/
static
void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
@@ -17321,7 +17491,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
MLOCK(descP->writeMtx);
MLOCK(descP->readMtx);
MLOCK(descP->accMtx);
- MLOCK(descP->closeMtx);
+ if (!is_direct_call) MLOCK(descP->closeMtx);
SSDBG( descP, ("SOCKET", "socket_stop -> "
"[%d, %T] all mutex(s) locked when counters:"
@@ -17360,10 +17530,6 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
*/
DEMONP("socket_stop -> ctrl", env, descP, &descP->ctrlMon);
- /*
- esock_dbg_printf("STOP", "[%d] maybe handle current writer (0x%lX)\r\n",
- descP->sock, descP->currentReaderP);
- */
if (descP->currentWriterP != NULL) {
/* We have a (current) writer and *may* therefor also have
* writers waiting.
@@ -17399,10 +17565,6 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed);
}
- /*
- esock_dbg_printf("STOP", "[%d] maybe handle current reader (0x%lX)\r\n",
- descP->sock, descP->currentReaderP);
- */
if (descP->currentReaderP != NULL) {
/* We have a (current) reader and *may* therefor also have
@@ -17439,10 +17601,6 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed);
}
- /*
- esock_dbg_printf("STOP", "[%d] maybe handle current acceptor (0x%lX)\r\n",
- descP->sock, descP->currentReaderP);
- */
if (descP->currentAcceptorP != NULL) {
/* We have a (current) acceptor and *may* therefor also have
* acceptors waiting.
@@ -17490,56 +17648,32 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
*
* </KOLLA>
*/
-
- if (descP->closeLocal &&
- !is_direct_call) {
-
- /* +++ send close message to the waiting process +++
- *
- * {close, CloseRef}
- *
- * <KOLLA>
- *
- * WHAT HAPPENS IF THE RECEIVER HAS DIED IN THE MEANTIME????
- *
- * </KOLLA>
- */
- /* We should actually move the sending out of the
- * safe region (after the MUNLOCK).
- * Construct it here, but do the actual send later.
- * Here:
- * cpid = descP->closePid;
- * cenv = enif_alloc_env();
- * closeMsg = esock_mk_close_msg(env,
- * CP_TERM(cenv, sockRef),
- * CP_TERM(cenv, descP->closeRef));
- * And after MUNLOCK:
- * if (cenv != NULL) {
- * esock_send_msg(env, closeMsg, &cpid, cenv);
- *
- * }
- */
- esock_send_close_msg(env, descP->closeRef, &descP->closerPid);
-
- DEMONP("socket_stop -> closer", env, descP, &descP->closerMon);
-
- } else {
-
- /*
- * <KOLLA>
- *
- * ABORT?
- *
- * </KOLLA>
- */
+ if (descP->closeLocal) {
+ if (!is_direct_call) {
+
+ /* +++ send close message to the waiting process +++ */
+
+ esock_send_close_msg(env, descP);
+
+ DEMONP("socket_stop -> closer", env, descP, &descP->closerMon);
+
+ } else {
+
+ /* We only need to explicitly free the environment here
+ * since the message send takes care of it if scheduled.
+ */
+
+ if (descP->closeEnv != NULL) enif_free_env(descP->closeEnv);
+
+ }
}
}
SSDBG( descP, ("SOCKET", "socket_stop -> unlock all mutex(s)\r\n") );
- MUNLOCK(descP->closeMtx);
+ if (!is_direct_call) MUNLOCK(descP->closeMtx);
MUNLOCK(descP->accMtx);
MUNLOCK(descP->readMtx);
MUNLOCK(descP->writeMtx);
@@ -17624,31 +17758,6 @@ void socket_down(ErlNifEnv* env,
B2S(IS_CLOSED(descP)),
B2S(IS_CLOSING(descP))) );
- /*
- esock_dbg_printf("DOWN",
- "[%d] begin %T\r\n",
- descP->sock, my_make_monitor_term(env, mon));
- */
-
- /*
- if (MON_COMP(mon, &descP->ctrlMon) == 0) {
- SSDBG( descP, ("SOCKET", "socket_down -> controlling process mon\r\n") );
- } else if (MON_COMP(mon, &descP->closerMon) == 0) {
- SSDBG( descP, ("SOCKET", "socket_down -> closer mon\r\n") );
- } else if ((descP->currentWriterP != NULL) &&
- (MON_COMP(mon, &descP->currentWriter.mon) == 0)) {
- SSDBG( descP, ("SOCKET", "socket_down -> current writer mon\r\n") );
- } else if ((descP->currentReaderP != NULL) &&
- (MON_COMP(mon, &descP->currentReader.mon) == 0)) {
- SSDBG( descP, ("SOCKET", "socket_down -> current reader mon\r\n") );
- } else if ((descP->currentAcceptorP != NULL) &&
- (MON_COMP(mon, &descP->currentAcceptor.mon) == 0)) {
- SSDBG( descP, ("SOCKET", "socket_down -> current acceptor mon\r\n") );
- } else {
- SSDBG( descP, ("SOCKET", "socket_down -> OTHER mon\r\n") );
- }
- */
-
if (!IS_CLOSED(descP)) {
if (compare_pids(env, &descP->ctrlPid, pid)) {
int selectRes;
@@ -17664,16 +17773,8 @@ void socket_down(ErlNifEnv* env,
descP->closeLocal = TRUE;
descP->closerPid = *pid;
MON_INIT(&descP->closerMon);
- descP->closeRef = MKREF(env); // Do we really need this in this case?
-
- /*
- esock_dbg_printf("DOWN",
- "[%d] select stop %T\r\n",
- descP->sock, my_make_monitor_term(env, mon));
- */
- selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP),
- descP, NULL, descP->closeRef);
+ selectRes = esock_select_stop(env, descP->sock, descP);
if (selectRes & ERL_NIF_SELECT_STOP_CALLED) {
/* We are done - we can finalize (socket close) directly */
@@ -17832,12 +17933,9 @@ void socket_down_acceptor(ErlNifEnv* env,
descP->currentAcceptor.pid,
descP->currentAcceptor.ref) );
- if ((res = enif_select(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP,
- &descP->currentAcceptor.pid,
- descP->currentAcceptor.ref) < 0)) {
+ if ((res = esock_select_read(env, descP->sock, descP,
+ &descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref) < 0)) {
esock_warning_msg("Failed select (%d) for new acceptor "
"after current (%T) died\r\n",
@@ -17900,12 +17998,9 @@ void socket_down_writer(ErlNifEnv* env,
descP->currentWriter.pid,
descP->currentWriter.ref) );
- if ((res = enif_select(env,
- descP->sock,
- (ERL_NIF_SELECT_WRITE),
- descP,
- &descP->currentWriter.pid,
- descP->currentWriter.ref) < 0)) {
+ if ((res = esock_select_write(env, descP->sock, descP,
+ &descP->currentWriter.pid,
+ descP->currentWriter.ref) < 0)) {
esock_warning_msg("Failed select (%d) for new writer "
"after current (%T) died\r\n",
@@ -17967,17 +18062,14 @@ void socket_down_reader(ErlNifEnv* env,
descP->currentReader.pid,
descP->currentReader.ref) );
- if ((res = enif_select(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP,
- &descP->currentReader.pid,
- descP->currentReader.ref) < 0)) {
+ if ((res = esock_select_read(env, descP->sock, descP,
+ &descP->currentReader.pid,
+ descP->currentReader.ref) < 0)) {
esock_warning_msg("Failed select (%d) for new reader "
"after current (%T) died\r\n",
res, *pid);
-
+
}
} else {
@@ -18327,6 +18419,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
esock_atom_scope_id = MKA(env, "scope_id");
esock_atom_sctp = MKA(env, "sctp");
esock_atom_sec = MKA(env, "sec");
+ esock_atom_select_failed = MKA(env, "select_failed");
esock_atom_select_sent = MKA(env, "select_sent");
esock_atom_send = MKA(env, "send");
esock_atom_sendmsg = MKA(env, "sendmsg");
diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam
index 93b5fc215b..e44dff8475 100644
--- a/erts/preloaded/ebin/socket.beam
+++ b/erts/preloaded/ebin/socket.beam
Binary files differ
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 12b6c3ac55..5c1647290d 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -2111,6 +2111,16 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) ->
%%
%% close - close a file descriptor
%%
+%% Closing a socket is a two stage rocket (because of linger).
+%% We need to perform the actual socket close while in BLOCKING mode.
+%% But that would hang the entire VM, so what we do is divide the
+%% close in two steps:
+%% 1) nif_close + the socket_stop (nif) callback function
+%% This is for everything that can be done safely NON-BLOCKING.
+%% 2) nif_finalize_close which is executed by a *dirty* scheduler
+%% Before we call the socket close function, we se the socket
+%% BLOCKING. Thereby linger is handled properly.
+
-spec close(Socket) -> ok | {error, Reason} when
Socket :: socket(),
@@ -2124,16 +2134,10 @@ do_close(SockRef) ->
ok ->
nif_finalize_close(SockRef);
{ok, CloseRef} ->
- %% We must wait
+ %% We must wait for the socket_stop callback function to
+ %% complete its work
receive
- {'$socket', _, close, CloseRef} ->
-%% {close, CloseRef} ->
- %% <KOLLA>
- %%
- %% WHAT HAPPENS IF THIS PROCESS IS KILLED
- %% BEFORE WE CAN EXECUTE THE FINAL CLOSE???
- %%
- %% </KOLLA>
+ {'$socket', SockRef, close, CloseRef} ->
nif_finalize_close(SockRef)
end;
{error, _} = ERROR ->