diff options
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 5 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 659 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 70288 -> 70312 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 22 |
4 files changed, 390 insertions, 296 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index 0f973855ae..a9e83adc21 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -258,6 +258,7 @@ extern ERL_NIF_TERM esock_atom_rxq_ovfl; extern ERL_NIF_TERM esock_atom_scope_id; extern ERL_NIF_TERM esock_atom_sctp; extern ERL_NIF_TERM esock_atom_sec; +extern ERL_NIF_TERM esock_atom_select_failed; extern ERL_NIF_TERM esock_atom_select_sent; extern ERL_NIF_TERM esock_atom_send; extern ERL_NIF_TERM esock_atom_sendmsg; @@ -349,10 +350,6 @@ extern ERL_NIF_TERM esock_atom_einval; #define MON_INIT(M) esock_monitor_init((M)) // #define MON_COMP(M1, M2) esock_monitor_compare((M1), (M2)) -#define SELECT(E,FD,M,O,P,R) \ - if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \ - return enif_make_badarg((E)); - #define COMPARE(A, B) enif_compare((A), (B)) #define IS_ATOM(E, TE) enif_is_atom((E), (TE)) diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index c0a37450f9..d860cb4965 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -834,6 +834,7 @@ typedef struct { ErlNifPid closerPid; // ErlNifMonitor closerMon; ESockMonitor closerMon; + ErlNifEnv* closeEnv; ERL_NIF_TERM closeRef; BOOLEAN_T closeLocal; @@ -2335,9 +2336,8 @@ static void socket_down_reader(ErlNifEnv* env, SocketDescriptor* descP, const ErlNifPid* pid); -static char* esock_send_close_msg(ErlNifEnv* env, - ERL_NIF_TERM closeRef, - ErlNifPid* pid); +static char* esock_send_close_msg(ErlNifEnv* env, + SocketDescriptor* descP); static char* esock_send_abort_msg(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, @@ -2354,6 +2354,24 @@ static char* esock_send_msg(ErlNifEnv* env, ErlNifPid* pid, ErlNifEnv* msg_env); +static int esock_select_read(ErlNifEnv* env, + ErlNifEvent event, + void* obj, + const ErlNifPid* pid, + ERL_NIF_TERM ref); +static int esock_select_write(ErlNifEnv* env, + ErlNifEvent event, + void* obj, + const ErlNifPid* pid, + ERL_NIF_TERM ref); +static int esock_select_stop(ErlNifEnv* env, + ErlNifEvent event, + void* obj); +static int esock_select_cancel(ErlNifEnv* env, + ErlNifEvent event, + enum ErlNifSelectFlags mode, + void* obj); + static BOOLEAN_T extract_debug(ErlNifEnv* env, ERL_NIF_TERM map); static BOOLEAN_T extract_iow(ErlNifEnv* env, @@ -2618,6 +2636,7 @@ ERL_NIF_TERM esock_atom_rxq_ovfl; ERL_NIF_TERM esock_atom_scope_id; ERL_NIF_TERM esock_atom_sctp; ERL_NIF_TERM esock_atom_sec; +ERL_NIF_TERM esock_atom_select_failed; ERL_NIF_TERM esock_atom_select_sent; ERL_NIF_TERM esock_atom_send; ERL_NIF_TERM esock_atom_sendmsg; @@ -4582,7 +4601,17 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, return esock_make_error_str(env, xres); } + /* + * <KOLLA> + * + * We should lock both the read and write mutex:es... + * + * </KOLLA> + * + */ + return nconnect(env, descP); + #endif // if !defined(__WIN32__) } @@ -4592,16 +4621,12 @@ static ERL_NIF_TERM nconnect(ErlNifEnv* env, SocketDescriptor* descP) { - int code, save_errno = 0; + ERL_NIF_TERM res; + int code, sres, save_errno = 0; /* - * <KOLLA> - * - * We should look both the read and write mutex:es... - * - * </KOLLA> - * - * Verify that we are where in the proper state */ + * Verify that we are where in the proper state + */ if (!IS_OPEN(descP)) { SSDBG( descP, ("SOCKET", "nif_connect -> not open\r\n") ); @@ -4629,13 +4654,17 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, if (IS_SOCKET_ERROR(code) && ((save_errno == ERRNO_BLOCK) || /* Winsock2 */ (save_errno == EINPROGRESS))) { /* Unix & OSE!! */ + /* THIS DOES NOT WORK!! WE NEED A "PERISTENT" ENV!! */ ERL_NIF_TERM ref = MKREF(env); descP->state = SOCKET_STATE_CONNECTING; - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_WRITE), - descP, NULL, ref); - return esock_make_ok2(env, ref); + if ((sres = esock_select_write(env, descP->sock, descP, NULL, ref)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + res = esock_make_ok2(env, ref); + } } else if (code == 0) { /* ok we are connected */ descP->state = SOCKET_STATE_CONNECTED; descP->isReadable = TRUE; @@ -4643,11 +4672,13 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, /* Do we need to do somthing for "active" mode? * Is there even such a thing *here*? */ - return esock_atom_ok; + res = esock_atom_ok; } else { - return esock_make_error_errno(env, save_errno); + res = esock_make_error_errno(env, save_errno); } + return res; + } #endif // if !defined(__WIN32__) @@ -4925,7 +4956,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, unsigned int n; SOCKET accSock; HANDLE accEvent; - int save_errno; + int sres, save_errno; ErlNifPid caller; SSDBG( descP, ("SOCKET", "naccept_listening -> get caller\r\n") ); @@ -4960,37 +4991,41 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, descP->currentAcceptor.ref = enif_make_copy(descP->env, ref); descP->currentAcceptorP = &descP->currentAcceptor; - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, NULL, ref); - - /* Shall we really change state? - * The ready event is sent directly to the calling - * process, which simply calls this function again. - * Basically, state accepting means that we have - * an "outstanding" accept. - * Shall we store the pid of the calling process? - * And if someone else calls accept, return with ebusy? - * Can any process call accept or just the controlling - * process? - * We also need a monitor it case the calling process is - * called before we are done! - * - * Change state (to accepting) and store pid of the acceptor - * (current process). Only accept calls from the acceptor - * process (ebusy) and once we have a successful accept, - * change state back to listening. If cancel is called instead - * (only accepted from the acceptor process), we reset - * state to listening and also resets the pid to "null" - * (is there such a value?). - * Need a mutex to secure that we don't test and change the - * pid at the same time. - */ + if ((sres = esock_select_read(env, descP->sock, descP, + NULL, ref)) < 0) { + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { - descP->state = SOCKET_STATE_ACCEPTING; + /* Shall we really change state? + * The ready event is sent directly to the calling + * process, which simply calls this function again. + * Basically, state accepting means that we have + * an "outstanding" accept. + * Shall we store the pid of the calling process? + * And if someone else calls accept, return with ebusy? + * Can any process call accept or just the controlling + * process? + * We also need a monitor it case the calling process is + * called before we are done! + * + * Change state (to accepting) and store pid of the acceptor + * (current process). Only accept calls from the acceptor + * process (ebusy) and once we have a successful accept, + * change state back to listening. If cancel is called instead + * (only accepted from the acceptor process), we reset + * state to listening and also resets the pid to "null" + * (is there such a value?). + * Need a mutex to secure that we don't test and change the + * pid at the same time. + */ - return esock_make_error(env, esock_atom_eagain); + descP->state = SOCKET_STATE_ACCEPTING; + + return esock_make_error(env, esock_atom_eagain); + } } else { SSDBG( descP, @@ -5071,7 +5106,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, SOCKET accSock; HANDLE accEvent; ErlNifPid caller; - int save_errno; + int save_errno, sres; ERL_NIF_TERM result; SSDBG( descP, ("SOCKET", "naccept_accepting -> get caller\r\n") ); @@ -5125,12 +5160,18 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, ("SOCKET", "naccept_accepting -> would block: try again\r\n") ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, NULL, ref); + if ((sres = esock_select_read(env, descP->sock, descP, + NULL, ref)) < 0) { + result = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + result = esock_make_error(env, esock_atom_eagain); + } + + return result; - return esock_make_error(env, esock_atom_eagain); } else { SSDBG( descP, ("SOCKET", @@ -5201,10 +5242,14 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, descP->currentAcceptor.pid, descP->currentAcceptor.ref) ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref); + if ((sres = esock_select_read(env, descP->sock, descP, + &descP->currentAcceptor.pid, + descP->currentAcceptor.ref)) < 0) { + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } } else { descP->currentAcceptorP = NULL; descP->state = SOCKET_STATE_LISTENING; @@ -6379,12 +6424,10 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, doClose = TRUE; } - MUNLOCK(descP->closeMtx); - if (doClose) { - descP->closeRef = MKREF(env); - selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP), - descP, NULL, descP->closeRef); + descP->closeEnv = enif_alloc_env(); + descP->closeRef = MKREF(descP->closeEnv); + selectRes = esock_select_stop(env, descP->sock, descP); if (selectRes & ERL_NIF_SELECT_STOP_CALLED) { /* Prep done - inform the caller it can finalize (close) directly */ SSDBG( descP, @@ -6421,10 +6464,13 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, reason = MKT2(env, atom_select, MKI(env, selectRes)); reply = esock_make_error(env, reason); } + } else { reply = esock_make_error(env, reason); } + MUNLOCK(descP->closeMtx); + SSDBG( descP, ("SOCKET", "nclose -> [%d] done when: " "\r\n state: 0x%lX" @@ -13065,6 +13111,7 @@ static ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, SocketDescriptor* descP) { + int sres; ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") ); @@ -13089,11 +13136,14 @@ ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, descP->currentAcceptor.pid, descP->currentAcceptor.ref) ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref); - + if ((sres = esock_select_read(env, descP->sock, descP, + &descP->currentAcceptor.pid, + descP->currentAcceptor.ref)) < 0) { + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } } else { SSDBG( descP, ("SOCKET", "ncancel_accept_current -> no more acceptors\r\n") ); descP->currentAcceptorP = NULL; @@ -13185,6 +13235,7 @@ static ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, SocketDescriptor* descP) { + int sres; ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") ); @@ -13209,11 +13260,14 @@ ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, descP->currentWriter.pid, descP->currentWriter.ref) ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_WRITE), - descP, &descP->currentWriter.pid, descP->currentWriter.ref); - + if ((sres = esock_select_write(env, descP->sock, descP, + &descP->currentWriter.pid, + descP->currentWriter.ref)) < 0) { + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } } else { SSDBG( descP, ("SOCKET", "ncancel_send_current -> no more writers\r\n") ); descP->currentWriterP = NULL; @@ -13303,6 +13357,7 @@ static ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env, SocketDescriptor* descP) { + int sres; ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "ncancel_recv_current -> entry\r\n") ); @@ -13327,11 +13382,14 @@ ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env, descP->currentReader.pid, descP->currentReader.ref) ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, &descP->currentReader.pid, descP->currentReader.ref); - + if ((sres = esock_select_read(env, descP->sock, descP, + &descP->currentReader.pid, + descP->currentReader.ref)) < 0) { + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } } else { SSDBG( descP, ("SOCKET", "ncancel_recv_current -> no more readers\r\n") ); descP->currentReaderP = NULL; @@ -13399,9 +13457,7 @@ ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env, int smode, int rmode) { - int selectRes = enif_select(env, descP->sock, - (ERL_NIF_SELECT_CANCEL | smode), - descP, NULL, opRef); + int selectRes = esock_select_cancel(env, descP->sock, smode, descP); if (selectRes & rmode) { /* Was cancelled */ @@ -13498,6 +13554,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef) { + int sres; + SSDBG( descP, ("SOCKET", "send_check_result -> entry with" "\r\n written: %d" @@ -13533,11 +13591,14 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, descP->currentWriter.pid, descP->currentWriter.ref) ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_WRITE), - descP, &descP->currentWriter.pid, descP->currentWriter.ref); - + if ((sres = esock_select_write(env, descP->sock, descP, + &descP->currentWriter.pid, + descP->currentWriter.ref)) < 0) { + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } } else { descP->currentWriterP = NULL; } @@ -13615,12 +13676,34 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, cnt_inc(&descP->writeWaits, 1); - SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), descP, NULL, sendRef); - - if (written >= 0) - return esock_make_ok2(env, MKI(env, written)); - else - return esock_make_error(env, esock_atom_eagain); + sres = esock_select_write(env, descP->sock, descP, NULL, sendRef); + + if (written >= 0) { + if (sres < 0) { + /* Returned: {error, Reason} + * Reason: {select_failed, sres, written} + */ + return esock_make_error(env, + MKT3(env, + esock_atom_select_failed, + MKI(env, sres), + MKI(env, written))); + } else { + return esock_make_ok2(env, MKI(env, written)); + } + } else { + if (sres < 0) { + /* Returned: {error, Reason} + * Reason: {select_failed, sres} + */ + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + return esock_make_error(env, esock_atom_eagain); + } + } } @@ -13719,6 +13802,9 @@ static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, SocketDescriptor* descP) { + int sres; + ERL_NIF_TERM res = esock_atom_ok; + if (descP->currentReaderP != NULL) { DEMONP("recv_update_current_reader -> current reader", @@ -13739,19 +13825,20 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, descP->currentReader.pid, descP->currentReader.ref) ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, - &descP->currentReader.pid, - descP->currentReader.ref); - + if ((sres = esock_select_read(env, descP->sock, descP, + &descP->currentReader.pid, + descP->currentReader.ref)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } } else { descP->currentReaderP = NULL; } } - return esock_atom_ok; + return res; } @@ -13805,7 +13892,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, ERL_NIF_TERM recvRef) { char* xres; - ERL_NIF_TERM data; + int sres; + ERL_NIF_TERM res, data; SSDBG( descP, ("SOCKET", "recv_check_result -> entry with" @@ -13825,7 +13913,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, */ if ((read == 0) && (descP->type == SOCK_STREAM)) { - ERL_NIF_TERM res = esock_make_error(env, atom_closed); + + res = esock_make_error(env, atom_closed); /* * When a stream socket peer has performed an orderly shutdown, the return @@ -13961,7 +14050,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, FREE_BIN(bufP); if (saveErrno == ECONNRESET) { - ERL_NIF_TERM res = esock_make_error(env, atom_closed); + + res = esock_make_error(env, atom_closed); /* +++ Oups - closed +++ */ @@ -13988,16 +14078,16 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, recv_error_current_reader(env, descP, sockRef, res); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_STOP), - descP, NULL, recvRef); + if ((sres = esock_select_stop(env, descP->sock, descP)) < 0) { + esock_warning_msg("Failed stop select (closed) " + "for current reader (%T): %d\r\n", + recvRef, sres); + } return res; } else if ((saveErrno == ERRNO_BLOCK) || (saveErrno == EAGAIN)) { - int sres; SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] eagain\r\n", toRead) ); @@ -14008,13 +14098,17 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT for more\r\n") ); - sres = enif_select(env, descP->sock, (ERL_NIF_SELECT_READ), - descP, NULL, recvRef); + if ((sres = esock_select_read(env, descP->sock, descP, + NULL, recvRef)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + res = esock_make_error(env, esock_atom_eagain); + } - SSDBG( descP, - ("SOCKET", "recv_check_result -> SELECT res: %d\r\n", sres) ); - - return esock_make_error(env, esock_atom_eagain); + return res; } else { ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); @@ -14077,20 +14171,31 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, return esock_make_error_str(env, xres); } - /* SELECT for more data */ + data = MKBIN(env, bufP); + data = MKSBIN(env, data, 0, read); - SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), - descP, NULL, recvRef); - cnt_inc(&descP->readByteCnt, read); + /* SELECT for more data */ + + if ((sres = esock_select_read(env, descP->sock, descP, + NULL, recvRef)) < 0) { + /* Result: {error, Reason} + * Reason: {select_failed, sres, data} + */ + res = esock_make_error(env, + MKT3(env, + esock_atom_select_failed, + MKI(env, sres), + data)); + } else { + res = esock_make_ok3(env, atom_false, data); + } + /* This transfers "ownership" of the *allocated* binary to an * erlang term (no need for an explicit free). */ - data = MKBIN(env, bufP); - data = MKSBIN(env, data, 0, read); - - return esock_make_ok3(env, atom_false, data); + return res; } } } @@ -14113,7 +14218,8 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, ERL_NIF_TERM recvRef) { char* xres; - ERL_NIF_TERM data; + int sres; + ERL_NIF_TERM data, res; SSDBG( descP, ("SOCKET", "recvfrom_check_result -> entry with" @@ -14133,7 +14239,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, /* +++ Error handling +++ */ if (saveErrno == ECONNRESET) { - ERL_NIF_TERM res = esock_make_error(env, atom_closed); + res = esock_make_error(env, atom_closed); /* +++ Oups - closed +++ */ @@ -14153,10 +14259,11 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, recv_error_current_reader(env, descP, sockRef, res); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_STOP), - descP, NULL, recvRef); + if ((sres = esock_select_stop(env, descP->sock, descP)) < 0) { + esock_warning_msg("Failed stop select (closed) " + "for current reader (%T): %d\r\n", + recvRef, sres); + } FREE_BIN(bufP); @@ -14166,19 +14273,26 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, (saveErrno == EAGAIN)) { SSDBG( descP, ("SOCKET", "recvfrom_check_result -> eagain\r\n") ); - - if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) - return esock_make_error_str(env, xres); - - SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), - descP, NULL, recvRef); FREE_BIN(bufP); - return esock_make_error(env, esock_atom_eagain); + if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) + return esock_make_error_str(env, xres); + + if ((sres = esock_select_read(env, descP->sock, descP, + NULL, recvRef)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + res = esock_make_error(env, esock_atom_eagain); + } + return res; } else { - ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); + + res = esock_make_error_errno(env, saveErrno); SSDBG( descP, ("SOCKET", @@ -14240,6 +14354,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { + int sres; + ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "recvmsg_check_result -> entry with" @@ -14284,7 +14400,6 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, /* +++ Error handling +++ */ if (saveErrno == ECONNRESET) { - ERL_NIF_TERM res = esock_make_error(env, atom_closed); /* +++ Oups - closed +++ */ @@ -14299,19 +14414,21 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, * </KOLLA> */ + res = esock_make_error(env, atom_closed); descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; recv_error_current_reader(env, descP, sockRef, res); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_STOP), - descP, NULL, recvRef); + if ((sres = esock_select_stop(env, descP->sock, descP)) < 0) { + esock_warning_msg("Failed stop select (closed) " + "for current reader (%T): %d\r\n", + recvRef, sres); + } FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); - return res; + return res;; } else if ((saveErrno == ERRNO_BLOCK) || (saveErrno == EAGAIN)) { @@ -14319,18 +14436,26 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recvmsg_check_result -> eagain\r\n") ); + FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) return esock_make_error_str(env, xres); - SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), - descP, NULL, recvRef); - - FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + if ((sres = esock_select_read(env, descP->sock, descP, + NULL, recvRef)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + res = esock_make_error(env, esock_atom_eagain); + } - return esock_make_error(env, esock_atom_eagain); + return res; } else { - ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); + + res = esock_make_error_errno(env, saveErrno); SSDBG( descP, ("SOCKET", @@ -16069,6 +16194,8 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) sprintf(buf, "socket[close,%d]", sock); descP->closeMtx = MCREATE(buf); + descP->closeEnv = NULL; + descP->closeRef = esock_atom_undefined; descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT; descP->rNum = 0; @@ -16647,13 +16774,20 @@ char* send_msg_error(ErlNifEnv* env, * (actually that the 'stop' callback function has been called). */ static -char* esock_send_close_msg(ErlNifEnv* env, - ERL_NIF_TERM closeRef, - ErlNifPid* pid) +char* esock_send_close_msg(ErlNifEnv* env, + SocketDescriptor* descP) { - return esock_send_socket_msg(env, - esock_atom_undefined, - esock_atom_close, closeRef, pid, NULL); + ERL_NIF_TERM sockRef = enif_make_resource(descP->closeEnv, descP); + char* res = esock_send_socket_msg(env, + sockRef, + esock_atom_close, + descP->closeRef, + &descP->closerPid, + descP->closeEnv); + + descP->closeEnv = NULL; + + return res; } @@ -16673,7 +16807,8 @@ char* esock_send_abort_msg(ErlNifEnv* env, ErlNifPid* pid) { ErlNifEnv* msg_env = enif_alloc_env(); - ERL_NIF_TERM info = MKT2(msg_env, enif_make_copy(msg_env, recvRef), + ERL_NIF_TERM info = MKT2(msg_env, + enif_make_copy(msg_env, recvRef), enif_make_copy(msg_env, reason)); return esock_send_socket_msg(env, sockRef, esock_atom_abort, info, pid, @@ -16684,10 +16819,16 @@ char* esock_send_abort_msg(ErlNifEnv* env, /* *** esock_send_socket_msg *** * * This function sends a general purpose socket message to an erlang - * process. A general 'socket' message has the form: + * process. A general 'socket' message has the ("erlang") form: * * {'$socket', SockRef, Tag, Info} * + * Where + * + * SockRef: reference() + * Tag: atom() + * Info: term() + * */ static @@ -16696,7 +16837,7 @@ char* esock_send_socket_msg(ErlNifEnv* env, ERL_NIF_TERM tag, ERL_NIF_TERM info, ErlNifPid* pid, - ErlNifEnv* msg_env) + ErlNifEnv* msg_env) { ERL_NIF_TERM msg; if (!msg_env) { @@ -16731,6 +16872,52 @@ char* esock_send_msg(ErlNifEnv* env, #endif // #if defined(__WIN32__) +/* ---------------------------------------------------------------------- + * S e l e c t W r a p p e r F u n c t i o n s + * ---------------------------------------------------------------------- + */ + +static +int esock_select_read(ErlNifEnv* env, + ErlNifEvent event, + void* obj, + const ErlNifPid* pid, + ERL_NIF_TERM ref) +{ + return enif_select(env, event, (ERL_NIF_SELECT_READ), obj, pid, ref); +} + + +static +int esock_select_write(ErlNifEnv* env, + ErlNifEvent event, + void* obj, + const ErlNifPid* pid, + ERL_NIF_TERM ref) +{ + return enif_select(env, event, (ERL_NIF_SELECT_WRITE), obj, pid, ref); +} + + +static +int esock_select_stop(ErlNifEnv* env, + ErlNifEvent event, + void* obj) +{ + return enif_select(env, event, (ERL_NIF_SELECT_STOP), obj, NULL, + esock_atom_undefined); +} + +static +int esock_select_cancel(ErlNifEnv* env, + ErlNifEvent event, + enum ErlNifSelectFlags mode, + void* obj) +{ + return enif_select(env, event, (ERL_NIF_SELECT_CANCEL | mode), obj, NULL, + esock_atom_undefined); +} + /* ---------------------------------------------------------------------- * R e q u e s t Q u e u e F u n c t i o n s @@ -17180,20 +17367,13 @@ int esock_monitor(const char* slogan, int res; SSDBG( descP, ("SOCKET", "[%d] %s: try monitor\r\n", descP->sock, slogan) ); - /* esock_dbg_printf("MONP", "[%d] %s\r\n", descP->sock, slogan); */ res = enif_monitor_process(env, descP, pid, &monP->mon); if (res != 0) { monP->is_active = 0; SSDBG( descP, ("SOCKET", "[%d] monitor failed: %d\r\n", descP->sock, res) ); - // esock_dbg_printf("MONP", "[%d] failed: %d\r\n", descP->sock, res); } else { monP->is_active = 1; - /*esock_dbg_printf("MONP", - "[%d] success: " - "%T\r\n", - descP->sock, - my_make_monitor_term(env, &monP->mon));*/ } return res; @@ -17294,16 +17474,6 @@ void socket_dtor(ErlNifEnv* env, void* obj) * (in case it is, for instance, both controlling process * and a writer). * - * <KOLLA> - * - * We do not handle linger-issues yet! So anything in the out - * buffers will be left for the OS to solve... - * Do we need a special "close"-thread? Dirty scheduler? - * - * What happens if we are "stopped" for another reason then 'close'? - * For instance, down? - * - * </KOLLA> */ static void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) @@ -17321,7 +17491,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) MLOCK(descP->writeMtx); MLOCK(descP->readMtx); MLOCK(descP->accMtx); - MLOCK(descP->closeMtx); + if (!is_direct_call) MLOCK(descP->closeMtx); SSDBG( descP, ("SOCKET", "socket_stop -> " "[%d, %T] all mutex(s) locked when counters:" @@ -17360,10 +17530,6 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) */ DEMONP("socket_stop -> ctrl", env, descP, &descP->ctrlMon); - /* - esock_dbg_printf("STOP", "[%d] maybe handle current writer (0x%lX)\r\n", - descP->sock, descP->currentReaderP); - */ if (descP->currentWriterP != NULL) { /* We have a (current) writer and *may* therefor also have * writers waiting. @@ -17399,10 +17565,6 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed); } - /* - esock_dbg_printf("STOP", "[%d] maybe handle current reader (0x%lX)\r\n", - descP->sock, descP->currentReaderP); - */ if (descP->currentReaderP != NULL) { /* We have a (current) reader and *may* therefor also have @@ -17439,10 +17601,6 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed); } - /* - esock_dbg_printf("STOP", "[%d] maybe handle current acceptor (0x%lX)\r\n", - descP->sock, descP->currentReaderP); - */ if (descP->currentAcceptorP != NULL) { /* We have a (current) acceptor and *may* therefor also have * acceptors waiting. @@ -17490,56 +17648,32 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * * </KOLLA> */ - - if (descP->closeLocal && - !is_direct_call) { - - /* +++ send close message to the waiting process +++ - * - * {close, CloseRef} - * - * <KOLLA> - * - * WHAT HAPPENS IF THE RECEIVER HAS DIED IN THE MEANTIME???? - * - * </KOLLA> - */ - /* We should actually move the sending out of the - * safe region (after the MUNLOCK). - * Construct it here, but do the actual send later. - * Here: - * cpid = descP->closePid; - * cenv = enif_alloc_env(); - * closeMsg = esock_mk_close_msg(env, - * CP_TERM(cenv, sockRef), - * CP_TERM(cenv, descP->closeRef)); - * And after MUNLOCK: - * if (cenv != NULL) { - * esock_send_msg(env, closeMsg, &cpid, cenv); - * - * } - */ - esock_send_close_msg(env, descP->closeRef, &descP->closerPid); - - DEMONP("socket_stop -> closer", env, descP, &descP->closerMon); - - } else { - - /* - * <KOLLA> - * - * ABORT? - * - * </KOLLA> - */ + if (descP->closeLocal) { + if (!is_direct_call) { + + /* +++ send close message to the waiting process +++ */ + + esock_send_close_msg(env, descP); + + DEMONP("socket_stop -> closer", env, descP, &descP->closerMon); + + } else { + + /* We only need to explicitly free the environment here + * since the message send takes care of it if scheduled. + */ + + if (descP->closeEnv != NULL) enif_free_env(descP->closeEnv); + + } } } SSDBG( descP, ("SOCKET", "socket_stop -> unlock all mutex(s)\r\n") ); - MUNLOCK(descP->closeMtx); + if (!is_direct_call) MUNLOCK(descP->closeMtx); MUNLOCK(descP->accMtx); MUNLOCK(descP->readMtx); MUNLOCK(descP->writeMtx); @@ -17624,31 +17758,6 @@ void socket_down(ErlNifEnv* env, B2S(IS_CLOSED(descP)), B2S(IS_CLOSING(descP))) ); - /* - esock_dbg_printf("DOWN", - "[%d] begin %T\r\n", - descP->sock, my_make_monitor_term(env, mon)); - */ - - /* - if (MON_COMP(mon, &descP->ctrlMon) == 0) { - SSDBG( descP, ("SOCKET", "socket_down -> controlling process mon\r\n") ); - } else if (MON_COMP(mon, &descP->closerMon) == 0) { - SSDBG( descP, ("SOCKET", "socket_down -> closer mon\r\n") ); - } else if ((descP->currentWriterP != NULL) && - (MON_COMP(mon, &descP->currentWriter.mon) == 0)) { - SSDBG( descP, ("SOCKET", "socket_down -> current writer mon\r\n") ); - } else if ((descP->currentReaderP != NULL) && - (MON_COMP(mon, &descP->currentReader.mon) == 0)) { - SSDBG( descP, ("SOCKET", "socket_down -> current reader mon\r\n") ); - } else if ((descP->currentAcceptorP != NULL) && - (MON_COMP(mon, &descP->currentAcceptor.mon) == 0)) { - SSDBG( descP, ("SOCKET", "socket_down -> current acceptor mon\r\n") ); - } else { - SSDBG( descP, ("SOCKET", "socket_down -> OTHER mon\r\n") ); - } - */ - if (!IS_CLOSED(descP)) { if (compare_pids(env, &descP->ctrlPid, pid)) { int selectRes; @@ -17664,16 +17773,8 @@ void socket_down(ErlNifEnv* env, descP->closeLocal = TRUE; descP->closerPid = *pid; MON_INIT(&descP->closerMon); - descP->closeRef = MKREF(env); // Do we really need this in this case? - - /* - esock_dbg_printf("DOWN", - "[%d] select stop %T\r\n", - descP->sock, my_make_monitor_term(env, mon)); - */ - selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP), - descP, NULL, descP->closeRef); + selectRes = esock_select_stop(env, descP->sock, descP); if (selectRes & ERL_NIF_SELECT_STOP_CALLED) { /* We are done - we can finalize (socket close) directly */ @@ -17832,12 +17933,9 @@ void socket_down_acceptor(ErlNifEnv* env, descP->currentAcceptor.pid, descP->currentAcceptor.ref) ); - if ((res = enif_select(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, - &descP->currentAcceptor.pid, - descP->currentAcceptor.ref) < 0)) { + if ((res = esock_select_read(env, descP->sock, descP, + &descP->currentAcceptor.pid, + descP->currentAcceptor.ref) < 0)) { esock_warning_msg("Failed select (%d) for new acceptor " "after current (%T) died\r\n", @@ -17900,12 +17998,9 @@ void socket_down_writer(ErlNifEnv* env, descP->currentWriter.pid, descP->currentWriter.ref) ); - if ((res = enif_select(env, - descP->sock, - (ERL_NIF_SELECT_WRITE), - descP, - &descP->currentWriter.pid, - descP->currentWriter.ref) < 0)) { + if ((res = esock_select_write(env, descP->sock, descP, + &descP->currentWriter.pid, + descP->currentWriter.ref) < 0)) { esock_warning_msg("Failed select (%d) for new writer " "after current (%T) died\r\n", @@ -17967,17 +18062,14 @@ void socket_down_reader(ErlNifEnv* env, descP->currentReader.pid, descP->currentReader.ref) ); - if ((res = enif_select(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, - &descP->currentReader.pid, - descP->currentReader.ref) < 0)) { + if ((res = esock_select_read(env, descP->sock, descP, + &descP->currentReader.pid, + descP->currentReader.ref) < 0)) { esock_warning_msg("Failed select (%d) for new reader " "after current (%T) died\r\n", res, *pid); - + } } else { @@ -18327,6 +18419,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_scope_id = MKA(env, "scope_id"); esock_atom_sctp = MKA(env, "sctp"); esock_atom_sec = MKA(env, "sec"); + esock_atom_select_failed = MKA(env, "select_failed"); esock_atom_select_sent = MKA(env, "select_sent"); esock_atom_send = MKA(env, "send"); esock_atom_sendmsg = MKA(env, "sendmsg"); diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex 93b5fc215b..e44dff8475 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 12b6c3ac55..5c1647290d 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -2111,6 +2111,16 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> %% %% close - close a file descriptor %% +%% Closing a socket is a two stage rocket (because of linger). +%% We need to perform the actual socket close while in BLOCKING mode. +%% But that would hang the entire VM, so what we do is divide the +%% close in two steps: +%% 1) nif_close + the socket_stop (nif) callback function +%% This is for everything that can be done safely NON-BLOCKING. +%% 2) nif_finalize_close which is executed by a *dirty* scheduler +%% Before we call the socket close function, we se the socket +%% BLOCKING. Thereby linger is handled properly. + -spec close(Socket) -> ok | {error, Reason} when Socket :: socket(), @@ -2124,16 +2134,10 @@ do_close(SockRef) -> ok -> nif_finalize_close(SockRef); {ok, CloseRef} -> - %% We must wait + %% We must wait for the socket_stop callback function to + %% complete its work receive - {'$socket', _, close, CloseRef} -> -%% {close, CloseRef} -> - %% <KOLLA> - %% - %% WHAT HAPPENS IF THIS PROCESS IS KILLED - %% BEFORE WE CAN EXECUTE THE FINAL CLOSE??? - %% - %% </KOLLA> + {'$socket', SockRef, close, CloseRef} -> nif_finalize_close(SockRef) end; {error, _} = ERROR -> |