diff options
author | Micael Karlberg <[email protected]> | 2019-03-04 18:14:06 +0100 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2019-03-06 11:23:58 +0100 |
commit | 309204fc1cc9b40bd04d8636204c9dea80c4e6b2 (patch) | |
tree | f96bbdb436998801f987a85fa682c22cf4eb4f89 /erts | |
parent | 49eb95319e878832cb3d6aa51a63d08d6b1b36b7 (diff) | |
download | otp-309204fc1cc9b40bd04d8636204c9dea80c4e6b2.tar.gz otp-309204fc1cc9b40bd04d8636204c9dea80c4e6b2.tar.bz2 otp-309204fc1cc9b40bd04d8636204c9dea80c4e6b2.zip |
[socket] Macro abuse of activate-next
Implemented the activate_next function and added its
"users" acceptor, writer and reader (macro abuse).
After a request (accept, write or send) has been either
successfully completed or failed, another request should
be activated. Previously only one attempt was made, which
might leave the other (waiting) requestors hanging.
Now, instead we use a 'activate-next' function that pop's
the request (accept, wrote or read) queue until success
or its empty, thereby making sure that no waiting processes
is left hanging.
OTP-15565
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 608 |
1 files changed, 326 insertions, 282 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 2d8dc3c201..965dcfe471 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -964,6 +964,7 @@ static ERL_NIF_TERM nlisten(ErlNifEnv* env, int backlog); static ERL_NIF_TERM naccept(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM ref); static ERL_NIF_TERM naccept_listening(ErlNifEnv* env, SocketDescriptor* descP, @@ -980,17 +981,21 @@ static ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env, SocketAddress* remote); static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM ref); static ERL_NIF_TERM naccept_accepting_current(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM ref); static ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, SOCKET accSock, SocketAddress* remote); static ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env, SocketDescriptor* descP, - ERL_NIF_TERM ref, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM opRef, int save_errno); static ERL_NIF_TERM naccept_accepting_other(ErlNifEnv* env, SocketDescriptor* descP, @@ -1915,31 +1920,38 @@ static ERL_NIF_TERM npeername(ErlNifEnv* env, static ERL_NIF_TERM ncancel(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM op, + ERL_NIF_TERM sockRef, ERL_NIF_TERM opRef); static ERL_NIF_TERM ncancel_connect(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef); static ERL_NIF_TERM ncancel_accept(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM opRef); static ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, - SocketDescriptor* descP); + SocketDescriptor* descP, + ERL_NIF_TERM sockRef); static ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef); static ERL_NIF_TERM ncancel_send(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM opRef); static ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, - SocketDescriptor* descP); + SocketDescriptor* descP, + ERL_NIF_TERM sockRef); static ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef); static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM opRef); static ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env, - SocketDescriptor* descP); + SocketDescriptor* descP, + ERL_NIF_TERM sockRef); static ERL_NIF_TERM ncancel_recv_waiting(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef); @@ -2018,7 +2030,8 @@ static char* recv_init_current_reader(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM ref); static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, - SocketDescriptor* descP); + SocketDescriptor* descP, + ERL_NIF_TERM sockRef); static void recv_error_current_reader(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM sockRef, @@ -2234,7 +2247,43 @@ static void inc_socket(int domain, int type, int protocol); static void dec_socket(int domain, int type, int protocol); -/* All the queue operator functions (search4pid, push, pop +extern char* erl_errno_id(int error); /* THIS IS JUST TEMPORARY??? */ + + +/* *** activate_next_acceptor *** + * *** activate_next_writer *** + * *** activate_next_reader *** + * + * All the activate-next functions for acceptor, writer and reader + * have exactly the same API, so we apply some macro magic to simplify. + * They simply operates on dufferent data structures. + * + */ + +#define ACTIVATE_NEXT_FUNCS_DEFS \ + ACTIVATE_NEXT_FUNC_DEF(acceptor) \ + ACTIVATE_NEXT_FUNC_DEF(writer) \ + ACTIVATE_NEXT_FUNC_DEF(reader) + +#define ACTIVATE_NEXT_FUNC_DEF(F) \ + static BOOLEAN_T activate_next_##F(ErlNifEnv* env, \ + SocketDescriptor* descP, \ + ERL_NIF_TERM sockRef); +ACTIVATE_NEXT_FUNCS_DEFS +#undef ACTIVATE_NEXT_FUNC_DEF + +static BOOLEAN_T activate_next(ErlNifEnv* env, + SocketDescriptor* descP, + SocketRequestor* reqP, + SocketRequestQueue* q, + ERL_NIF_TERM sockRef); + +/* *** acceptor_search4pid | writer_search4pid | reader_search4pid *** + * *** acceptor_push | writer_push | reader_push *** + * *** acceptor_pop | writer_pop | reader_pop *** + * *** acceptor_unqueue | writer_unqueue | reader_unqueue *** + * + * All the queue operator functions (search4pid, push, pop * and unqueue) for acceptor, writer and reader has exactly * the same API, so we apply some macro magic to simplify. */ @@ -2305,12 +2354,15 @@ static void socket_down(ErlNifEnv* env, const ErlNifMonitor* mon); static void socket_down_acceptor(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, const ErlNifPid* pid); static void socket_down_writer(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, const ErlNifPid* pid); static void socket_down_reader(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, const ErlNifPid* pid); static char* esock_send_close_msg(ErlNifEnv* env, @@ -4832,14 +4884,15 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env, return enif_raise_exception(env, MKA(env, "notsup")); #else SocketDescriptor* descP; - ERL_NIF_TERM ref, res; + ERL_NIF_TERM sockRef, ref, res; SGDBG( ("SOCKET", "nif_accept -> entry with argc: %d\r\n", argc) ); /* Extract arguments and perform preliminary validation */ + sockRef = argv[0]; if ((argc != 2) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP)) { + !enif_get_resource(env, sockRef, sockets, (void**) &descP)) { return enif_make_badarg(env); } ref = argv[1]; @@ -4852,7 +4905,7 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env, MLOCK(descP->accMtx); - res = naccept(env, descP, ref); + res = naccept(env, descP, sockRef, ref); MUNLOCK(descP->accMtx); @@ -4866,6 +4919,7 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env, static ERL_NIF_TERM naccept(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM ref) { ERL_NIF_TERM res; @@ -4879,7 +4933,7 @@ ERL_NIF_TERM naccept(ErlNifEnv* env, break; case SOCKET_STATE_ACCEPTING: - res = naccept_accepting(env, descP, ref); + res = naccept_accepting(env, descP, sockRef, ref); break; default: @@ -5016,6 +5070,7 @@ ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env, static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM ref) { ErlNifPid caller; @@ -5032,15 +5087,12 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, "\r\n Current: %T" "\r\n", caller, descP->currentAcceptor.pid) ); - - - if (compare_pids(env, &descP->currentAcceptor.pid, &caller)) { SSDBG( descP, ("SOCKET", "naccept_accepting -> current acceptor\r\n") ); - res = naccept_accepting_current(env, descP, ref); + res = naccept_accepting_current(env, descP, sockRef, ref); } else { @@ -5065,7 +5117,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, static ERL_NIF_TERM naccept_accepting_current(ErlNifEnv* env, SocketDescriptor* descP, - ERL_NIF_TERM ref) + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef) { SocketAddress remote; unsigned int n; @@ -5086,13 +5139,15 @@ ERL_NIF_TERM naccept_accepting_current(ErlNifEnv* env, "naccept_accepting_current -> accept failed: %d\r\n", save_errno) ); - res = naccept_accepting_current_error(env, descP, ref, save_errno); + res = naccept_accepting_current_error(env, descP, sockRef, + accRef, save_errno); } else { SSDBG( descP, ("SOCKET", "naccept_accepting_current -> accepted\r\n") ); - res = naccept_accepting_current_accept(env, descP, accSock, &remote); + res = naccept_accepting_current_accept(env, descP, sockRef, + accSock, &remote); } @@ -5107,10 +5162,10 @@ ERL_NIF_TERM naccept_accepting_current(ErlNifEnv* env, static ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, SOCKET accSock, SocketAddress* remote) { - int sres; ERL_NIF_TERM res; if (naccept_accepted(env, descP, accSock, @@ -5124,30 +5179,21 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, * */ - if (acceptor_pop(env, descP, &descP->currentAcceptor)) { - - /* There was another one */ + if (!activate_next_acceptor(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", - "naccept_accepting_current_accept -> new (active) acceptor: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", - descP->currentAcceptor.pid, - descP->currentAcceptor.ref) ); + "naccept_accepting_current_accept -> " + "no more writers\r\n") ); - if ((sres = esock_select_read(env, descP->sock, descP, - &descP->currentAcceptor.pid, - descP->currentAcceptor.ref)) < 0) { - esock_warning_msg("Failed select (%d) for new acceptor " - "after current (%T) died\r\n", - sres, descP->currentAcceptor.pid); - } - } else { - descP->currentAcceptorP = NULL; - descP->state = SOCKET_STATE_LISTENING; + descP->state = SOCKET_STATE_LISTENING; + + descP->currentAcceptorP = NULL; + descP->currentAcceptor.ref = esock_atom_undefined; + enif_set_pid_undefined(&descP->currentAcceptor.pid); + esock_monitor_init(&descP->currentAcceptor.mon); } + } return res; @@ -5163,10 +5209,12 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, static ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env, SocketDescriptor* descP, - ERL_NIF_TERM ref, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM opRef, int save_errno) { - ERL_NIF_TERM res; + SocketRequestor req; + ERL_NIF_TERM res, reason; if (save_errno == ERRNO_BLOCK) { @@ -5176,14 +5224,27 @@ ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env, SSDBG( descP, ("SOCKET", - "naccept_accepting_current_error -> would block: try again\r\n") ); + "naccept_accepting_current_error -> " + "would block: try again\r\n") ); - res = naccept_busy_retry(env, descP, ref, &descP->currentAcceptor.pid, + res = naccept_busy_retry(env, descP, opRef, &descP->currentAcceptor.pid, /* No state change */ descP->state); } else { - res = esock_make_error_errno(env, save_errno); + + reason = MKA(env, erl_errno_id(save_errno)); + res = esock_make_error(env, reason); + + while (acceptor_pop(env, descP, &req)) { + SSDBG( descP, + ("SOCKET", "naccept_accepting_current_error -> abort %T\r\n", + req.pid) ); + esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid); + DEMONP("naccept_accepting_current_error -> pop'ed writer", + env, descP, &req.mon); + } + } return res; @@ -13063,14 +13124,15 @@ ERL_NIF_TERM nif_cancel(ErlNifEnv* env, return enif_raise_exception(env, MKA(env, "notsup")); #else SocketDescriptor* descP; - ERL_NIF_TERM op, opRef, result; + ERL_NIF_TERM op, sockRef, opRef, result; SGDBG( ("SOCKET", "nif_cancel -> entry with argc: %d\r\n", argc) ); /* Extract arguments and perform preliminary validation */ + sockRef = argv[0]; if ((argc != 3) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP)) { + !enif_get_resource(env, sockRef, sockets, (void**) &descP)) { return enif_make_badarg(env); } op = argv[1]; @@ -13085,7 +13147,7 @@ ERL_NIF_TERM nif_cancel(ErlNifEnv* env, "\r\n opRef: %T" "\r\n", descP->sock, op, opRef) ); - result = ncancel(env, descP, op, opRef); + result = ncancel(env, descP, op, sockRef, opRef); SSDBG( descP, ("SOCKET", "nif_cancel -> done with result: " @@ -13102,6 +13164,7 @@ static ERL_NIF_TERM ncancel(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM op, + ERL_NIF_TERM sockRef, ERL_NIF_TERM opRef) { /* <KOLLA> @@ -13115,19 +13178,19 @@ ERL_NIF_TERM ncancel(ErlNifEnv* env, if (COMPARE(op, esock_atom_connect) == 0) { return ncancel_connect(env, descP, opRef); } else if (COMPARE(op, esock_atom_accept) == 0) { - return ncancel_accept(env, descP, opRef); + return ncancel_accept(env, descP, sockRef, opRef); } else if (COMPARE(op, esock_atom_send) == 0) { - return ncancel_send(env, descP, opRef); + return ncancel_send(env, descP, sockRef, opRef); } else if (COMPARE(op, esock_atom_sendto) == 0) { - return ncancel_send(env, descP, opRef); + return ncancel_send(env, descP, sockRef, opRef); } else if (COMPARE(op, esock_atom_sendmsg) == 0) { - return ncancel_send(env, descP, opRef); + return ncancel_send(env, descP, sockRef, opRef); } else if (COMPARE(op, esock_atom_recv) == 0) { - return ncancel_recv(env, descP, opRef); + return ncancel_recv(env, descP, sockRef, opRef); } else if (COMPARE(op, esock_atom_recvfrom) == 0) { - return ncancel_recv(env, descP, opRef); + return ncancel_recv(env, descP, sockRef, opRef); } else if (COMPARE(op, esock_atom_recvmsg) == 0) { - return ncancel_recv(env, descP, opRef); + return ncancel_recv(env, descP, sockRef, opRef); } else { return esock_make_error(env, esock_atom_einval); } @@ -13161,6 +13224,7 @@ ERL_NIF_TERM ncancel_connect(ErlNifEnv* env, static ERL_NIF_TERM ncancel_accept(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM opRef) { ERL_NIF_TERM res; @@ -13176,7 +13240,7 @@ ERL_NIF_TERM ncancel_accept(ErlNifEnv* env, if (descP->currentAcceptorP != NULL) { if (COMPARE(opRef, descP->currentAcceptor.ref) == 0) { - res = ncancel_accept_current(env, descP); + res = ncancel_accept_current(env, descP, sockRef); } else { res = ncancel_accept_waiting(env, descP, opRef); } @@ -13202,9 +13266,9 @@ ERL_NIF_TERM ncancel_accept(ErlNifEnv* env, */ static ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, - SocketDescriptor* descP) + SocketDescriptor* descP, + ERL_NIF_TERM sockRef) { - int sres; ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") ); @@ -13213,33 +13277,22 @@ ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, env, descP, &descP->currentAcceptor.mon); res = ncancel_read_select(env, descP, descP->currentAcceptor.ref); - SSDBG( descP, ("SOCKET", "ncancel_accept_current -> cancel res: %T\r\n", res) ); + SSDBG( descP, ("SOCKET", + "ncancel_accept_current -> cancel res: %T\r\n", res) ); - if (acceptor_pop(env, descP, &descP->currentAcceptor)) { - - /* There was another one */ - - SSDBG( descP, ("SOCKET", "ncancel_accept_current -> new (active) acceptor: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", - descP->currentAcceptor.pid, - descP->currentAcceptor.ref) ); - - if ((sres = esock_select_read(env, descP->sock, descP, - &descP->currentAcceptor.pid, - descP->currentAcceptor.ref)) < 0) { - return esock_make_error(env, - MKT2(env, - esock_atom_select_failed, - MKI(env, sres))); - } - } else { - SSDBG( descP, ("SOCKET", "ncancel_accept_current -> no more acceptors\r\n") ); - descP->currentAcceptorP = NULL; - descP->state = SOCKET_STATE_LISTENING; + if (!activate_next_acceptor(env, descP, sockRef)) { + + SSDBG( descP, + ("SOCKET", "ncancel_accept_current -> no more writers\r\n") ); + + descP->state = SOCKET_STATE_LISTENING; + + descP->currentAcceptorP = NULL; + descP->currentAcceptor.ref = esock_atom_undefined; + enif_set_pid_undefined(&descP->currentAcceptor.pid); + esock_monitor_init(&descP->currentAcceptor.mon); } - + SSDBG( descP, ("SOCKET", "ncancel_accept_current -> done with result:" "\r\n %T" "\r\n", res) ); @@ -13281,6 +13334,7 @@ ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env, static ERL_NIF_TERM ncancel_send(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM opRef) { ERL_NIF_TERM res; @@ -13296,7 +13350,7 @@ ERL_NIF_TERM ncancel_send(ErlNifEnv* env, if (descP->currentWriterP != NULL) { if (COMPARE(opRef, descP->currentWriter.ref) == 0) { - res = ncancel_send_current(env, descP); + res = ncancel_send_current(env, descP, sockRef); } else { res = ncancel_send_waiting(env, descP, opRef); } @@ -13323,43 +13377,29 @@ ERL_NIF_TERM ncancel_send(ErlNifEnv* env, */ static ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, - SocketDescriptor* descP) + SocketDescriptor* descP, + ERL_NIF_TERM sockRef) { - int sres; ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") ); - DEMONP("ncancel_recv_current -> current writer", + DEMONP("ncancel_send_current -> current writer", env, descP, &descP->currentWriter.mon); res = ncancel_write_select(env, descP, descP->currentWriter.ref); - SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) ); + SSDBG( descP, + ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) ); - if (writer_pop(env, descP, &descP->currentWriter)) { - - /* There was another one */ - - SSDBG( descP, ("SOCKET", "ncancel_send_current -> new (active) writer: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", - descP->currentWriter.pid, - descP->currentWriter.ref) ); - - if ((sres = esock_select_write(env, descP->sock, descP, - &descP->currentWriter.pid, - descP->currentWriter.ref)) < 0) { - return esock_make_error(env, - MKT2(env, - esock_atom_select_failed, - MKI(env, sres))); - } - } else { - SSDBG( descP, ("SOCKET", "ncancel_send_current -> no more writers\r\n") ); - descP->currentWriterP = NULL; + if (!activate_next_writer(env, descP, sockRef)) { + SSDBG( descP, + ("SOCKET", "ncancel_send_current -> no more writers\r\n") ); + descP->currentWriterP = NULL; + descP->currentWriter.ref = esock_atom_undefined; + enif_set_pid_undefined(&descP->currentWriter.pid); + esock_monitor_init(&descP->currentWriter.mon); } - + SSDBG( descP, ("SOCKET", "ncancel_send_current -> done with result:" "\r\n %T" "\r\n", res) ); @@ -13401,6 +13441,7 @@ ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env, static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM opRef) { ERL_NIF_TERM res; @@ -13416,7 +13457,7 @@ ERL_NIF_TERM ncancel_recv(ErlNifEnv* env, if (descP->currentReaderP != NULL) { if (COMPARE(opRef, descP->currentReader.ref) == 0) { - res = ncancel_recv_current(env, descP); + res = ncancel_recv_current(env, descP, sockRef); } else { res = ncancel_recv_waiting(env, descP, opRef); } @@ -13442,9 +13483,9 @@ ERL_NIF_TERM ncancel_recv(ErlNifEnv* env, */ static ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env, - SocketDescriptor* descP) + SocketDescriptor* descP, + ERL_NIF_TERM sockRef) { - int sres; ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "ncancel_recv_current -> entry\r\n") ); @@ -13453,32 +13494,18 @@ ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env, env, descP, &descP->currentReader.mon); res = ncancel_read_select(env, descP, descP->currentReader.ref); - SSDBG( descP, ("SOCKET", "ncancel_recv_current -> cancel res: %T\r\n", res) ); + SSDBG( descP, + ("SOCKET", "ncancel_recv_current -> cancel res: %T\r\n", res) ); - if (reader_pop(env, descP, &descP->currentReader)) { - - /* There was another one */ - - SSDBG( descP, ("SOCKET", "ncancel_recv_current -> new (active) reader: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", - descP->currentReader.pid, - descP->currentReader.ref) ); - - if ((sres = esock_select_read(env, descP->sock, descP, - &descP->currentReader.pid, - descP->currentReader.ref)) < 0) { - return esock_make_error(env, - MKT2(env, - esock_atom_select_failed, - MKI(env, sres))); - } - } else { - SSDBG( descP, ("SOCKET", "ncancel_recv_current -> no more readers\r\n") ); - descP->currentReaderP = NULL; + if (!activate_next_reader(env, descP, sockRef)) { + SSDBG( descP, + ("SOCKET", "ncancel_recv_current -> no more readers\r\n") ); + descP->currentReaderP = NULL; + descP->currentReader.ref = esock_atom_undefined; + enif_set_pid_undefined(&descP->currentReader.pid); + esock_monitor_init(&descP->currentReader.mon); } - + SSDBG( descP, ("SOCKET", "ncancel_recv_current -> done with result:" "\r\n %T" "\r\n", res) ); @@ -13661,27 +13688,11 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, /* Ok, this write is done maybe activate the next (if any) */ - if (writer_pop(env, descP, &descP->currentWriter)) { - - /* There was another one */ - - SSDBG( descP, ("SOCKET", "send_check_result -> new (active) writer: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", - descP->currentWriter.pid, - descP->currentWriter.ref) ); - - if ((sres = esock_select_write(env, descP->sock, descP, - &descP->currentWriter.pid, - descP->currentWriter.ref)) < 0) { - return esock_make_error(env, - MKT2(env, - esock_atom_select_failed, - MKI(env, sres))); - } - } else { - descP->currentWriterP = NULL; + if (!activate_next_writer(env, descP, sockRef)) { + descP->currentWriterP = NULL; + descP->currentWriter.ref = esock_atom_undefined; + enif_set_pid_undefined(&descP->currentWriter.pid); + esock_monitor_init(&descP->currentWriter.mon); } return esock_atom_ok; @@ -13692,7 +13703,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) { SocketRequestor req; - ERL_NIF_TERM res; + ERL_NIF_TERM res, reason; /* * An actual failure - we (and everyone waiting) give up @@ -13701,9 +13712,11 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, cnt_inc(&descP->writeFails, 1); SSDBG( descP, - ("SOCKET", "send_check_result -> error: %d\r\n", saveErrno) ); + ("SOCKET", + "send_check_result -> error: %d\r\n", saveErrno) ); - res = esock_make_error_errno(env, saveErrno); + reason = MKA(env, erl_errno_id(saveErrno)); + res = esock_make_error(env, reason); if (descP->currentWriterP != NULL) { @@ -13714,7 +13727,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "send_check_result -> abort %T\r\n", req.pid) ); - esock_send_abort_msg(env, sockRef, req.ref, res, &req.pid); + esock_send_abort_msg(env, sockRef, req.ref, + reason, &req.pid); DEMONP("send_check_result -> pop'ed writer", env, descP, &req.mon); } @@ -13881,9 +13895,9 @@ char* recv_init_current_reader(ErlNifEnv* env, static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, - SocketDescriptor* descP) + SocketDescriptor* descP, + ERL_NIF_TERM sockRef) { - int sres; ERL_NIF_TERM res = esock_atom_ok; if (descP->currentReaderP != NULL) { @@ -13891,29 +13905,18 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, DEMONP("recv_update_current_reader -> current reader", env, descP, &descP->currentReader.mon); - if (reader_pop(env, descP, &descP->currentReader)) { - - /* There was another one */ - + if (!activate_next_reader(env, descP, sockRef)) { + SSDBG( descP, - ("SOCKET", "recv_update_current_reader -> new (active) reader: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", - descP->currentReader.pid, - descP->currentReader.ref) ); - - if ((sres = esock_select_read(env, descP->sock, descP, - &descP->currentReader.pid, - descP->currentReader.ref)) < 0) { - res = esock_make_error(env, - MKT2(env, - esock_atom_select_failed, - MKI(env, sres))); - } - } else { - descP->currentReaderP = NULL; + ("SOCKET", + "recv_update_current_reader -> no more readers\r\n") ); + + descP->currentReaderP = NULL; + descP->currentReader.ref = esock_atom_undefined; + enif_set_pid_undefined(&descP->currentReader.pid); + esock_monitor_init(&descP->currentReader.mon); } + } return res; @@ -14064,7 +14067,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, cnt_inc(&descP->readPkgCnt, 1); - recv_update_current_reader(env, descP); + recv_update_current_reader(env, descP, sockRef); /* This transfers "ownership" of the *allocated* binary to an * erlang term (no need for an explicit free). @@ -14109,7 +14112,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, "recv_check_result -> [%d] " "we got exactly what we could fit\r\n", toRead) ); - recv_update_current_reader(env, descP); + recv_update_current_reader(env, descP, sockRef); /* This transfers "ownership" of the *allocated* binary to an * erlang term (no need for an explicit free). @@ -14221,7 +14224,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, cnt_inc(&descP->readPkgCnt, 1); cnt_inc(&descP->readByteCnt, read); - recv_update_current_reader(env, descP); + recv_update_current_reader(env, descP, sockRef); /* This transfers "ownership" of the *allocated* binary to an * erlang term (no need for an explicit free). @@ -14406,7 +14409,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, data = MKSBIN(env, data, 0, read); } - recv_update_current_reader(env, descP); + recv_update_current_reader(env, descP, sockRef); return esock_make_ok2(env, MKT2(env, eSockAddr, data)); @@ -14571,7 +14574,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, "recvmsg_check_result -> " "(msghdr) encode failed: %s\r\n", xres) ); - recv_update_current_reader(env, descP); + recv_update_current_reader(env, descP, sockRef); FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); @@ -14583,7 +14586,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, "recvmsg_check_result -> " "(msghdr) encode ok: %T\r\n", eMsgHdr) ); - recv_update_current_reader(env, descP); + recv_update_current_reader(env, descP, sockRef); return esock_make_ok2(env, eMsgHdr); } @@ -17006,6 +17009,102 @@ int esock_select_cancel(ErlNifEnv* env, /* ---------------------------------------------------------------------- + * A c t i v a t e N e x t ( o p e r a t o r ) F u n c t i o n s + * ---------------------------------------------------------------------- + */ + +/* *** activate_next_acceptor | activate_next_writer | activate_next_reader *** + * + * This functions pops the writer queue and then selects until it + * manages to successfully activate a writer or the queue is empty. + */ + +#define ACTIVATE_NEXT_FUNCS \ + ACTIVATE_NEXT_FUNC_DECL(acceptor, currentAcceptor, acceptorsQ) \ + ACTIVATE_NEXT_FUNC_DECL(writer, currentWriter, writersQ) \ + ACTIVATE_NEXT_FUNC_DECL(reader, currentReader, readersQ) + +#define ACTIVATE_NEXT_FUNC_DECL(F, R, Q) \ + static \ + BOOLEAN_T activate_next_##F(ErlNifEnv* env, \ + SocketDescriptor* descP, \ + ERL_NIF_TERM sockRef) \ + { \ + return activate_next(env, descP, \ + &descP->R, &descP->Q, \ + sockRef); \ + } +ACTIVATE_NEXT_FUNCS +#undef ACTIVATE_NEXT_FUNC_DECL + + +/* *** activate_next *** + * + * This functions pops the requestor queue and then selects until it + * manages to successfully activate a new requestor or the queue is empty. + * Return value indicates if a new requestor was activated or not. + */ + +static +BOOLEAN_T activate_next(ErlNifEnv* env, + SocketDescriptor* descP, + SocketRequestor* reqP, + SocketRequestQueue* q, + ERL_NIF_TERM sockRef) +{ + BOOLEAN_T popped, activated; + int sres; + + popped = FALSE; + do { + + if (requestor_pop(q, reqP)) { + + /* There was another one */ + + SSDBG( descP, + ("SOCKET", "activate_next -> new (active) requestor: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", reqP->pid, reqP->ref) ); + + if ((sres = esock_select_read(env, descP->sock, descP, + &reqP->pid, reqP->ref)) < 0) { + /* We need to inform this process, reqP->pid, that we + * failed to select, so we don't leave it hanging. + * => send abort + */ + + esock_send_abort_msg(env, sockRef, reqP->ref, sres, &reqP->pid); + + } else { + + /* Success: New requestor selected */ + popped = TRUE; + activated = FALSE; + + } + + } else { + + SSDBG( descP, + ("SOCKET", "send_activate_next -> no more requestors\r\n") ); + + popped = TRUE; + activated = FALSE; + } + + } while (!popped); + + SSDBG( descP, + ("SOCKET", "activate_next -> " + "done with %s\r\n", B2S(activated)) ); + + return activated; +} + + +/* ---------------------------------------------------------------------- * R e q u e s t o r Q u e u e F u n c t i o n s * ---------------------------------------------------------------------- * @@ -17462,7 +17561,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) descP->readTries, descP->readWaits) ); - sockRef = enif_make_resource(env, descP), + sockRef = enif_make_resource(env, descP); descP->state = SOCKET_STATE_CLOSING; // Just in case...??? descP->isReadable = FALSE; descP->isWritable = FALSE; @@ -17751,6 +17850,7 @@ void socket_down(ErlNifEnv* env, #if !defined(__WIN32__) SocketDescriptor* descP = (SocketDescriptor*) obj; int sres; + ERL_NIF_TERM sockRef; SSDBG( descP, ("SOCKET", "socket_down -> entry with" "\r\n sock: %d" @@ -17865,19 +17965,21 @@ void socket_down(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "socket_down -> other process term\r\n") ); + sockRef = enif_make_resource(env, descP); + MLOCK(descP->accMtx); if (descP->currentAcceptorP != NULL) - socket_down_acceptor(env, descP, pid); + socket_down_acceptor(env, descP, sockRef, pid); MUNLOCK(descP->accMtx); MLOCK(descP->writeMtx); if (descP->currentWriterP != NULL) - socket_down_writer(env, descP, pid); + socket_down_writer(env, descP, sockRef, pid); MUNLOCK(descP->writeMtx); MLOCK(descP->readMtx); if (descP->currentReaderP != NULL) - socket_down_reader(env, descP, pid); + socket_down_reader(env, descP, sockRef, pid); MUNLOCK(descP->readMtx); } @@ -17899,46 +18001,28 @@ void socket_down(ErlNifEnv* env, static void socket_down_acceptor(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, const ErlNifPid* pid) { if (compare_pids(env, &descP->currentAcceptor.pid, pid)) { SSDBG( descP, ("SOCKET", "socket_down_acceptor -> " - "current acceptor - try pop the queue\r\n") ); + "current acceptor - try activate next\r\n") ); - if (acceptor_pop(env, descP, &descP->currentAcceptor)) { - int res; - - /* There was another one, so we will still be in accepting state */ - - SSDBG( descP, ("SOCKET", - "socket_down_acceptor -> new (active) acceptor: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", - descP->currentAcceptor.pid, - descP->currentAcceptor.ref) ); - - if ((res = esock_select_read(env, descP->sock, descP, - &descP->currentAcceptor.pid, - descP->currentAcceptor.ref) < 0)) { - - esock_warning_msg("Failed select (%d) for new acceptor " - "after current (%T) died\r\n", - res, *pid); - - } - - } else { - - SSDBG( descP, ("SOCKET", - "socket_down_acceptor -> no active acceptor\r\n") ); - - descP->currentAcceptorP = NULL; - descP->state = SOCKET_STATE_LISTENING; + if (!activate_next_acceptor(env, descP, sockRef)) { + + SSDBG( descP, + ("SOCKET", "socket_down_acceptor -> no more writers\r\n") ); + + descP->state = SOCKET_STATE_LISTENING; + + descP->currentAcceptorP = NULL; + descP->currentAcceptor.ref = esock_atom_undefined; + enif_set_pid_undefined(&descP->currentAcceptor.pid); + esock_monitor_init(&descP->currentAcceptor.mon); } - + } else { /* Maybe unqueue one of the waiting acceptors */ @@ -17962,42 +18046,22 @@ void socket_down_acceptor(ErlNifEnv* env, static void socket_down_writer(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, const ErlNifPid* pid) { if (compare_pids(env, &descP->currentWriter.pid, pid)) { SSDBG( descP, ("SOCKET", "socket_down_writer -> " - "current writer - try pop the queue\r\n") ); + "current writer - try activate next\r\n") ); - if (writer_pop(env, descP, &descP->currentWriter)) { - int res; - - /* There was another one */ - - SSDBG( descP, ("SOCKET", "socket_down_writer -> new (current) writer: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", - descP->currentWriter.pid, - descP->currentWriter.ref) ); - - if ((res = esock_select_write(env, descP->sock, descP, - &descP->currentWriter.pid, - descP->currentWriter.ref) < 0)) { - - esock_warning_msg("Failed select (%d) for new writer " - "after current (%T) died\r\n", - res, *pid); - - } - - } else { - + if (!activate_next_writer(env, descP, sockRef)) { SSDBG( descP, ("SOCKET", "socket_down_writer -> no active writer\r\n") ); - - descP->currentWriterP = NULL; + descP->currentWriterP = NULL; + descP->currentWriter.ref = esock_atom_undefined; + enif_set_pid_undefined(&descP->currentWriter.pid); + esock_monitor_init(&descP->currentWriter.mon); } } else { @@ -18023,44 +18087,24 @@ void socket_down_writer(ErlNifEnv* env, static void socket_down_reader(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, const ErlNifPid* pid) { if (compare_pids(env, &descP->currentReader.pid, pid)) { SSDBG( descP, ("SOCKET", "socket_down_reader -> " - "current reader - try pop the queue\r\n") ); + "current reader - try activate next\r\n") ); - if (reader_pop(env, descP, &descP->currentReader)) { - int res; - - /* There was another one */ - - SSDBG( descP, ("SOCKET", "socket_down_reader -> new (current) reader: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", - descP->currentReader.pid, - descP->currentReader.ref) ); - - if ((res = esock_select_read(env, descP->sock, descP, - &descP->currentReader.pid, - descP->currentReader.ref) < 0)) { - - esock_warning_msg("Failed select (%d) for new reader " - "after current (%T) died\r\n", - res, *pid); - - } - - } else { - - SSDBG( descP, ("SOCKET", - "socket_down_reader -> no active reader\r\n") ); - - descP->currentReaderP = NULL; + if (!activate_next_reader(env, descP, sockRef)) { + SSDBG( descP, + ("SOCKET", "ncancel_recv_current -> no more readers\r\n") ); + descP->currentReaderP = NULL; + descP->currentReader.ref = esock_atom_undefined; + enif_set_pid_undefined(&descP->currentReader.pid); + esock_monitor_init(&descP->currentReader.mon); } - + } else { /* Maybe unqueue one of the waiting reader(s) */ |