aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/nifs/common/socket_nif.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/nifs/common/socket_nif.c')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c608
1 files changed, 326 insertions, 282 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 2d8dc3c201..965dcfe471 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -964,6 +964,7 @@ static ERL_NIF_TERM nlisten(ErlNifEnv* env,
int backlog);
static ERL_NIF_TERM naccept(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM ref);
static ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -980,17 +981,21 @@ static ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env,
SocketAddress* remote);
static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM ref);
static ERL_NIF_TERM naccept_accepting_current(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM ref);
static ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
SOCKET accSock,
SocketAddress* remote);
static ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env,
SocketDescriptor* descP,
- ERL_NIF_TERM ref,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM opRef,
int save_errno);
static ERL_NIF_TERM naccept_accepting_other(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -1915,31 +1920,38 @@ static ERL_NIF_TERM npeername(ErlNifEnv* env,
static ERL_NIF_TERM ncancel(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM op,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM opRef);
static ERL_NIF_TERM ncancel_connect(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM opRef);
static ERL_NIF_TERM ncancel_accept(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM opRef);
static ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
- SocketDescriptor* descP);
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef);
static ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM opRef);
static ERL_NIF_TERM ncancel_send(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM opRef);
static ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env,
- SocketDescriptor* descP);
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef);
static ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM opRef);
static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM opRef);
static ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env,
- SocketDescriptor* descP);
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef);
static ERL_NIF_TERM ncancel_recv_waiting(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM opRef);
@@ -2018,7 +2030,8 @@ static char* recv_init_current_reader(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM ref);
static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
- SocketDescriptor* descP);
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef);
static void recv_error_current_reader(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM sockRef,
@@ -2234,7 +2247,43 @@ static void inc_socket(int domain, int type, int protocol);
static void dec_socket(int domain, int type, int protocol);
-/* All the queue operator functions (search4pid, push, pop
+extern char* erl_errno_id(int error); /* THIS IS JUST TEMPORARY??? */
+
+
+/* *** activate_next_acceptor ***
+ * *** activate_next_writer ***
+ * *** activate_next_reader ***
+ *
+ * All the activate-next functions for acceptor, writer and reader
+ * have exactly the same API, so we apply some macro magic to simplify.
+ * They simply operates on dufferent data structures.
+ *
+ */
+
+#define ACTIVATE_NEXT_FUNCS_DEFS \
+ ACTIVATE_NEXT_FUNC_DEF(acceptor) \
+ ACTIVATE_NEXT_FUNC_DEF(writer) \
+ ACTIVATE_NEXT_FUNC_DEF(reader)
+
+#define ACTIVATE_NEXT_FUNC_DEF(F) \
+ static BOOLEAN_T activate_next_##F(ErlNifEnv* env, \
+ SocketDescriptor* descP, \
+ ERL_NIF_TERM sockRef);
+ACTIVATE_NEXT_FUNCS_DEFS
+#undef ACTIVATE_NEXT_FUNC_DEF
+
+static BOOLEAN_T activate_next(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ SocketRequestor* reqP,
+ SocketRequestQueue* q,
+ ERL_NIF_TERM sockRef);
+
+/* *** acceptor_search4pid | writer_search4pid | reader_search4pid ***
+ * *** acceptor_push | writer_push | reader_push ***
+ * *** acceptor_pop | writer_pop | reader_pop ***
+ * *** acceptor_unqueue | writer_unqueue | reader_unqueue ***
+ *
+ * All the queue operator functions (search4pid, push, pop
* and unqueue) for acceptor, writer and reader has exactly
* the same API, so we apply some macro magic to simplify.
*/
@@ -2305,12 +2354,15 @@ static void socket_down(ErlNifEnv* env,
const ErlNifMonitor* mon);
static void socket_down_acceptor(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
const ErlNifPid* pid);
static void socket_down_writer(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
const ErlNifPid* pid);
static void socket_down_reader(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
const ErlNifPid* pid);
static char* esock_send_close_msg(ErlNifEnv* env,
@@ -4832,14 +4884,15 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env,
return enif_raise_exception(env, MKA(env, "notsup"));
#else
SocketDescriptor* descP;
- ERL_NIF_TERM ref, res;
+ ERL_NIF_TERM sockRef, ref, res;
SGDBG( ("SOCKET", "nif_accept -> entry with argc: %d\r\n", argc) );
/* Extract arguments and perform preliminary validation */
+ sockRef = argv[0];
if ((argc != 2) ||
- !enif_get_resource(env, argv[0], sockets, (void**) &descP)) {
+ !enif_get_resource(env, sockRef, sockets, (void**) &descP)) {
return enif_make_badarg(env);
}
ref = argv[1];
@@ -4852,7 +4905,7 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env,
MLOCK(descP->accMtx);
- res = naccept(env, descP, ref);
+ res = naccept(env, descP, sockRef, ref);
MUNLOCK(descP->accMtx);
@@ -4866,6 +4919,7 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env,
static
ERL_NIF_TERM naccept(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM ref)
{
ERL_NIF_TERM res;
@@ -4879,7 +4933,7 @@ ERL_NIF_TERM naccept(ErlNifEnv* env,
break;
case SOCKET_STATE_ACCEPTING:
- res = naccept_accepting(env, descP, ref);
+ res = naccept_accepting(env, descP, sockRef, ref);
break;
default:
@@ -5016,6 +5070,7 @@ ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env,
static
ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM ref)
{
ErlNifPid caller;
@@ -5032,15 +5087,12 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
"\r\n Current: %T"
"\r\n", caller, descP->currentAcceptor.pid) );
-
-
-
if (compare_pids(env, &descP->currentAcceptor.pid, &caller)) {
SSDBG( descP,
("SOCKET", "naccept_accepting -> current acceptor\r\n") );
- res = naccept_accepting_current(env, descP, ref);
+ res = naccept_accepting_current(env, descP, sockRef, ref);
} else {
@@ -5065,7 +5117,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
static
ERL_NIF_TERM naccept_accepting_current(ErlNifEnv* env,
SocketDescriptor* descP,
- ERL_NIF_TERM ref)
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM accRef)
{
SocketAddress remote;
unsigned int n;
@@ -5086,13 +5139,15 @@ ERL_NIF_TERM naccept_accepting_current(ErlNifEnv* env,
"naccept_accepting_current -> accept failed: %d\r\n",
save_errno) );
- res = naccept_accepting_current_error(env, descP, ref, save_errno);
+ res = naccept_accepting_current_error(env, descP, sockRef,
+ accRef, save_errno);
} else {
SSDBG( descP, ("SOCKET", "naccept_accepting_current -> accepted\r\n") );
- res = naccept_accepting_current_accept(env, descP, accSock, &remote);
+ res = naccept_accepting_current_accept(env, descP, sockRef,
+ accSock, &remote);
}
@@ -5107,10 +5162,10 @@ ERL_NIF_TERM naccept_accepting_current(ErlNifEnv* env,
static
ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
SOCKET accSock,
SocketAddress* remote)
{
- int sres;
ERL_NIF_TERM res;
if (naccept_accepted(env, descP, accSock,
@@ -5124,30 +5179,21 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env,
*
*/
- if (acceptor_pop(env, descP, &descP->currentAcceptor)) {
-
- /* There was another one */
+ if (!activate_next_acceptor(env, descP, sockRef)) {
SSDBG( descP,
("SOCKET",
- "naccept_accepting_current_accept -> new (active) acceptor: "
- "\r\n pid: %T"
- "\r\n ref: %T"
- "\r\n",
- descP->currentAcceptor.pid,
- descP->currentAcceptor.ref) );
+ "naccept_accepting_current_accept -> "
+ "no more writers\r\n") );
- if ((sres = esock_select_read(env, descP->sock, descP,
- &descP->currentAcceptor.pid,
- descP->currentAcceptor.ref)) < 0) {
- esock_warning_msg("Failed select (%d) for new acceptor "
- "after current (%T) died\r\n",
- sres, descP->currentAcceptor.pid);
- }
- } else {
- descP->currentAcceptorP = NULL;
- descP->state = SOCKET_STATE_LISTENING;
+ descP->state = SOCKET_STATE_LISTENING;
+
+ descP->currentAcceptorP = NULL;
+ descP->currentAcceptor.ref = esock_atom_undefined;
+ enif_set_pid_undefined(&descP->currentAcceptor.pid);
+ esock_monitor_init(&descP->currentAcceptor.mon);
}
+
}
return res;
@@ -5163,10 +5209,12 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env,
static
ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env,
SocketDescriptor* descP,
- ERL_NIF_TERM ref,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM opRef,
int save_errno)
{
- ERL_NIF_TERM res;
+ SocketRequestor req;
+ ERL_NIF_TERM res, reason;
if (save_errno == ERRNO_BLOCK) {
@@ -5176,14 +5224,27 @@ ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env,
SSDBG( descP,
("SOCKET",
- "naccept_accepting_current_error -> would block: try again\r\n") );
+ "naccept_accepting_current_error -> "
+ "would block: try again\r\n") );
- res = naccept_busy_retry(env, descP, ref, &descP->currentAcceptor.pid,
+ res = naccept_busy_retry(env, descP, opRef, &descP->currentAcceptor.pid,
/* No state change */
descP->state);
} else {
- res = esock_make_error_errno(env, save_errno);
+
+ reason = MKA(env, erl_errno_id(save_errno));
+ res = esock_make_error(env, reason);
+
+ while (acceptor_pop(env, descP, &req)) {
+ SSDBG( descP,
+ ("SOCKET", "naccept_accepting_current_error -> abort %T\r\n",
+ req.pid) );
+ esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid);
+ DEMONP("naccept_accepting_current_error -> pop'ed writer",
+ env, descP, &req.mon);
+ }
+
}
return res;
@@ -13063,14 +13124,15 @@ ERL_NIF_TERM nif_cancel(ErlNifEnv* env,
return enif_raise_exception(env, MKA(env, "notsup"));
#else
SocketDescriptor* descP;
- ERL_NIF_TERM op, opRef, result;
+ ERL_NIF_TERM op, sockRef, opRef, result;
SGDBG( ("SOCKET", "nif_cancel -> entry with argc: %d\r\n", argc) );
/* Extract arguments and perform preliminary validation */
+ sockRef = argv[0];
if ((argc != 3) ||
- !enif_get_resource(env, argv[0], sockets, (void**) &descP)) {
+ !enif_get_resource(env, sockRef, sockets, (void**) &descP)) {
return enif_make_badarg(env);
}
op = argv[1];
@@ -13085,7 +13147,7 @@ ERL_NIF_TERM nif_cancel(ErlNifEnv* env,
"\r\n opRef: %T"
"\r\n", descP->sock, op, opRef) );
- result = ncancel(env, descP, op, opRef);
+ result = ncancel(env, descP, op, sockRef, opRef);
SSDBG( descP,
("SOCKET", "nif_cancel -> done with result: "
@@ -13102,6 +13164,7 @@ static
ERL_NIF_TERM ncancel(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM op,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM opRef)
{
/* <KOLLA>
@@ -13115,19 +13178,19 @@ ERL_NIF_TERM ncancel(ErlNifEnv* env,
if (COMPARE(op, esock_atom_connect) == 0) {
return ncancel_connect(env, descP, opRef);
} else if (COMPARE(op, esock_atom_accept) == 0) {
- return ncancel_accept(env, descP, opRef);
+ return ncancel_accept(env, descP, sockRef, opRef);
} else if (COMPARE(op, esock_atom_send) == 0) {
- return ncancel_send(env, descP, opRef);
+ return ncancel_send(env, descP, sockRef, opRef);
} else if (COMPARE(op, esock_atom_sendto) == 0) {
- return ncancel_send(env, descP, opRef);
+ return ncancel_send(env, descP, sockRef, opRef);
} else if (COMPARE(op, esock_atom_sendmsg) == 0) {
- return ncancel_send(env, descP, opRef);
+ return ncancel_send(env, descP, sockRef, opRef);
} else if (COMPARE(op, esock_atom_recv) == 0) {
- return ncancel_recv(env, descP, opRef);
+ return ncancel_recv(env, descP, sockRef, opRef);
} else if (COMPARE(op, esock_atom_recvfrom) == 0) {
- return ncancel_recv(env, descP, opRef);
+ return ncancel_recv(env, descP, sockRef, opRef);
} else if (COMPARE(op, esock_atom_recvmsg) == 0) {
- return ncancel_recv(env, descP, opRef);
+ return ncancel_recv(env, descP, sockRef, opRef);
} else {
return esock_make_error(env, esock_atom_einval);
}
@@ -13161,6 +13224,7 @@ ERL_NIF_TERM ncancel_connect(ErlNifEnv* env,
static
ERL_NIF_TERM ncancel_accept(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM opRef)
{
ERL_NIF_TERM res;
@@ -13176,7 +13240,7 @@ ERL_NIF_TERM ncancel_accept(ErlNifEnv* env,
if (descP->currentAcceptorP != NULL) {
if (COMPARE(opRef, descP->currentAcceptor.ref) == 0) {
- res = ncancel_accept_current(env, descP);
+ res = ncancel_accept_current(env, descP, sockRef);
} else {
res = ncancel_accept_waiting(env, descP, opRef);
}
@@ -13202,9 +13266,9 @@ ERL_NIF_TERM ncancel_accept(ErlNifEnv* env,
*/
static
ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
- SocketDescriptor* descP)
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef)
{
- int sres;
ERL_NIF_TERM res;
SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") );
@@ -13213,33 +13277,22 @@ ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
env, descP, &descP->currentAcceptor.mon);
res = ncancel_read_select(env, descP, descP->currentAcceptor.ref);
- SSDBG( descP, ("SOCKET", "ncancel_accept_current -> cancel res: %T\r\n", res) );
+ SSDBG( descP, ("SOCKET",
+ "ncancel_accept_current -> cancel res: %T\r\n", res) );
- if (acceptor_pop(env, descP, &descP->currentAcceptor)) {
-
- /* There was another one */
-
- SSDBG( descP, ("SOCKET", "ncancel_accept_current -> new (active) acceptor: "
- "\r\n pid: %T"
- "\r\n ref: %T"
- "\r\n",
- descP->currentAcceptor.pid,
- descP->currentAcceptor.ref) );
-
- if ((sres = esock_select_read(env, descP->sock, descP,
- &descP->currentAcceptor.pid,
- descP->currentAcceptor.ref)) < 0) {
- return esock_make_error(env,
- MKT2(env,
- esock_atom_select_failed,
- MKI(env, sres)));
- }
- } else {
- SSDBG( descP, ("SOCKET", "ncancel_accept_current -> no more acceptors\r\n") );
- descP->currentAcceptorP = NULL;
- descP->state = SOCKET_STATE_LISTENING;
+ if (!activate_next_acceptor(env, descP, sockRef)) {
+
+ SSDBG( descP,
+ ("SOCKET", "ncancel_accept_current -> no more writers\r\n") );
+
+ descP->state = SOCKET_STATE_LISTENING;
+
+ descP->currentAcceptorP = NULL;
+ descP->currentAcceptor.ref = esock_atom_undefined;
+ enif_set_pid_undefined(&descP->currentAcceptor.pid);
+ esock_monitor_init(&descP->currentAcceptor.mon);
}
-
+
SSDBG( descP, ("SOCKET", "ncancel_accept_current -> done with result:"
"\r\n %T"
"\r\n", res) );
@@ -13281,6 +13334,7 @@ ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env,
static
ERL_NIF_TERM ncancel_send(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM opRef)
{
ERL_NIF_TERM res;
@@ -13296,7 +13350,7 @@ ERL_NIF_TERM ncancel_send(ErlNifEnv* env,
if (descP->currentWriterP != NULL) {
if (COMPARE(opRef, descP->currentWriter.ref) == 0) {
- res = ncancel_send_current(env, descP);
+ res = ncancel_send_current(env, descP, sockRef);
} else {
res = ncancel_send_waiting(env, descP, opRef);
}
@@ -13323,43 +13377,29 @@ ERL_NIF_TERM ncancel_send(ErlNifEnv* env,
*/
static
ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env,
- SocketDescriptor* descP)
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef)
{
- int sres;
ERL_NIF_TERM res;
SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") );
- DEMONP("ncancel_recv_current -> current writer",
+ DEMONP("ncancel_send_current -> current writer",
env, descP, &descP->currentWriter.mon);
res = ncancel_write_select(env, descP, descP->currentWriter.ref);
- SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) );
+ SSDBG( descP,
+ ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) );
- if (writer_pop(env, descP, &descP->currentWriter)) {
-
- /* There was another one */
-
- SSDBG( descP, ("SOCKET", "ncancel_send_current -> new (active) writer: "
- "\r\n pid: %T"
- "\r\n ref: %T"
- "\r\n",
- descP->currentWriter.pid,
- descP->currentWriter.ref) );
-
- if ((sres = esock_select_write(env, descP->sock, descP,
- &descP->currentWriter.pid,
- descP->currentWriter.ref)) < 0) {
- return esock_make_error(env,
- MKT2(env,
- esock_atom_select_failed,
- MKI(env, sres)));
- }
- } else {
- SSDBG( descP, ("SOCKET", "ncancel_send_current -> no more writers\r\n") );
- descP->currentWriterP = NULL;
+ if (!activate_next_writer(env, descP, sockRef)) {
+ SSDBG( descP,
+ ("SOCKET", "ncancel_send_current -> no more writers\r\n") );
+ descP->currentWriterP = NULL;
+ descP->currentWriter.ref = esock_atom_undefined;
+ enif_set_pid_undefined(&descP->currentWriter.pid);
+ esock_monitor_init(&descP->currentWriter.mon);
}
-
+
SSDBG( descP, ("SOCKET", "ncancel_send_current -> done with result:"
"\r\n %T"
"\r\n", res) );
@@ -13401,6 +13441,7 @@ ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env,
static
ERL_NIF_TERM ncancel_recv(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM opRef)
{
ERL_NIF_TERM res;
@@ -13416,7 +13457,7 @@ ERL_NIF_TERM ncancel_recv(ErlNifEnv* env,
if (descP->currentReaderP != NULL) {
if (COMPARE(opRef, descP->currentReader.ref) == 0) {
- res = ncancel_recv_current(env, descP);
+ res = ncancel_recv_current(env, descP, sockRef);
} else {
res = ncancel_recv_waiting(env, descP, opRef);
}
@@ -13442,9 +13483,9 @@ ERL_NIF_TERM ncancel_recv(ErlNifEnv* env,
*/
static
ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env,
- SocketDescriptor* descP)
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef)
{
- int sres;
ERL_NIF_TERM res;
SSDBG( descP, ("SOCKET", "ncancel_recv_current -> entry\r\n") );
@@ -13453,32 +13494,18 @@ ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env,
env, descP, &descP->currentReader.mon);
res = ncancel_read_select(env, descP, descP->currentReader.ref);
- SSDBG( descP, ("SOCKET", "ncancel_recv_current -> cancel res: %T\r\n", res) );
+ SSDBG( descP,
+ ("SOCKET", "ncancel_recv_current -> cancel res: %T\r\n", res) );
- if (reader_pop(env, descP, &descP->currentReader)) {
-
- /* There was another one */
-
- SSDBG( descP, ("SOCKET", "ncancel_recv_current -> new (active) reader: "
- "\r\n pid: %T"
- "\r\n ref: %T"
- "\r\n",
- descP->currentReader.pid,
- descP->currentReader.ref) );
-
- if ((sres = esock_select_read(env, descP->sock, descP,
- &descP->currentReader.pid,
- descP->currentReader.ref)) < 0) {
- return esock_make_error(env,
- MKT2(env,
- esock_atom_select_failed,
- MKI(env, sres)));
- }
- } else {
- SSDBG( descP, ("SOCKET", "ncancel_recv_current -> no more readers\r\n") );
- descP->currentReaderP = NULL;
+ if (!activate_next_reader(env, descP, sockRef)) {
+ SSDBG( descP,
+ ("SOCKET", "ncancel_recv_current -> no more readers\r\n") );
+ descP->currentReaderP = NULL;
+ descP->currentReader.ref = esock_atom_undefined;
+ enif_set_pid_undefined(&descP->currentReader.pid);
+ esock_monitor_init(&descP->currentReader.mon);
}
-
+
SSDBG( descP, ("SOCKET", "ncancel_recv_current -> done with result:"
"\r\n %T"
"\r\n", res) );
@@ -13661,27 +13688,11 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
/* Ok, this write is done maybe activate the next (if any) */
- if (writer_pop(env, descP, &descP->currentWriter)) {
-
- /* There was another one */
-
- SSDBG( descP, ("SOCKET", "send_check_result -> new (active) writer: "
- "\r\n pid: %T"
- "\r\n ref: %T"
- "\r\n",
- descP->currentWriter.pid,
- descP->currentWriter.ref) );
-
- if ((sres = esock_select_write(env, descP->sock, descP,
- &descP->currentWriter.pid,
- descP->currentWriter.ref)) < 0) {
- return esock_make_error(env,
- MKT2(env,
- esock_atom_select_failed,
- MKI(env, sres)));
- }
- } else {
- descP->currentWriterP = NULL;
+ if (!activate_next_writer(env, descP, sockRef)) {
+ descP->currentWriterP = NULL;
+ descP->currentWriter.ref = esock_atom_undefined;
+ enif_set_pid_undefined(&descP->currentWriter.pid);
+ esock_monitor_init(&descP->currentWriter.mon);
}
return esock_atom_ok;
@@ -13692,7 +13703,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) {
SocketRequestor req;
- ERL_NIF_TERM res;
+ ERL_NIF_TERM res, reason;
/*
* An actual failure - we (and everyone waiting) give up
@@ -13701,9 +13712,11 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
cnt_inc(&descP->writeFails, 1);
SSDBG( descP,
- ("SOCKET", "send_check_result -> error: %d\r\n", saveErrno) );
+ ("SOCKET",
+ "send_check_result -> error: %d\r\n", saveErrno) );
- res = esock_make_error_errno(env, saveErrno);
+ reason = MKA(env, erl_errno_id(saveErrno));
+ res = esock_make_error(env, reason);
if (descP->currentWriterP != NULL) {
@@ -13714,7 +13727,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "send_check_result -> abort %T\r\n",
req.pid) );
- esock_send_abort_msg(env, sockRef, req.ref, res, &req.pid);
+ esock_send_abort_msg(env, sockRef, req.ref,
+ reason, &req.pid);
DEMONP("send_check_result -> pop'ed writer",
env, descP, &req.mon);
}
@@ -13881,9 +13895,9 @@ char* recv_init_current_reader(ErlNifEnv* env,
static
ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
- SocketDescriptor* descP)
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef)
{
- int sres;
ERL_NIF_TERM res = esock_atom_ok;
if (descP->currentReaderP != NULL) {
@@ -13891,29 +13905,18 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
DEMONP("recv_update_current_reader -> current reader",
env, descP, &descP->currentReader.mon);
- if (reader_pop(env, descP, &descP->currentReader)) {
-
- /* There was another one */
-
+ if (!activate_next_reader(env, descP, sockRef)) {
+
SSDBG( descP,
- ("SOCKET", "recv_update_current_reader -> new (active) reader: "
- "\r\n pid: %T"
- "\r\n ref: %T"
- "\r\n",
- descP->currentReader.pid,
- descP->currentReader.ref) );
-
- if ((sres = esock_select_read(env, descP->sock, descP,
- &descP->currentReader.pid,
- descP->currentReader.ref)) < 0) {
- res = esock_make_error(env,
- MKT2(env,
- esock_atom_select_failed,
- MKI(env, sres)));
- }
- } else {
- descP->currentReaderP = NULL;
+ ("SOCKET",
+ "recv_update_current_reader -> no more readers\r\n") );
+
+ descP->currentReaderP = NULL;
+ descP->currentReader.ref = esock_atom_undefined;
+ enif_set_pid_undefined(&descP->currentReader.pid);
+ esock_monitor_init(&descP->currentReader.mon);
}
+
}
return res;
@@ -14064,7 +14067,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
cnt_inc(&descP->readPkgCnt, 1);
- recv_update_current_reader(env, descP);
+ recv_update_current_reader(env, descP, sockRef);
/* This transfers "ownership" of the *allocated* binary to an
* erlang term (no need for an explicit free).
@@ -14109,7 +14112,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
"recv_check_result -> [%d] "
"we got exactly what we could fit\r\n", toRead) );
- recv_update_current_reader(env, descP);
+ recv_update_current_reader(env, descP, sockRef);
/* This transfers "ownership" of the *allocated* binary to an
* erlang term (no need for an explicit free).
@@ -14221,7 +14224,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
cnt_inc(&descP->readPkgCnt, 1);
cnt_inc(&descP->readByteCnt, read);
- recv_update_current_reader(env, descP);
+ recv_update_current_reader(env, descP, sockRef);
/* This transfers "ownership" of the *allocated* binary to an
* erlang term (no need for an explicit free).
@@ -14406,7 +14409,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
data = MKSBIN(env, data, 0, read);
}
- recv_update_current_reader(env, descP);
+ recv_update_current_reader(env, descP, sockRef);
return esock_make_ok2(env, MKT2(env, eSockAddr, data));
@@ -14571,7 +14574,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
"recvmsg_check_result -> "
"(msghdr) encode failed: %s\r\n", xres) );
- recv_update_current_reader(env, descP);
+ recv_update_current_reader(env, descP, sockRef);
FREE_BIN(dataBufP); FREE_BIN(ctrlBufP);
@@ -14583,7 +14586,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
"recvmsg_check_result -> "
"(msghdr) encode ok: %T\r\n", eMsgHdr) );
- recv_update_current_reader(env, descP);
+ recv_update_current_reader(env, descP, sockRef);
return esock_make_ok2(env, eMsgHdr);
}
@@ -17006,6 +17009,102 @@ int esock_select_cancel(ErlNifEnv* env,
/* ----------------------------------------------------------------------
+ * A c t i v a t e N e x t ( o p e r a t o r ) F u n c t i o n s
+ * ----------------------------------------------------------------------
+ */
+
+/* *** activate_next_acceptor | activate_next_writer | activate_next_reader ***
+ *
+ * This functions pops the writer queue and then selects until it
+ * manages to successfully activate a writer or the queue is empty.
+ */
+
+#define ACTIVATE_NEXT_FUNCS \
+ ACTIVATE_NEXT_FUNC_DECL(acceptor, currentAcceptor, acceptorsQ) \
+ ACTIVATE_NEXT_FUNC_DECL(writer, currentWriter, writersQ) \
+ ACTIVATE_NEXT_FUNC_DECL(reader, currentReader, readersQ)
+
+#define ACTIVATE_NEXT_FUNC_DECL(F, R, Q) \
+ static \
+ BOOLEAN_T activate_next_##F(ErlNifEnv* env, \
+ SocketDescriptor* descP, \
+ ERL_NIF_TERM sockRef) \
+ { \
+ return activate_next(env, descP, \
+ &descP->R, &descP->Q, \
+ sockRef); \
+ }
+ACTIVATE_NEXT_FUNCS
+#undef ACTIVATE_NEXT_FUNC_DECL
+
+
+/* *** activate_next ***
+ *
+ * This functions pops the requestor queue and then selects until it
+ * manages to successfully activate a new requestor or the queue is empty.
+ * Return value indicates if a new requestor was activated or not.
+ */
+
+static
+BOOLEAN_T activate_next(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ SocketRequestor* reqP,
+ SocketRequestQueue* q,
+ ERL_NIF_TERM sockRef)
+{
+ BOOLEAN_T popped, activated;
+ int sres;
+
+ popped = FALSE;
+ do {
+
+ if (requestor_pop(q, reqP)) {
+
+ /* There was another one */
+
+ SSDBG( descP,
+ ("SOCKET", "activate_next -> new (active) requestor: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n", reqP->pid, reqP->ref) );
+
+ if ((sres = esock_select_read(env, descP->sock, descP,
+ &reqP->pid, reqP->ref)) < 0) {
+ /* We need to inform this process, reqP->pid, that we
+ * failed to select, so we don't leave it hanging.
+ * => send abort
+ */
+
+ esock_send_abort_msg(env, sockRef, reqP->ref, sres, &reqP->pid);
+
+ } else {
+
+ /* Success: New requestor selected */
+ popped = TRUE;
+ activated = FALSE;
+
+ }
+
+ } else {
+
+ SSDBG( descP,
+ ("SOCKET", "send_activate_next -> no more requestors\r\n") );
+
+ popped = TRUE;
+ activated = FALSE;
+ }
+
+ } while (!popped);
+
+ SSDBG( descP,
+ ("SOCKET", "activate_next -> "
+ "done with %s\r\n", B2S(activated)) );
+
+ return activated;
+}
+
+
+/* ----------------------------------------------------------------------
* R e q u e s t o r Q u e u e F u n c t i o n s
* ----------------------------------------------------------------------
*
@@ -17462,7 +17561,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
descP->readTries,
descP->readWaits) );
- sockRef = enif_make_resource(env, descP),
+ sockRef = enif_make_resource(env, descP);
descP->state = SOCKET_STATE_CLOSING; // Just in case...???
descP->isReadable = FALSE;
descP->isWritable = FALSE;
@@ -17751,6 +17850,7 @@ void socket_down(ErlNifEnv* env,
#if !defined(__WIN32__)
SocketDescriptor* descP = (SocketDescriptor*) obj;
int sres;
+ ERL_NIF_TERM sockRef;
SSDBG( descP, ("SOCKET", "socket_down -> entry with"
"\r\n sock: %d"
@@ -17865,19 +17965,21 @@ void socket_down(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "socket_down -> other process term\r\n") );
+ sockRef = enif_make_resource(env, descP);
+
MLOCK(descP->accMtx);
if (descP->currentAcceptorP != NULL)
- socket_down_acceptor(env, descP, pid);
+ socket_down_acceptor(env, descP, sockRef, pid);
MUNLOCK(descP->accMtx);
MLOCK(descP->writeMtx);
if (descP->currentWriterP != NULL)
- socket_down_writer(env, descP, pid);
+ socket_down_writer(env, descP, sockRef, pid);
MUNLOCK(descP->writeMtx);
MLOCK(descP->readMtx);
if (descP->currentReaderP != NULL)
- socket_down_reader(env, descP, pid);
+ socket_down_reader(env, descP, sockRef, pid);
MUNLOCK(descP->readMtx);
}
@@ -17899,46 +18001,28 @@ void socket_down(ErlNifEnv* env,
static
void socket_down_acceptor(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
const ErlNifPid* pid)
{
if (compare_pids(env, &descP->currentAcceptor.pid, pid)) {
SSDBG( descP, ("SOCKET",
"socket_down_acceptor -> "
- "current acceptor - try pop the queue\r\n") );
+ "current acceptor - try activate next\r\n") );
- if (acceptor_pop(env, descP, &descP->currentAcceptor)) {
- int res;
-
- /* There was another one, so we will still be in accepting state */
-
- SSDBG( descP, ("SOCKET",
- "socket_down_acceptor -> new (active) acceptor: "
- "\r\n pid: %T"
- "\r\n ref: %T"
- "\r\n",
- descP->currentAcceptor.pid,
- descP->currentAcceptor.ref) );
-
- if ((res = esock_select_read(env, descP->sock, descP,
- &descP->currentAcceptor.pid,
- descP->currentAcceptor.ref) < 0)) {
-
- esock_warning_msg("Failed select (%d) for new acceptor "
- "after current (%T) died\r\n",
- res, *pid);
-
- }
-
- } else {
-
- SSDBG( descP, ("SOCKET",
- "socket_down_acceptor -> no active acceptor\r\n") );
-
- descP->currentAcceptorP = NULL;
- descP->state = SOCKET_STATE_LISTENING;
+ if (!activate_next_acceptor(env, descP, sockRef)) {
+
+ SSDBG( descP,
+ ("SOCKET", "socket_down_acceptor -> no more writers\r\n") );
+
+ descP->state = SOCKET_STATE_LISTENING;
+
+ descP->currentAcceptorP = NULL;
+ descP->currentAcceptor.ref = esock_atom_undefined;
+ enif_set_pid_undefined(&descP->currentAcceptor.pid);
+ esock_monitor_init(&descP->currentAcceptor.mon);
}
-
+
} else {
/* Maybe unqueue one of the waiting acceptors */
@@ -17962,42 +18046,22 @@ void socket_down_acceptor(ErlNifEnv* env,
static
void socket_down_writer(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
const ErlNifPid* pid)
{
if (compare_pids(env, &descP->currentWriter.pid, pid)) {
SSDBG( descP, ("SOCKET",
"socket_down_writer -> "
- "current writer - try pop the queue\r\n") );
+ "current writer - try activate next\r\n") );
- if (writer_pop(env, descP, &descP->currentWriter)) {
- int res;
-
- /* There was another one */
-
- SSDBG( descP, ("SOCKET", "socket_down_writer -> new (current) writer: "
- "\r\n pid: %T"
- "\r\n ref: %T"
- "\r\n",
- descP->currentWriter.pid,
- descP->currentWriter.ref) );
-
- if ((res = esock_select_write(env, descP->sock, descP,
- &descP->currentWriter.pid,
- descP->currentWriter.ref) < 0)) {
-
- esock_warning_msg("Failed select (%d) for new writer "
- "after current (%T) died\r\n",
- res, *pid);
-
- }
-
- } else {
-
+ if (!activate_next_writer(env, descP, sockRef)) {
SSDBG( descP, ("SOCKET",
"socket_down_writer -> no active writer\r\n") );
-
- descP->currentWriterP = NULL;
+ descP->currentWriterP = NULL;
+ descP->currentWriter.ref = esock_atom_undefined;
+ enif_set_pid_undefined(&descP->currentWriter.pid);
+ esock_monitor_init(&descP->currentWriter.mon);
}
} else {
@@ -18023,44 +18087,24 @@ void socket_down_writer(ErlNifEnv* env,
static
void socket_down_reader(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
const ErlNifPid* pid)
{
if (compare_pids(env, &descP->currentReader.pid, pid)) {
SSDBG( descP, ("SOCKET",
"socket_down_reader -> "
- "current reader - try pop the queue\r\n") );
+ "current reader - try activate next\r\n") );
- if (reader_pop(env, descP, &descP->currentReader)) {
- int res;
-
- /* There was another one */
-
- SSDBG( descP, ("SOCKET", "socket_down_reader -> new (current) reader: "
- "\r\n pid: %T"
- "\r\n ref: %T"
- "\r\n",
- descP->currentReader.pid,
- descP->currentReader.ref) );
-
- if ((res = esock_select_read(env, descP->sock, descP,
- &descP->currentReader.pid,
- descP->currentReader.ref) < 0)) {
-
- esock_warning_msg("Failed select (%d) for new reader "
- "after current (%T) died\r\n",
- res, *pid);
-
- }
-
- } else {
-
- SSDBG( descP, ("SOCKET",
- "socket_down_reader -> no active reader\r\n") );
-
- descP->currentReaderP = NULL;
+ if (!activate_next_reader(env, descP, sockRef)) {
+ SSDBG( descP,
+ ("SOCKET", "ncancel_recv_current -> no more readers\r\n") );
+ descP->currentReaderP = NULL;
+ descP->currentReader.ref = esock_atom_undefined;
+ enif_set_pid_undefined(&descP->currentReader.pid);
+ esock_monitor_init(&descP->currentReader.mon);
}
-
+
} else {
/* Maybe unqueue one of the waiting reader(s) */