diff options
author | Micael Karlberg <[email protected]> | 2019-02-21 14:57:59 +0100 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2019-02-22 19:45:54 +0100 |
commit | 4ff181e69fc129576333c04a9d1d0c97b6770347 (patch) | |
tree | fe6b0e6ca67e64948d317c58734d0562563b4b2a /erts/emulator | |
parent | b71c341212fa1ff07eac914e07bc57303c2c4385 (diff) | |
download | otp-4ff181e69fc129576333c04a9d1d0c97b6770347.tar.gz otp-4ff181e69fc129576333c04a9d1d0c97b6770347.tar.bz2 otp-4ff181e69fc129576333c04a9d1d0c97b6770347.zip |
[socket] More fixes to socket close
Better handling of socket close. The 'closeRef' needed
its own environment to make if work in both cases (both
called and scheduled).
Also Introduced the enif select wrapper functions (read,
write, stop and cancel). Also add error handling at every
time one of these functions are called.
OTP-14831
Diffstat (limited to 'erts/emulator')
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 5 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 659 |
2 files changed, 377 insertions, 287 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index 0f973855ae..a9e83adc21 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -258,6 +258,7 @@ extern ERL_NIF_TERM esock_atom_rxq_ovfl; extern ERL_NIF_TERM esock_atom_scope_id; extern ERL_NIF_TERM esock_atom_sctp; extern ERL_NIF_TERM esock_atom_sec; +extern ERL_NIF_TERM esock_atom_select_failed; extern ERL_NIF_TERM esock_atom_select_sent; extern ERL_NIF_TERM esock_atom_send; extern ERL_NIF_TERM esock_atom_sendmsg; @@ -349,10 +350,6 @@ extern ERL_NIF_TERM esock_atom_einval; #define MON_INIT(M) esock_monitor_init((M)) // #define MON_COMP(M1, M2) esock_monitor_compare((M1), (M2)) -#define SELECT(E,FD,M,O,P,R) \ - if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \ - return enif_make_badarg((E)); - #define COMPARE(A, B) enif_compare((A), (B)) #define IS_ATOM(E, TE) enif_is_atom((E), (TE)) diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index c0a37450f9..d860cb4965 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -834,6 +834,7 @@ typedef struct { ErlNifPid closerPid; // ErlNifMonitor closerMon; ESockMonitor closerMon; + ErlNifEnv* closeEnv; ERL_NIF_TERM closeRef; BOOLEAN_T closeLocal; @@ -2335,9 +2336,8 @@ static void socket_down_reader(ErlNifEnv* env, SocketDescriptor* descP, const ErlNifPid* pid); -static char* esock_send_close_msg(ErlNifEnv* env, - ERL_NIF_TERM closeRef, - ErlNifPid* pid); +static char* esock_send_close_msg(ErlNifEnv* env, + SocketDescriptor* descP); static char* esock_send_abort_msg(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, @@ -2354,6 +2354,24 @@ static char* esock_send_msg(ErlNifEnv* env, ErlNifPid* pid, ErlNifEnv* msg_env); +static int esock_select_read(ErlNifEnv* env, + ErlNifEvent event, + void* obj, + const ErlNifPid* pid, + ERL_NIF_TERM ref); +static int esock_select_write(ErlNifEnv* env, + ErlNifEvent event, + void* obj, + const ErlNifPid* pid, + ERL_NIF_TERM ref); +static int esock_select_stop(ErlNifEnv* env, + ErlNifEvent event, + void* obj); +static int esock_select_cancel(ErlNifEnv* env, + ErlNifEvent event, + enum ErlNifSelectFlags mode, + void* obj); + static BOOLEAN_T extract_debug(ErlNifEnv* env, ERL_NIF_TERM map); static BOOLEAN_T extract_iow(ErlNifEnv* env, @@ -2618,6 +2636,7 @@ ERL_NIF_TERM esock_atom_rxq_ovfl; ERL_NIF_TERM esock_atom_scope_id; ERL_NIF_TERM esock_atom_sctp; ERL_NIF_TERM esock_atom_sec; +ERL_NIF_TERM esock_atom_select_failed; ERL_NIF_TERM esock_atom_select_sent; ERL_NIF_TERM esock_atom_send; ERL_NIF_TERM esock_atom_sendmsg; @@ -4582,7 +4601,17 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, return esock_make_error_str(env, xres); } + /* + * <KOLLA> + * + * We should lock both the read and write mutex:es... + * + * </KOLLA> + * + */ + return nconnect(env, descP); + #endif // if !defined(__WIN32__) } @@ -4592,16 +4621,12 @@ static ERL_NIF_TERM nconnect(ErlNifEnv* env, SocketDescriptor* descP) { - int code, save_errno = 0; + ERL_NIF_TERM res; + int code, sres, save_errno = 0; /* - * <KOLLA> - * - * We should look both the read and write mutex:es... - * - * </KOLLA> - * - * Verify that we are where in the proper state */ + * Verify that we are where in the proper state + */ if (!IS_OPEN(descP)) { SSDBG( descP, ("SOCKET", "nif_connect -> not open\r\n") ); @@ -4629,13 +4654,17 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, if (IS_SOCKET_ERROR(code) && ((save_errno == ERRNO_BLOCK) || /* Winsock2 */ (save_errno == EINPROGRESS))) { /* Unix & OSE!! */ + /* THIS DOES NOT WORK!! WE NEED A "PERISTENT" ENV!! */ ERL_NIF_TERM ref = MKREF(env); descP->state = SOCKET_STATE_CONNECTING; - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_WRITE), - descP, NULL, ref); - return esock_make_ok2(env, ref); + if ((sres = esock_select_write(env, descP->sock, descP, NULL, ref)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + res = esock_make_ok2(env, ref); + } } else if (code == 0) { /* ok we are connected */ descP->state = SOCKET_STATE_CONNECTED; descP->isReadable = TRUE; @@ -4643,11 +4672,13 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, /* Do we need to do somthing for "active" mode? * Is there even such a thing *here*? */ - return esock_atom_ok; + res = esock_atom_ok; } else { - return esock_make_error_errno(env, save_errno); + res = esock_make_error_errno(env, save_errno); } + return res; + } #endif // if !defined(__WIN32__) @@ -4925,7 +4956,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, unsigned int n; SOCKET accSock; HANDLE accEvent; - int save_errno; + int sres, save_errno; ErlNifPid caller; SSDBG( descP, ("SOCKET", "naccept_listening -> get caller\r\n") ); @@ -4960,37 +4991,41 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, descP->currentAcceptor.ref = enif_make_copy(descP->env, ref); descP->currentAcceptorP = &descP->currentAcceptor; - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, NULL, ref); - - /* Shall we really change state? - * The ready event is sent directly to the calling - * process, which simply calls this function again. - * Basically, state accepting means that we have - * an "outstanding" accept. - * Shall we store the pid of the calling process? - * And if someone else calls accept, return with ebusy? - * Can any process call accept or just the controlling - * process? - * We also need a monitor it case the calling process is - * called before we are done! - * - * Change state (to accepting) and store pid of the acceptor - * (current process). Only accept calls from the acceptor - * process (ebusy) and once we have a successful accept, - * change state back to listening. If cancel is called instead - * (only accepted from the acceptor process), we reset - * state to listening and also resets the pid to "null" - * (is there such a value?). - * Need a mutex to secure that we don't test and change the - * pid at the same time. - */ + if ((sres = esock_select_read(env, descP->sock, descP, + NULL, ref)) < 0) { + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { - descP->state = SOCKET_STATE_ACCEPTING; + /* Shall we really change state? + * The ready event is sent directly to the calling + * process, which simply calls this function again. + * Basically, state accepting means that we have + * an "outstanding" accept. + * Shall we store the pid of the calling process? + * And if someone else calls accept, return with ebusy? + * Can any process call accept or just the controlling + * process? + * We also need a monitor it case the calling process is + * called before we are done! + * + * Change state (to accepting) and store pid of the acceptor + * (current process). Only accept calls from the acceptor + * process (ebusy) and once we have a successful accept, + * change state back to listening. If cancel is called instead + * (only accepted from the acceptor process), we reset + * state to listening and also resets the pid to "null" + * (is there such a value?). + * Need a mutex to secure that we don't test and change the + * pid at the same time. + */ - return esock_make_error(env, esock_atom_eagain); + descP->state = SOCKET_STATE_ACCEPTING; + + return esock_make_error(env, esock_atom_eagain); + } } else { SSDBG( descP, @@ -5071,7 +5106,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, SOCKET accSock; HANDLE accEvent; ErlNifPid caller; - int save_errno; + int save_errno, sres; ERL_NIF_TERM result; SSDBG( descP, ("SOCKET", "naccept_accepting -> get caller\r\n") ); @@ -5125,12 +5160,18 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, ("SOCKET", "naccept_accepting -> would block: try again\r\n") ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, NULL, ref); + if ((sres = esock_select_read(env, descP->sock, descP, + NULL, ref)) < 0) { + result = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + result = esock_make_error(env, esock_atom_eagain); + } + + return result; - return esock_make_error(env, esock_atom_eagain); } else { SSDBG( descP, ("SOCKET", @@ -5201,10 +5242,14 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, descP->currentAcceptor.pid, descP->currentAcceptor.ref) ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref); + if ((sres = esock_select_read(env, descP->sock, descP, + &descP->currentAcceptor.pid, + descP->currentAcceptor.ref)) < 0) { + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } } else { descP->currentAcceptorP = NULL; descP->state = SOCKET_STATE_LISTENING; @@ -6379,12 +6424,10 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, doClose = TRUE; } - MUNLOCK(descP->closeMtx); - if (doClose) { - descP->closeRef = MKREF(env); - selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP), - descP, NULL, descP->closeRef); + descP->closeEnv = enif_alloc_env(); + descP->closeRef = MKREF(descP->closeEnv); + selectRes = esock_select_stop(env, descP->sock, descP); if (selectRes & ERL_NIF_SELECT_STOP_CALLED) { /* Prep done - inform the caller it can finalize (close) directly */ SSDBG( descP, @@ -6421,10 +6464,13 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, reason = MKT2(env, atom_select, MKI(env, selectRes)); reply = esock_make_error(env, reason); } + } else { reply = esock_make_error(env, reason); } + MUNLOCK(descP->closeMtx); + SSDBG( descP, ("SOCKET", "nclose -> [%d] done when: " "\r\n state: 0x%lX" @@ -13065,6 +13111,7 @@ static ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, SocketDescriptor* descP) { + int sres; ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") ); @@ -13089,11 +13136,14 @@ ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, descP->currentAcceptor.pid, descP->currentAcceptor.ref) ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref); - + if ((sres = esock_select_read(env, descP->sock, descP, + &descP->currentAcceptor.pid, + descP->currentAcceptor.ref)) < 0) { + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } } else { SSDBG( descP, ("SOCKET", "ncancel_accept_current -> no more acceptors\r\n") ); descP->currentAcceptorP = NULL; @@ -13185,6 +13235,7 @@ static ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, SocketDescriptor* descP) { + int sres; ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") ); @@ -13209,11 +13260,14 @@ ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, descP->currentWriter.pid, descP->currentWriter.ref) ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_WRITE), - descP, &descP->currentWriter.pid, descP->currentWriter.ref); - + if ((sres = esock_select_write(env, descP->sock, descP, + &descP->currentWriter.pid, + descP->currentWriter.ref)) < 0) { + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } } else { SSDBG( descP, ("SOCKET", "ncancel_send_current -> no more writers\r\n") ); descP->currentWriterP = NULL; @@ -13303,6 +13357,7 @@ static ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env, SocketDescriptor* descP) { + int sres; ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "ncancel_recv_current -> entry\r\n") ); @@ -13327,11 +13382,14 @@ ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env, descP->currentReader.pid, descP->currentReader.ref) ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, &descP->currentReader.pid, descP->currentReader.ref); - + if ((sres = esock_select_read(env, descP->sock, descP, + &descP->currentReader.pid, + descP->currentReader.ref)) < 0) { + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } } else { SSDBG( descP, ("SOCKET", "ncancel_recv_current -> no more readers\r\n") ); descP->currentReaderP = NULL; @@ -13399,9 +13457,7 @@ ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env, int smode, int rmode) { - int selectRes = enif_select(env, descP->sock, - (ERL_NIF_SELECT_CANCEL | smode), - descP, NULL, opRef); + int selectRes = esock_select_cancel(env, descP->sock, smode, descP); if (selectRes & rmode) { /* Was cancelled */ @@ -13498,6 +13554,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef) { + int sres; + SSDBG( descP, ("SOCKET", "send_check_result -> entry with" "\r\n written: %d" @@ -13533,11 +13591,14 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, descP->currentWriter.pid, descP->currentWriter.ref) ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_WRITE), - descP, &descP->currentWriter.pid, descP->currentWriter.ref); - + if ((sres = esock_select_write(env, descP->sock, descP, + &descP->currentWriter.pid, + descP->currentWriter.ref)) < 0) { + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } } else { descP->currentWriterP = NULL; } @@ -13615,12 +13676,34 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, cnt_inc(&descP->writeWaits, 1); - SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), descP, NULL, sendRef); - - if (written >= 0) - return esock_make_ok2(env, MKI(env, written)); - else - return esock_make_error(env, esock_atom_eagain); + sres = esock_select_write(env, descP->sock, descP, NULL, sendRef); + + if (written >= 0) { + if (sres < 0) { + /* Returned: {error, Reason} + * Reason: {select_failed, sres, written} + */ + return esock_make_error(env, + MKT3(env, + esock_atom_select_failed, + MKI(env, sres), + MKI(env, written))); + } else { + return esock_make_ok2(env, MKI(env, written)); + } + } else { + if (sres < 0) { + /* Returned: {error, Reason} + * Reason: {select_failed, sres} + */ + return esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + return esock_make_error(env, esock_atom_eagain); + } + } } @@ -13719,6 +13802,9 @@ static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, SocketDescriptor* descP) { + int sres; + ERL_NIF_TERM res = esock_atom_ok; + if (descP->currentReaderP != NULL) { DEMONP("recv_update_current_reader -> current reader", @@ -13739,19 +13825,20 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, descP->currentReader.pid, descP->currentReader.ref) ); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, - &descP->currentReader.pid, - descP->currentReader.ref); - + if ((sres = esock_select_read(env, descP->sock, descP, + &descP->currentReader.pid, + descP->currentReader.ref)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } } else { descP->currentReaderP = NULL; } } - return esock_atom_ok; + return res; } @@ -13805,7 +13892,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, ERL_NIF_TERM recvRef) { char* xres; - ERL_NIF_TERM data; + int sres; + ERL_NIF_TERM res, data; SSDBG( descP, ("SOCKET", "recv_check_result -> entry with" @@ -13825,7 +13913,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, */ if ((read == 0) && (descP->type == SOCK_STREAM)) { - ERL_NIF_TERM res = esock_make_error(env, atom_closed); + + res = esock_make_error(env, atom_closed); /* * When a stream socket peer has performed an orderly shutdown, the return @@ -13961,7 +14050,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, FREE_BIN(bufP); if (saveErrno == ECONNRESET) { - ERL_NIF_TERM res = esock_make_error(env, atom_closed); + + res = esock_make_error(env, atom_closed); /* +++ Oups - closed +++ */ @@ -13988,16 +14078,16 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, recv_error_current_reader(env, descP, sockRef, res); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_STOP), - descP, NULL, recvRef); + if ((sres = esock_select_stop(env, descP->sock, descP)) < 0) { + esock_warning_msg("Failed stop select (closed) " + "for current reader (%T): %d\r\n", + recvRef, sres); + } return res; } else if ((saveErrno == ERRNO_BLOCK) || (saveErrno == EAGAIN)) { - int sres; SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] eagain\r\n", toRead) ); @@ -14008,13 +14098,17 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> SELECT for more\r\n") ); - sres = enif_select(env, descP->sock, (ERL_NIF_SELECT_READ), - descP, NULL, recvRef); + if ((sres = esock_select_read(env, descP->sock, descP, + NULL, recvRef)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + res = esock_make_error(env, esock_atom_eagain); + } - SSDBG( descP, - ("SOCKET", "recv_check_result -> SELECT res: %d\r\n", sres) ); - - return esock_make_error(env, esock_atom_eagain); + return res; } else { ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); @@ -14077,20 +14171,31 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, return esock_make_error_str(env, xres); } - /* SELECT for more data */ + data = MKBIN(env, bufP); + data = MKSBIN(env, data, 0, read); - SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), - descP, NULL, recvRef); - cnt_inc(&descP->readByteCnt, read); + /* SELECT for more data */ + + if ((sres = esock_select_read(env, descP->sock, descP, + NULL, recvRef)) < 0) { + /* Result: {error, Reason} + * Reason: {select_failed, sres, data} + */ + res = esock_make_error(env, + MKT3(env, + esock_atom_select_failed, + MKI(env, sres), + data)); + } else { + res = esock_make_ok3(env, atom_false, data); + } + /* This transfers "ownership" of the *allocated* binary to an * erlang term (no need for an explicit free). */ - data = MKBIN(env, bufP); - data = MKSBIN(env, data, 0, read); - - return esock_make_ok3(env, atom_false, data); + return res; } } } @@ -14113,7 +14218,8 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, ERL_NIF_TERM recvRef) { char* xres; - ERL_NIF_TERM data; + int sres; + ERL_NIF_TERM data, res; SSDBG( descP, ("SOCKET", "recvfrom_check_result -> entry with" @@ -14133,7 +14239,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, /* +++ Error handling +++ */ if (saveErrno == ECONNRESET) { - ERL_NIF_TERM res = esock_make_error(env, atom_closed); + res = esock_make_error(env, atom_closed); /* +++ Oups - closed +++ */ @@ -14153,10 +14259,11 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, recv_error_current_reader(env, descP, sockRef, res); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_STOP), - descP, NULL, recvRef); + if ((sres = esock_select_stop(env, descP->sock, descP)) < 0) { + esock_warning_msg("Failed stop select (closed) " + "for current reader (%T): %d\r\n", + recvRef, sres); + } FREE_BIN(bufP); @@ -14166,19 +14273,26 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, (saveErrno == EAGAIN)) { SSDBG( descP, ("SOCKET", "recvfrom_check_result -> eagain\r\n") ); - - if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) - return esock_make_error_str(env, xres); - - SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), - descP, NULL, recvRef); FREE_BIN(bufP); - return esock_make_error(env, esock_atom_eagain); + if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) + return esock_make_error_str(env, xres); + + if ((sres = esock_select_read(env, descP->sock, descP, + NULL, recvRef)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + res = esock_make_error(env, esock_atom_eagain); + } + return res; } else { - ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); + + res = esock_make_error_errno(env, saveErrno); SSDBG( descP, ("SOCKET", @@ -14240,6 +14354,8 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { + int sres; + ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "recvmsg_check_result -> entry with" @@ -14284,7 +14400,6 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, /* +++ Error handling +++ */ if (saveErrno == ECONNRESET) { - ERL_NIF_TERM res = esock_make_error(env, atom_closed); /* +++ Oups - closed +++ */ @@ -14299,19 +14414,21 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, * </KOLLA> */ + res = esock_make_error(env, atom_closed); descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; recv_error_current_reader(env, descP, sockRef, res); - SELECT(env, - descP->sock, - (ERL_NIF_SELECT_STOP), - descP, NULL, recvRef); + if ((sres = esock_select_stop(env, descP->sock, descP)) < 0) { + esock_warning_msg("Failed stop select (closed) " + "for current reader (%T): %d\r\n", + recvRef, sres); + } FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); - return res; + return res;; } else if ((saveErrno == ERRNO_BLOCK) || (saveErrno == EAGAIN)) { @@ -14319,18 +14436,26 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recvmsg_check_result -> eagain\r\n") ); + FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) return esock_make_error_str(env, xres); - SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), - descP, NULL, recvRef); - - FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); + if ((sres = esock_select_read(env, descP->sock, descP, + NULL, recvRef)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + res = esock_make_error(env, esock_atom_eagain); + } - return esock_make_error(env, esock_atom_eagain); + return res; } else { - ERL_NIF_TERM res = esock_make_error_errno(env, saveErrno); + + res = esock_make_error_errno(env, saveErrno); SSDBG( descP, ("SOCKET", @@ -16069,6 +16194,8 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) sprintf(buf, "socket[close,%d]", sock); descP->closeMtx = MCREATE(buf); + descP->closeEnv = NULL; + descP->closeRef = esock_atom_undefined; descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT; descP->rNum = 0; @@ -16647,13 +16774,20 @@ char* send_msg_error(ErlNifEnv* env, * (actually that the 'stop' callback function has been called). */ static -char* esock_send_close_msg(ErlNifEnv* env, - ERL_NIF_TERM closeRef, - ErlNifPid* pid) +char* esock_send_close_msg(ErlNifEnv* env, + SocketDescriptor* descP) { - return esock_send_socket_msg(env, - esock_atom_undefined, - esock_atom_close, closeRef, pid, NULL); + ERL_NIF_TERM sockRef = enif_make_resource(descP->closeEnv, descP); + char* res = esock_send_socket_msg(env, + sockRef, + esock_atom_close, + descP->closeRef, + &descP->closerPid, + descP->closeEnv); + + descP->closeEnv = NULL; + + return res; } @@ -16673,7 +16807,8 @@ char* esock_send_abort_msg(ErlNifEnv* env, ErlNifPid* pid) { ErlNifEnv* msg_env = enif_alloc_env(); - ERL_NIF_TERM info = MKT2(msg_env, enif_make_copy(msg_env, recvRef), + ERL_NIF_TERM info = MKT2(msg_env, + enif_make_copy(msg_env, recvRef), enif_make_copy(msg_env, reason)); return esock_send_socket_msg(env, sockRef, esock_atom_abort, info, pid, @@ -16684,10 +16819,16 @@ char* esock_send_abort_msg(ErlNifEnv* env, /* *** esock_send_socket_msg *** * * This function sends a general purpose socket message to an erlang - * process. A general 'socket' message has the form: + * process. A general 'socket' message has the ("erlang") form: * * {'$socket', SockRef, Tag, Info} * + * Where + * + * SockRef: reference() + * Tag: atom() + * Info: term() + * */ static @@ -16696,7 +16837,7 @@ char* esock_send_socket_msg(ErlNifEnv* env, ERL_NIF_TERM tag, ERL_NIF_TERM info, ErlNifPid* pid, - ErlNifEnv* msg_env) + ErlNifEnv* msg_env) { ERL_NIF_TERM msg; if (!msg_env) { @@ -16731,6 +16872,52 @@ char* esock_send_msg(ErlNifEnv* env, #endif // #if defined(__WIN32__) +/* ---------------------------------------------------------------------- + * S e l e c t W r a p p e r F u n c t i o n s + * ---------------------------------------------------------------------- + */ + +static +int esock_select_read(ErlNifEnv* env, + ErlNifEvent event, + void* obj, + const ErlNifPid* pid, + ERL_NIF_TERM ref) +{ + return enif_select(env, event, (ERL_NIF_SELECT_READ), obj, pid, ref); +} + + +static +int esock_select_write(ErlNifEnv* env, + ErlNifEvent event, + void* obj, + const ErlNifPid* pid, + ERL_NIF_TERM ref) +{ + return enif_select(env, event, (ERL_NIF_SELECT_WRITE), obj, pid, ref); +} + + +static +int esock_select_stop(ErlNifEnv* env, + ErlNifEvent event, + void* obj) +{ + return enif_select(env, event, (ERL_NIF_SELECT_STOP), obj, NULL, + esock_atom_undefined); +} + +static +int esock_select_cancel(ErlNifEnv* env, + ErlNifEvent event, + enum ErlNifSelectFlags mode, + void* obj) +{ + return enif_select(env, event, (ERL_NIF_SELECT_CANCEL | mode), obj, NULL, + esock_atom_undefined); +} + /* ---------------------------------------------------------------------- * R e q u e s t Q u e u e F u n c t i o n s @@ -17180,20 +17367,13 @@ int esock_monitor(const char* slogan, int res; SSDBG( descP, ("SOCKET", "[%d] %s: try monitor\r\n", descP->sock, slogan) ); - /* esock_dbg_printf("MONP", "[%d] %s\r\n", descP->sock, slogan); */ res = enif_monitor_process(env, descP, pid, &monP->mon); if (res != 0) { monP->is_active = 0; SSDBG( descP, ("SOCKET", "[%d] monitor failed: %d\r\n", descP->sock, res) ); - // esock_dbg_printf("MONP", "[%d] failed: %d\r\n", descP->sock, res); } else { monP->is_active = 1; - /*esock_dbg_printf("MONP", - "[%d] success: " - "%T\r\n", - descP->sock, - my_make_monitor_term(env, &monP->mon));*/ } return res; @@ -17294,16 +17474,6 @@ void socket_dtor(ErlNifEnv* env, void* obj) * (in case it is, for instance, both controlling process * and a writer). * - * <KOLLA> - * - * We do not handle linger-issues yet! So anything in the out - * buffers will be left for the OS to solve... - * Do we need a special "close"-thread? Dirty scheduler? - * - * What happens if we are "stopped" for another reason then 'close'? - * For instance, down? - * - * </KOLLA> */ static void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) @@ -17321,7 +17491,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) MLOCK(descP->writeMtx); MLOCK(descP->readMtx); MLOCK(descP->accMtx); - MLOCK(descP->closeMtx); + if (!is_direct_call) MLOCK(descP->closeMtx); SSDBG( descP, ("SOCKET", "socket_stop -> " "[%d, %T] all mutex(s) locked when counters:" @@ -17360,10 +17530,6 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) */ DEMONP("socket_stop -> ctrl", env, descP, &descP->ctrlMon); - /* - esock_dbg_printf("STOP", "[%d] maybe handle current writer (0x%lX)\r\n", - descP->sock, descP->currentReaderP); - */ if (descP->currentWriterP != NULL) { /* We have a (current) writer and *may* therefor also have * writers waiting. @@ -17399,10 +17565,6 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed); } - /* - esock_dbg_printf("STOP", "[%d] maybe handle current reader (0x%lX)\r\n", - descP->sock, descP->currentReaderP); - */ if (descP->currentReaderP != NULL) { /* We have a (current) reader and *may* therefor also have @@ -17439,10 +17601,6 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed); } - /* - esock_dbg_printf("STOP", "[%d] maybe handle current acceptor (0x%lX)\r\n", - descP->sock, descP->currentReaderP); - */ if (descP->currentAcceptorP != NULL) { /* We have a (current) acceptor and *may* therefor also have * acceptors waiting. @@ -17490,56 +17648,32 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * * </KOLLA> */ - - if (descP->closeLocal && - !is_direct_call) { - - /* +++ send close message to the waiting process +++ - * - * {close, CloseRef} - * - * <KOLLA> - * - * WHAT HAPPENS IF THE RECEIVER HAS DIED IN THE MEANTIME???? - * - * </KOLLA> - */ - /* We should actually move the sending out of the - * safe region (after the MUNLOCK). - * Construct it here, but do the actual send later. - * Here: - * cpid = descP->closePid; - * cenv = enif_alloc_env(); - * closeMsg = esock_mk_close_msg(env, - * CP_TERM(cenv, sockRef), - * CP_TERM(cenv, descP->closeRef)); - * And after MUNLOCK: - * if (cenv != NULL) { - * esock_send_msg(env, closeMsg, &cpid, cenv); - * - * } - */ - esock_send_close_msg(env, descP->closeRef, &descP->closerPid); - - DEMONP("socket_stop -> closer", env, descP, &descP->closerMon); - - } else { - - /* - * <KOLLA> - * - * ABORT? - * - * </KOLLA> - */ + if (descP->closeLocal) { + if (!is_direct_call) { + + /* +++ send close message to the waiting process +++ */ + + esock_send_close_msg(env, descP); + + DEMONP("socket_stop -> closer", env, descP, &descP->closerMon); + + } else { + + /* We only need to explicitly free the environment here + * since the message send takes care of it if scheduled. + */ + + if (descP->closeEnv != NULL) enif_free_env(descP->closeEnv); + + } } } SSDBG( descP, ("SOCKET", "socket_stop -> unlock all mutex(s)\r\n") ); - MUNLOCK(descP->closeMtx); + if (!is_direct_call) MUNLOCK(descP->closeMtx); MUNLOCK(descP->accMtx); MUNLOCK(descP->readMtx); MUNLOCK(descP->writeMtx); @@ -17624,31 +17758,6 @@ void socket_down(ErlNifEnv* env, B2S(IS_CLOSED(descP)), B2S(IS_CLOSING(descP))) ); - /* - esock_dbg_printf("DOWN", - "[%d] begin %T\r\n", - descP->sock, my_make_monitor_term(env, mon)); - */ - - /* - if (MON_COMP(mon, &descP->ctrlMon) == 0) { - SSDBG( descP, ("SOCKET", "socket_down -> controlling process mon\r\n") ); - } else if (MON_COMP(mon, &descP->closerMon) == 0) { - SSDBG( descP, ("SOCKET", "socket_down -> closer mon\r\n") ); - } else if ((descP->currentWriterP != NULL) && - (MON_COMP(mon, &descP->currentWriter.mon) == 0)) { - SSDBG( descP, ("SOCKET", "socket_down -> current writer mon\r\n") ); - } else if ((descP->currentReaderP != NULL) && - (MON_COMP(mon, &descP->currentReader.mon) == 0)) { - SSDBG( descP, ("SOCKET", "socket_down -> current reader mon\r\n") ); - } else if ((descP->currentAcceptorP != NULL) && - (MON_COMP(mon, &descP->currentAcceptor.mon) == 0)) { - SSDBG( descP, ("SOCKET", "socket_down -> current acceptor mon\r\n") ); - } else { - SSDBG( descP, ("SOCKET", "socket_down -> OTHER mon\r\n") ); - } - */ - if (!IS_CLOSED(descP)) { if (compare_pids(env, &descP->ctrlPid, pid)) { int selectRes; @@ -17664,16 +17773,8 @@ void socket_down(ErlNifEnv* env, descP->closeLocal = TRUE; descP->closerPid = *pid; MON_INIT(&descP->closerMon); - descP->closeRef = MKREF(env); // Do we really need this in this case? - - /* - esock_dbg_printf("DOWN", - "[%d] select stop %T\r\n", - descP->sock, my_make_monitor_term(env, mon)); - */ - selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP), - descP, NULL, descP->closeRef); + selectRes = esock_select_stop(env, descP->sock, descP); if (selectRes & ERL_NIF_SELECT_STOP_CALLED) { /* We are done - we can finalize (socket close) directly */ @@ -17832,12 +17933,9 @@ void socket_down_acceptor(ErlNifEnv* env, descP->currentAcceptor.pid, descP->currentAcceptor.ref) ); - if ((res = enif_select(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, - &descP->currentAcceptor.pid, - descP->currentAcceptor.ref) < 0)) { + if ((res = esock_select_read(env, descP->sock, descP, + &descP->currentAcceptor.pid, + descP->currentAcceptor.ref) < 0)) { esock_warning_msg("Failed select (%d) for new acceptor " "after current (%T) died\r\n", @@ -17900,12 +17998,9 @@ void socket_down_writer(ErlNifEnv* env, descP->currentWriter.pid, descP->currentWriter.ref) ); - if ((res = enif_select(env, - descP->sock, - (ERL_NIF_SELECT_WRITE), - descP, - &descP->currentWriter.pid, - descP->currentWriter.ref) < 0)) { + if ((res = esock_select_write(env, descP->sock, descP, + &descP->currentWriter.pid, + descP->currentWriter.ref) < 0)) { esock_warning_msg("Failed select (%d) for new writer " "after current (%T) died\r\n", @@ -17967,17 +18062,14 @@ void socket_down_reader(ErlNifEnv* env, descP->currentReader.pid, descP->currentReader.ref) ); - if ((res = enif_select(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, - &descP->currentReader.pid, - descP->currentReader.ref) < 0)) { + if ((res = esock_select_read(env, descP->sock, descP, + &descP->currentReader.pid, + descP->currentReader.ref) < 0)) { esock_warning_msg("Failed select (%d) for new reader " "after current (%T) died\r\n", res, *pid); - + } } else { @@ -18327,6 +18419,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_scope_id = MKA(env, "scope_id"); esock_atom_sctp = MKA(env, "sctp"); esock_atom_sec = MKA(env, "sec"); + esock_atom_select_failed = MKA(env, "select_failed"); esock_atom_select_sent = MKA(env, "select_sent"); esock_atom_send = MKA(env, "send"); esock_atom_sendmsg = MKA(env, "sendmsg"); |