diff options
Diffstat (limited to 'erts/emulator/nifs/common')
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 8 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 587 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_util.c | 2 |
3 files changed, 522 insertions, 75 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index 06f677482c..7d223b8259 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -339,8 +339,12 @@ extern ERL_NIF_TERM esock_atom_einval; #define MLOCK(M) enif_mutex_lock((M)) #define MUNLOCK(M) enif_mutex_unlock((M)) -#define MONP(E,D,P,M) enif_monitor_process((E), (D), (P), (M)) -#define DEMONP(E,D,M) enif_demonitor_process((E), (D), (M)) +// #define MONP(S,E,D,P,M) enif_monitor_process((E), (D), (P), (M)) +// #define DEMONP(S,E,D,M) enif_demonitor_process((E), (D), (M)) +#define MONP(S,E,D,P,M) esock_monitor((S), (E), (D), (P), (M)) +#define DEMONP(S,E,D,M) esock_demonitor((S), (E), (D), (M)) +#define MON_INIT(M) esock_monitor_init((M)) +// #define MON_COMP(M1, M2) esock_monitor_compare((M1), (M2)) #define SELECT(E,FD,M,O,P,R) \ if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \ diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 4544604f28..a137692d68 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -394,7 +394,18 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL; #define SOCKET_STATE_ACCEPTING (SOCKET_STATE_LISTENING | SOCKET_FLAG_ACC) #define SOCKET_STATE_CLOSING (SOCKET_FLAG_CLOSE) -#define IS_OPEN(d) \ +#define IS_CLOSED(d) \ + ((d)->state == SOCKET_STATE_CLOSED) + +/* +#define IS_STATE(d, f) \ + (((d)->state & (f)) == (f)) +*/ + +#define IS_CLOSING(d) \ + (((d)->state & SOCKET_STATE_CLOSING) == SOCKET_STATE_CLOSING) + +#define IS_OPEN(d) \ (((d)->state & SOCKET_FLAG_OPEN) == SOCKET_FLAG_OPEN) #define IS_CONNECTED(d) \ @@ -723,9 +734,16 @@ static unsigned long one_value = 1; ((sap)->in4.sin_port) : -1) +typedef union { + ErlNifMonitor mon; + uint32_t raw[4]; +} ESockMonitor; + + typedef struct { ErlNifPid pid; // PID of the requesting process - ErlNifMonitor mon; // Monitor to the requesting process + // ErlNifMonitor mon; Monitor to the requesting process + ESockMonitor mon; // Monitor to the requesting process ERL_NIF_TERM ref; // The (unique) reference (ID) of the request } SocketRequestor; @@ -758,7 +776,8 @@ typedef struct { /* +++ Controller (owner) process +++ */ ErlNifPid ctrlPid; - ErlNifMonitor ctrlMon; + // ErlNifMonitor ctrlMon; + ESockMonitor ctrlMon; /* +++ Write stuff +++ */ ErlNifMutex* writeMtx; @@ -801,7 +820,8 @@ typedef struct { /* +++ Close stuff +++ */ ErlNifMutex* closeMtx; ErlNifPid closerPid; - ErlNifMonitor closerMon; + // ErlNifMonitor closerMon; + ESockMonitor closerMon; ERL_NIF_TERM closeRef; BOOLEAN_T closeLocal; @@ -2196,7 +2216,8 @@ static ERL_NIF_TERM acceptor_push(ErlNifEnv* env, static BOOLEAN_T acceptor_pop(ErlNifEnv* env, SocketDescriptor* descP, ErlNifPid* pid, - ErlNifMonitor* mon, + // ErlNifMonitor* mon, + ESockMonitor* mon, ERL_NIF_TERM* ref); static BOOLEAN_T acceptor_unqueue(ErlNifEnv* env, SocketDescriptor* descP, @@ -2212,7 +2233,8 @@ static ERL_NIF_TERM writer_push(ErlNifEnv* env, static BOOLEAN_T writer_pop(ErlNifEnv* env, SocketDescriptor* descP, ErlNifPid* pid, - ErlNifMonitor* mon, + // ErlNifMonitor* mon, + ESockMonitor* mon, ERL_NIF_TERM* ref); static BOOLEAN_T writer_unqueue(ErlNifEnv* env, SocketDescriptor* descP, @@ -2228,7 +2250,8 @@ static ERL_NIF_TERM reader_push(ErlNifEnv* env, static BOOLEAN_T reader_pop(ErlNifEnv* env, SocketDescriptor* descP, ErlNifPid* pid, - ErlNifMonitor* mon, + // ErlNifMonitor* mon, + ESockMonitor* mon, ERL_NIF_TERM* ref); static BOOLEAN_T reader_unqueue(ErlNifEnv* env, SocketDescriptor* descP, @@ -2241,8 +2264,27 @@ static void qpush(SocketRequestQueue* q, SocketRequestQueueElement* e); static SocketRequestQueueElement* qpop(SocketRequestQueue* q); static BOOLEAN_T qunqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const char* slogan, SocketRequestQueue* q, const ErlNifPid* pid); + +static int esock_monitor(const char* slogan, + ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid, + ESockMonitor* mon); +static int esock_demonitor(const char* slogan, + ErlNifEnv* env, + SocketDescriptor* descP, + ESockMonitor* monP); +static void esock_monitor_init(ESockMonitor* mon); +/* +static int esock_monitor_compare(const ErlNifMonitor* mon1, + const ESockMonitor* mon2); +*/ + + /* #if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE) static size_t my_strnlen(const char *s, size_t maxlen); @@ -2293,6 +2335,7 @@ static BOOLEAN_T extract_iow(ErlNifEnv* env, static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info); + #if HAVE_IN6 # if ! defined(HAVE_IN6ADDR_ANY) || ! HAVE_IN6ADDR_ANY # if HAVE_DECL_IN6ADDR_ANY_INIT @@ -4193,7 +4236,6 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, res = enif_make_resource(env, descP); enif_release_resource(descP); // We should really store a reference ... - /* Keep track of the creator * This should not be a problem but just in case * the *open* function is used with the wrong kind @@ -4202,9 +4244,10 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, if (enif_self(env, &descP->ctrlPid) == NULL) return esock_make_error(env, atom_exself); - if (MONP(env, descP, + if (MONP("nopen -> ctrl", + env, descP, &descP->ctrlPid, - &descP->ctrlMon) > 0) + &descP->ctrlMon) != 0) return esock_make_error(env, atom_exmon); @@ -4363,6 +4406,9 @@ ERL_NIF_TERM nif_bind(ErlNifEnv* env, } eSockAddr = argv[1]; + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_bind -> args when sock = %d (0x%lX)" "\r\n Socket: %T" @@ -4452,6 +4498,9 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, } eSockAddr = argv[1]; + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_connect -> args when sock = %d:" "\r\n Socket: %T" @@ -4653,6 +4702,9 @@ ERL_NIF_TERM nif_listen(ErlNifEnv* env, return enif_make_badarg(env); } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_listen -> args when sock = %d:" "\r\n Socket: %T" @@ -4714,6 +4766,9 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env, } ref = argv[1]; + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_accept -> args when sock = %d:" "\r\n Socket: %T" @@ -4791,9 +4846,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "naccept_listening -> would block\r\n") ); descP->currentAcceptor.pid = caller; - if (MONP(env, descP, + if (MONP("naccept_listening -> current acceptor", + env, descP, &descP->currentAcceptor.pid, - &descP->currentAcceptor.mon) > 0) + &descP->currentAcceptor.mon) != 0) return esock_make_error(env, atom_exmon); descP->currentAcceptor.ref = enif_make_copy(descP->env, ref); @@ -4868,9 +4924,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, enif_release_resource(accDescP); // We should really store a reference ... accDescP->ctrlPid = caller; - if (MONP(env, accDescP, + if (MONP("naccept_listening -> ctrl", + env, accDescP, &accDescP->ctrlPid, - &accDescP->ctrlMon) > 0) { + &accDescP->ctrlMon) != 0) { sock_close(accSock); return esock_make_error(env, atom_exmon); } @@ -4984,7 +5041,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "naccept_accepting -> accept success\r\n") ); - DEMONP(env, descP, &descP->currentAcceptor.mon); + DEMONP("naccept_accepting -> current acceptor", + env, descP, &descP->currentAcceptor.mon); if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) { save_errno = sock_errno(); @@ -5006,9 +5064,10 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, enif_release_resource(accDescP); // We should really store a reference ... accDescP->ctrlPid = caller; - if (MONP(env, accDescP, + if (MONP("naccept_accepting -> ctrl", + env, accDescP, &accDescP->ctrlPid, - &accDescP->ctrlMon) > 0) { + &accDescP->ctrlMon) != 0) { sock_close(accSock); return esock_make_error(env, atom_exmon); } @@ -5094,6 +5153,9 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, } sendRef = argv[1]; + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_send -> args when sock = %d:" "\r\n Socket: %T" @@ -5218,6 +5280,9 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, sendRef = argv[1]; eSockAddr = argv[3]; + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_sendto -> args when sock = %d:" "\r\n Socket: %T" @@ -5341,6 +5406,9 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, sendRef = argv[1]; eMsgHdr = argv[2]; + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_sendmsg -> args when sock = %d:" "\r\n Socket: %T" @@ -5630,6 +5698,9 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, } recvRef = argv[1]; + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + if (!IS_CONNECTED(descP)) return esock_make_error(env, atom_enotconn); @@ -5769,6 +5840,9 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, } recvRef = argv[1]; + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_recvfrom -> args when sock = %d:" "\r\n Socket: %T" @@ -5929,6 +6003,9 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, } recvRef = argv[1]; + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_recvmsg -> args when sock = %d:" "\r\n Socket: %T" @@ -6087,6 +6164,9 @@ ERL_NIF_TERM nif_close(ErlNifEnv* env, return enif_make_badarg(env); } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + return nclose(env, descP); } @@ -6102,7 +6182,11 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, int type = descP->type; int protocol = descP->protocol; - SSDBG( descP, ("SOCKET", "nclose -> [%d] entry\r\n", descP->sock) ); + SSDBG( descP, ("SOCKET", "nclose -> [%d] entry (0x%lX, 0x%lX, 0x%lX)\r\n", + descP->sock, + descP->currentWriterP, + descP->currentReaderP, + descP->currentAcceptorP) ); MLOCK(descP->closeMtx); @@ -6127,11 +6211,18 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, /* Monitor the caller, since we should complete this operation even if * the caller dies (for whatever reason). + * + * <KOLLA> + * + * Can we actiually use this for anything? + * + * </KOLLA> */ - if (MONP(env, descP, - &descP->closerPid, - &descP->closerMon) > 0) { + if (MONP("nclose -> closer", + env, descP, + &descP->closerPid, + &descP->closerMon) != 0) { MUNLOCK(descP->closeMtx); return esock_make_error(env, atom_exmon); } @@ -6150,17 +6241,24 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, if (selectRes & ERL_NIF_SELECT_STOP_CALLED) { /* Prep done - inform the caller it can finalize (close) directly */ SSDBG( descP, - ("SOCKET", "nclose -> [%d] stop called\r\n", descP->sock) ); + ("SOCKET", "nclose -> [%d] stop was called\r\n", descP->sock) ); dec_socket(domain, type, protocol); + DEMONP("nclose -> closer", env, descP, &descP->closerMon); reply = esock_atom_ok; } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) { /* The stop callback function has been *scheduled* which means that we * have to wait for it to complete. */ SSDBG( descP, - ("SOCKET", "nclose -> [%d] stop scheduled\r\n", descP->sock) ); + ("SOCKET", "nclose -> [%d] stop was scheduled\r\n", + descP->sock) ); dec_socket(domain, type, protocol); // SHALL WE DO THIS AT finalize? reply = esock_make_ok2(env, descP->closeRef); } else { + + SSDBG( descP, + ("SOCKET", "nclose -> [%d] stop failed: %d\r\n", + descP->sock, selectRes) ); + /* <KOLLA> * * WE SHOULD REALLY HAVE A WAY TO CLOBBER THE SOCKET, @@ -6170,6 +6268,10 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, * * </KOLLA> */ + + // No point in having this? + DEMONP("nclose -> closer", env, descP, &descP->closerMon); + reason = MKT2(env, atom_select, MKI(env, selectRes)); reply = esock_make_error(env, reason); } @@ -6225,10 +6327,10 @@ ERL_NIF_TERM nfinalize_close(ErlNifEnv* env, { ERL_NIF_TERM reply; - if (descP->state == SOCKET_STATE_CLOSED) + if (IS_CLOSED(descP)) return esock_atom_ok; - if (descP->state != SOCKET_STATE_CLOSING) + if (!IS_CLOSING(descP)) return esock_make_error(env, atom_enotclosing); /* This nif is executed in a dirty scheduler just so that @@ -6291,6 +6393,9 @@ ERL_NIF_TERM nif_shutdown(ErlNifEnv* env, return enif_make_badarg(env); } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + if (!ehow2how(ehow, &how)) return enif_make_badarg(env); @@ -6375,6 +6480,9 @@ ERL_NIF_TERM nif_setopt(ErlNifEnv* env, eIsEncoded = argv[1]; eVal = argv[4]; + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + isEncoded = esock_decode_bool(eIsEncoded); /* SGDBG( ("SOCKET", "nif_setopt -> eIsDecoded (%T) decoded: %d\r\n", */ @@ -6521,7 +6629,8 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env, ERL_NIF_TERM eVal) { ErlNifPid caller, newCtrlPid; - ErlNifMonitor newCtrlMon; + // ErlNifMonitor newCtrlMon; + ESockMonitor newCtrlMon; int xres; SSDBG( descP, @@ -6544,20 +6653,22 @@ ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env, return esock_make_error(env, esock_atom_einval); } - if ((xres = MONP(env, descP, &newCtrlPid, &newCtrlMon)) != 0) { + if ((xres = MONP("nsetopt_otp_ctrl_proc -> (new) ctrl", + env, descP, &newCtrlPid, &newCtrlMon)) != 0) { esock_warning_msg("Failed monitor %d) (new) controlling process\r\n", xres); return esock_make_error(env, esock_atom_einval); } - if ((xres = DEMONP(env, descP, &descP->ctrlMon)) != 0) { + if ((xres = DEMONP("nsetopt_otp_ctrl_proc -> (old) ctrl", + env, descP, &descP->ctrlMon)) != 0) { esock_warning_msg("Failed demonitor (%d) " "old controlling process %T (%T)\r\n", xres, descP->ctrlPid, descP->ctrlMon); } descP->ctrlPid = newCtrlPid; - descP->ctrlMon = newCtrlMon; - + descP->ctrlMon = newCtrlMon; + SSDBG( descP, ("SOCKET", "nsetopt_otp_ctrl_proc -> done\r\n") ); return esock_atom_ok; @@ -9640,6 +9751,9 @@ ERL_NIF_TERM nif_getopt(ErlNifEnv* env, eIsEncoded = argv[1]; eOpt = argv[3]; // Is "normally" an int, but if raw mode: {Int, ValueSz} + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_getopt -> args when sock = %d:" "\r\n Socket: %T" @@ -12237,6 +12351,9 @@ ERL_NIF_TERM nif_sockname(ErlNifEnv* env, return enif_make_badarg(env); } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_sockname -> args when sock = %d:" "\r\n Socket: %T" @@ -12302,6 +12419,9 @@ ERL_NIF_TERM nif_peername(ErlNifEnv* env, return enif_make_badarg(env); } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_peername -> args when sock = %d:" "\r\n Socket: %T" @@ -12370,6 +12490,9 @@ ERL_NIF_TERM nif_cancel(ErlNifEnv* env, op = argv[1]; opRef = argv[2]; + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + SSDBG( descP, ("SOCKET", "nif_cancel -> args when sock = %d:" "\r\n op: %T" @@ -12498,6 +12621,8 @@ ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") ); + DEMONP("ncancel_accept_current -> current acceptor", + env, descP, &descP->currentAcceptor.mon); res = ncancel_read_select(env, descP, descP->currentAcceptor.ref); SSDBG( descP, ("SOCKET", "ncancel_accept_current -> cancel res: %T\r\n", res) ); @@ -12616,6 +12741,8 @@ ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") ); + DEMONP("ncancel_recv_current -> current writer", + env, descP, &descP->currentWriter.mon); res = ncancel_write_select(env, descP, descP->currentWriter.ref); SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) ); @@ -12732,6 +12859,8 @@ ERL_NIF_TERM ncancel_recv_current(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "ncancel_recv_current -> entry\r\n") ); + DEMONP("ncancel_recv_current -> current reader", + env, descP, &descP->currentReader.mon); res = ncancel_read_select(env, descP, descP->currentReader.ref); SSDBG( descP, ("SOCKET", "ncancel_recv_current -> cancel res: %T\r\n", res) ); @@ -12929,7 +13058,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, cnt_inc(&descP->writePkgCnt, 1); cnt_inc(&descP->writeByteCnt, written); if (descP->currentWriterP != NULL) - DEMONP(env, descP, &descP->currentWriter.mon); + DEMONP("send_check_result -> current writer", + env, descP, &descP->currentWriter.mon); SSDBG( descP, ("SOCKET", "send_check_result -> " @@ -12968,7 +13098,8 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) { ErlNifPid pid; - ErlNifMonitor mon; + // ErlNifMonitor mon; + ESockMonitor mon; ERL_NIF_TERM ref, res; /* @@ -12983,13 +13114,15 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, res = esock_make_error_errno(env, saveErrno); if (descP->currentWriterP != NULL) { - DEMONP(env, descP, &descP->currentWriter.mon); + DEMONP("send_check_result -> current writer", + env, descP, &descP->currentWriter.mon); while (writer_pop(env, descP, &pid, &mon, &ref)) { SSDBG( descP, ("SOCKET", "send_check_result -> abort %T\r\n", pid) ); send_msg_nif_abort(env, ref, res, &pid); - DEMONP(env, descP, &mon); + DEMONP("send_check_result -> pop'ed writer", + env, descP, &mon); } } @@ -13021,9 +13154,10 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, if (enif_self(env, &caller) == NULL) return esock_make_error(env, atom_exself); descP->currentWriter.pid = caller; - if (MONP(env, descP, + if (MONP("send_check_result -> current writer", + env, descP, &descP->currentWriter.pid, - &descP->currentWriter.mon) > 0) + &descP->currentWriter.mon) != 0) return esock_make_error(env, atom_exmon); descP->currentWriter.ref = enif_make_copy(descP->env, sendRef); descP->currentWriterP = &descP->currentWriter; @@ -13108,9 +13242,10 @@ char* recv_init_current_reader(ErlNifEnv* env, return str_exself; descP->currentReader.pid = caller; - if (MONP(env, descP, + if (MONP("recv_init_current_reader -> current reader", + env, descP, &descP->currentReader.pid, - &descP->currentReader.mon) > 0) { + &descP->currentReader.mon) != 0) { return str_exmon; } descP->currentReader.ref = enif_make_copy(descP->env, recvRef); @@ -13135,7 +13270,8 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, { if (descP->currentReaderP != NULL) { - DEMONP(env, descP, &descP->currentReader.mon); + DEMONP("recv_update_current_reader -> current reader", + env, descP, &descP->currentReader.mon); if (reader_pop(env, descP, &descP->currentReader.pid, @@ -13183,16 +13319,19 @@ void recv_error_current_reader(ErlNifEnv* env, { if (descP->currentReaderP != NULL) { ErlNifPid pid; - ErlNifMonitor mon; + // ErlNifMonitor mon; + ESockMonitor mon; ERL_NIF_TERM ref; - DEMONP(env, descP, &descP->currentReader.mon); + DEMONP("recv_error_current_reader -> current reader", + env, descP, &descP->currentReader.mon); while (reader_pop(env, descP, &pid, &mon, &ref)) { SSDBG( descP, ("SOCKET", "recv_error_current_reader -> abort %T\r\n", pid) ); send_msg_nif_abort(env, ref, reason, &pid); - DEMONP(env, descP, &mon); + DEMONP("recv_error_current_reader -> pop'ed reader", + env, descP, &mon); } } } @@ -13368,6 +13507,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] eagain\r\n", toRead) ); + if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) + return esock_make_error_str(env, xres); + SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), descP, NULL, recvRef); @@ -13622,9 +13764,13 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, } else if ((saveErrno == ERRNO_BLOCK) || (saveErrno == EAGAIN)) { + char* xres; SSDBG( descP, ("SOCKET", "recvmsg_check_result -> eagain\r\n") ); + if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) + return esock_make_error_str(env, xres); + SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), descP, NULL, recvRef); @@ -15353,6 +15499,11 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->sock = sock; descP->event = event; + MON_INIT(&descP->currentWriter.mon); + MON_INIT(&descP->currentReader.mon); + MON_INIT(&descP->currentAcceptor.mon); + MON_INIT(&descP->ctrlMon); + MON_INIT(&descP->closerMon); } return descP; @@ -15971,7 +16122,8 @@ ERL_NIF_TERM acceptor_push(ErlNifEnv* env, reqP->pid = pid; reqP->ref = enif_make_copy(descP->env, ref); - if (MONP(env, descP, &pid, &reqP->mon) > 0) { + if (MONP("acceptor_push -> acceptor request", + env, descP, &pid, &reqP->mon) != 0) { FREE(reqP); return esock_make_error(env, atom_exmon); } @@ -15991,7 +16143,8 @@ static BOOLEAN_T acceptor_pop(ErlNifEnv* env, SocketDescriptor* descP, ErlNifPid* pid, - ErlNifMonitor* mon, + // ErlNifMonitor* mon, + ESockMonitor* mon, ERL_NIF_TERM* ref) { SocketRequestQueueElement* e = qpop(&descP->acceptorsQ); @@ -16022,7 +16175,8 @@ BOOLEAN_T acceptor_unqueue(ErlNifEnv* env, SocketDescriptor* descP, const ErlNifPid* pid) { - return qunqueue(env, &descP->acceptorsQ, pid); + return qunqueue(env, descP, "qunqueue -> waiting acceptor", + &descP->acceptorsQ, pid); } @@ -16057,7 +16211,8 @@ ERL_NIF_TERM writer_push(ErlNifEnv* env, reqP->pid = pid; reqP->ref = enif_make_copy(descP->env, ref); - if (MONP(env, descP, &pid, &reqP->mon) > 0) { + if (MONP("writer_push -> writer request", + env, descP, &pid, &reqP->mon) != 0) { FREE(reqP); return esock_make_error(env, atom_exmon); } @@ -16077,7 +16232,8 @@ static BOOLEAN_T writer_pop(ErlNifEnv* env, SocketDescriptor* descP, ErlNifPid* pid, - ErlNifMonitor* mon, + // ErlNifMonitor* mon, + ESockMonitor* mon, ERL_NIF_TERM* ref) { SocketRequestQueueElement* e = qpop(&descP->writersQ); @@ -16108,7 +16264,8 @@ BOOLEAN_T writer_unqueue(ErlNifEnv* env, SocketDescriptor* descP, const ErlNifPid* pid) { - return qunqueue(env, &descP->writersQ, pid); + return qunqueue(env, descP, "qunqueue -> waiting writer", + &descP->writersQ, pid); } @@ -16143,7 +16300,8 @@ ERL_NIF_TERM reader_push(ErlNifEnv* env, reqP->pid = pid; reqP->ref = enif_make_copy(descP->env, ref); - if (MONP(env, descP, &pid, &reqP->mon) > 0) { + if (MONP("reader_push -> reader request", + env, descP, &pid, &reqP->mon) != 0) { FREE(reqP); return esock_make_error(env, atom_exmon); } @@ -16163,7 +16321,8 @@ static BOOLEAN_T reader_pop(ErlNifEnv* env, SocketDescriptor* descP, ErlNifPid* pid, - ErlNifMonitor* mon, + // ErlNifMonitor* mon, + ESockMonitor* mon, ERL_NIF_TERM* ref) { SocketRequestQueueElement* e = qpop(&descP->readersQ); @@ -16194,7 +16353,8 @@ BOOLEAN_T reader_unqueue(ErlNifEnv* env, SocketDescriptor* descP, const ErlNifPid* pid) { - return qunqueue(env, &descP->readersQ, pid); + return qunqueue(env, descP, "qunqueue -> waiting reader", + &descP->readersQ, pid); } @@ -16258,6 +16418,8 @@ SocketRequestQueueElement* qpop(SocketRequestQueue* q) static BOOLEAN_T qunqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const char* slogan, SocketRequestQueue* q, const ErlNifPid* pid) { @@ -16270,6 +16432,8 @@ BOOLEAN_T qunqueue(ErlNifEnv* env, /* We have a match */ + DEMONP(slogan, env, descP, &e->data.mon); + if (p != NULL) { /* Not the first, but could be the last */ if (q->last == e) { @@ -16344,6 +16508,97 @@ void cnt_dec(uint32_t* cnt, uint32_t dec) /* ---------------------------------------------------------------------- + * M o n i t o r W r a p p e r F u n c t i o n s + * ---------------------------------------------------------------------- + */ + +static +int esock_monitor(const char* slogan, + ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid, + ESockMonitor* monP) +{ + int res; + + SSDBG( descP, ("SOCKET", "[%d] %s: try monitor", descP->sock, slogan) ); + // esock_dbg_printf("MONP", "[%d] %s\r\n", descP->sock, slogan); + res = enif_monitor_process(env, descP, pid, &monP->mon); + + if (res != 0) { + SSDBG( descP, ("SOCKET", "[%d] monitor failed: %d", descP->sock, res) ); + // esock_dbg_printf("MONP", "[%d] failed: %d\r\n", descP->sock, res); + } /* else { + esock_dbg_printf("MONP", + "[%d] success: " + "%u,%u,%u,%u\r\n", + descP->sock, + monP->raw[0], monP->raw[1], + monP->raw[2], monP->raw[3]); + } */ + + return res; +} + + +static +int esock_demonitor(const char* slogan, + ErlNifEnv* env, + SocketDescriptor* descP, + ESockMonitor* monP) +{ + int res; + + SSDBG( descP, ("SOCKET", "[%d] %s: try demonitor\r\n", descP->sock, slogan) ); + /* + esock_dbg_printf("DEMONP", "[%d] %s: %u,%u,%u,%u\r\n", + descP->sock, slogan, + monP->raw[0], monP->raw[1], + monP->raw[2], monP->raw[3]); + */ + + res = enif_demonitor_process(env, descP, &monP->mon); + + if (res == 0) { + esock_monitor_init(monP); + } else { + SSDBG( descP, + ("SOCKET", "[%d] demonitor failed: %d\r\n", descP->sock, res) ); + /* + esock_dbg_printf("DEMONP", "[%d] failed: %d\r\n", descP->sock, res); + */ + } + + return res; +} + + +static +void esock_monitor_init(ESockMonitor* monP) +{ + int i; + + /* + * UGLY, + * but since we don't have a ERL_NIF_MONITOR_NULL, + * this will have to do for now... + */ + for (i = 0; i < 4; i++) + monP->raw[i] = 0; + +} + +/* +static +int esock_monitor_compare(const ErlNifMonitor* mon1, + const ESockMonitor* mon2) +{ + return enif_compare_monitors(mon1, &mon2->mon); +} +*/ + + +/* ---------------------------------------------------------------------- * C a l l b a c k F u n c t i o n s * ---------------------------------------------------------------------- */ @@ -16393,35 +16648,49 @@ static void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) { SocketDescriptor* descP = (SocketDescriptor*) obj; - + + /* + esock_dbg_printf("STOP", "[%d] begin\r\n", descP->sock); + */ + SSDBG( descP, - ("SOCKET", "socket_stop -> entry when" - "\r\n sock: %d (%d)" - "\r\n Is Direct Call: %s" - "\r\n", descP->sock, fd, B2S(is_direct_call)) ); + ("SOCKET", "socket_stop -> entry when %s" + "\r\n sock: %d (%d)" + "\r\n", ((is_direct_call) ? "called" : "scheduled"), descP->sock, fd) ); MLOCK(descP->writeMtx); MLOCK(descP->readMtx); MLOCK(descP->accMtx); MLOCK(descP->closeMtx); + SSDBG( descP, ("SOCKET", "socket_stop -> all mutex(s) locked\r\n") ); descP->state = SOCKET_STATE_CLOSING; // Just in case...??? descP->isReadable = FALSE; descP->isWritable = FALSE; - - + /* We should check that we actually have a monitor. * This *should* be done with a "NULL" monitor value, * which there currently is none... + * If we got here because the controlling process died, + * its no point in demonitor. Also, we not actually have + * a monitor in that case... */ - DEMONP(env, descP, &descP->ctrlMon); - + DEMONP("socket_stop -> ctrl", env, descP, &descP->ctrlMon); + + /* + esock_dbg_printf("STOP", "[%d] maybe handle current writer (0x%lX)\r\n", + descP->sock, descP->currentReaderP); + */ if (descP->currentWriterP != NULL) { /* We have a (current) writer and *may* therefor also have * writers waiting. */ + DEMONP("socket_stop -> current writer", + env, descP, &descP->currentWriter.mon); + + SSDBG( descP, ("SOCKET", "socket_stop -> handle current writer\r\n") ); if (!compare_pids(env, &descP->closerPid, &descP->currentWriter.pid) && @@ -16439,15 +16708,24 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) } /* And also deal with the waiting writers (in the same way) */ + SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting writer(s)\r\n") ); inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed); } - + + /* + esock_dbg_printf("STOP", "[%d] maybe handle current reader (0x%lX)\r\n", + descP->sock, descP->currentReaderP); + */ if (descP->currentReaderP != NULL) { /* We have a (current) reader and *may* therefor also have * readers waiting. */ + DEMONP("socket_stop -> current reader", + env, descP, &descP->currentReader.mon); + + SSDBG( descP, ("SOCKET", "socket_stop -> handle current reader\r\n") ); if (!compare_pids(env, &descP->closerPid, &descP->currentReader.pid) && @@ -16465,14 +16743,23 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) } /* And also deal with the waiting readers (in the same way) */ + SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting reader(s)\r\n") ); inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed); } - + + /* + esock_dbg_printf("STOP", "[%d] maybe handle current acceptor (0x%lX)\r\n", + descP->sock, descP->currentReaderP); + */ if (descP->currentAcceptorP != NULL) { /* We have a (current) acceptor and *may* therefor also have * acceptors waiting. */ + DEMONP("socket_stop -> current acceptor", + env, descP, &descP->currentAcceptor.mon); + + SSDBG( descP, ("SOCKET", "socket_stop -> handle current acceptor\r\n") ); if (!compare_pids(env, &descP->closerPid, &descP->currentAcceptor.pid) && @@ -16490,6 +16777,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) } /* And also deal with the waiting acceptors (in the same way) */ + SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting acceptor(s)\r\n") ); inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, atom_closed); } @@ -16521,7 +16809,8 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid); - DEMONP(env, descP, &descP->closerMon); + DEMONP("socket_stop -> closer", + env, descP, &descP->closerMon); } else { @@ -16535,12 +16824,28 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) } } - + + if (!is_direct_call) { + if (descP->closeLocal) { + DEMONP("socket_stop -> closer", + env, descP, &descP->closerMon); + } + descP->sock = INVALID_SOCKET; + descP->event = INVALID_EVENT; + descP->state = SOCKET_STATE_CLOSED; + } + + SSDBG( descP, ("SOCKET", "socket_stop -> unlock all mutex(s)\r\n") ); + MUNLOCK(descP->closeMtx); MUNLOCK(descP->accMtx); MUNLOCK(descP->readMtx); MUNLOCK(descP->writeMtx); + /* + esock_dbg_printf("STOP", "[%d] end\r\n", descP->sock); + */ + SSDBG( descP, ("SOCKET", "socket_stop -> done (%d, %d)\r\n", descP->sock, fd) ); @@ -16580,7 +16885,8 @@ void inform_waiting_procs(ErlNifEnv* env, currentP->data.ref, reason, ¤tP->data.pid)) ); - DEMONP(env, descP, ¤tP->data.mon); + DEMONP("inform_waiting_procs -> current 'request'", + env, descP, ¤tP->data.mon); nextP = currentP->nextP; if (free) FREE(currentP); currentP = nextP; @@ -16605,30 +16911,159 @@ void socket_down(ErlNifEnv* env, const ErlNifMonitor* mon) { SocketDescriptor* descP = (SocketDescriptor*) obj; + ESockMonitor* monP = (ESockMonitor*) mon; SSDBG( descP, ("SOCKET", "socket_down -> entry with" - "\r\n sock: %d" - "\r\n pid: %T" - "\r\n", descP->sock, *pid) ); + "\r\n sock: %d" + "\r\n pid: %T" + "\r\n Close: %s (%s)" + "\r\n", + descP->sock, *pid, + B2S(IS_CLOSED(descP)), + B2S(IS_CLOSING(descP))) ); + /* + esock_dbg_printf("DOWN", + "[%d] begin %u,%u,%u,%d\r\n", + descP->sock, + monP->raw[0], monP->raw[1], + monP->raw[2], monP->raw[3]); + */ + + /* + if (MON_COMP(mon, &descP->ctrlMon) == 0) { + SSDBG( descP, ("SOCKET", "socket_down -> controlling process mon\r\n") ); + } else if (MON_COMP(mon, &descP->closerMon) == 0) { + SSDBG( descP, ("SOCKET", "socket_down -> closer mon\r\n") ); + } else if ((descP->currentWriterP != NULL) && + (MON_COMP(mon, &descP->currentWriter.mon) == 0)) { + SSDBG( descP, ("SOCKET", "socket_down -> current writer mon\r\n") ); + } else if ((descP->currentReaderP != NULL) && + (MON_COMP(mon, &descP->currentReader.mon) == 0)) { + SSDBG( descP, ("SOCKET", "socket_down -> current reader mon\r\n") ); + } else if ((descP->currentAcceptorP != NULL) && + (MON_COMP(mon, &descP->currentAcceptor.mon) == 0)) { + SSDBG( descP, ("SOCKET", "socket_down -> current acceptor mon\r\n") ); + } else { + SSDBG( descP, ("SOCKET", "socket_down -> OTHER mon\r\n") ); + } + */ if (compare_pids(env, &descP->ctrlPid, pid)) { + int selectRes; + /* We don't bother with the queue cleanup here - * we leave it to the stop callback function. */ + SSDBG( descP, + ("SOCKET", "socket_down -> controlling process exit\r\n") ); + descP->state = SOCKET_STATE_CLOSING; descP->closeLocal = TRUE; descP->closerPid = *pid; - descP->closerMon = *mon; - descP->closeRef = MKREF(env); - enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP), - descP, NULL, descP->closeRef); + descP->closerMon = (ESockMonitor) *mon; + descP->closeRef = MKREF(env); // Do we really need this in this case? + + /* + esock_dbg_printf("DOWN", + "[%d] select stop %u,%u,%u,%d\r\n", + descP->sock, + monP->raw[0], monP->raw[1], + monP->raw[2], monP->raw[3]); + */ + + selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP), + descP, NULL, descP->closeRef); + + if (selectRes & ERL_NIF_SELECT_STOP_CALLED) { + /* We are done - wwe can finalize (socket close) directly */ + SSDBG( descP, + ("SOCKET", "socket_down -> [%d] stop called\r\n", descP->sock) ); + dec_socket(descP->domain, descP->type, descP->protocol); + descP->state = SOCKET_STATE_CLOSED; + + /* And finally close the socket. + * Since we close the socket because of an exiting owner, + * we do not need to wait for buffers to sync (linger). + * If the owner wish to ensure the buffer are written, + * it should have closed teh socket explicitly... + */ + if (sock_close(descP->sock) != 0) { + int save_errno = sock_errno(); + + esock_warning_msg("Failed closing socket for terminating " + "controlling process: " + "\r\n Controlling Process: %T" + "\r\n Descriptor: %d" + "\r\n Errno: %d" + "\r\n", pid, descP->sock, save_errno); + } + sock_close_event(descP->event); + + descP->sock = INVALID_SOCKET; + descP->event = INVALID_EVENT; + + descP->state = SOCKET_STATE_CLOSED; + + } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) { + /* The stop callback function has been *scheduled* which means that + * "should" wait for it to complete. But since we are in a callback + * (down) function, we cannot... + * So, we must close the socket + */ + SSDBG( descP, + ("SOCKET", + "socket_down -> [%d] stop scheduled\r\n", descP->sock) ); + dec_socket(descP->domain, descP->type, descP->protocol); + + /* And now what? We can't wait for the stop function here... + * So, we simply close it here and leave the rest of the "close" + * for later (when the stop function actually gets called... + */ + + if (sock_close(descP->sock) != 0) { + int save_errno = sock_errno(); + + esock_warning_msg("Failed closing socket for terminating " + "controlling process: " + "\r\n Controlling Process: %T" + "\r\n Descriptor: %d" + "\r\n Errno: %d" + "\r\n", pid, descP->sock, save_errno); + } + sock_close_event(descP->event); + + } else { + + /* + * <KOLLA> + * + * WE SHOULD REALLY HAVE A WAY TO CLOBBER THE SOCKET, + * SO WE DON'T LET STUFF LEAK. + * NOW, BECAUSE WE FAILED TO SELECT, WE CANNOT FINISH + * THE CLOSE, WHAT TO DO? ABORT? + * + * </KOLLA> + */ + esock_warning_msg("Failed selecting stop when handling down " + "of controlling process: " + "\r\n Select Res: %d" + "\r\n Controlling Process: %T" + "\r\n Descriptor: %d" + "\r\n Monitor: %u.%u.%u.%u" + "\r\n", selectRes, pid, descP->sock, + monP->raw[0], monP->raw[1], + monP->raw[2], monP->raw[3]); + } + } else { /* check all operation queue(s): acceptor, writer and reader. */ + SSDBG( descP, ("SOCKET", "socket_down -> other process term\r\n") ); + MLOCK(descP->accMtx); if (descP->currentAcceptorP != NULL) socket_down_acceptor(env, descP, pid); @@ -16645,6 +17080,14 @@ void socket_down(ErlNifEnv* env, MUNLOCK(descP->readMtx); } + + /* + esock_dbg_printf("DOWN", + "[%d] end %u,%u,%u,%d\r\n", + descP->sock, + monP->raw[0], monP->raw[1], + monP->raw[2], monP->raw[3]); + */ SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") ); diff --git a/erts/emulator/nifs/common/socket_util.c b/erts/emulator/nifs/common/socket_util.c index ff50fd2384..766d3724c1 100644 --- a/erts/emulator/nifs/common/socket_util.c +++ b/erts/emulator/nifs/common/socket_util.c @@ -1506,7 +1506,7 @@ void esock_warning_msg( const char* format, ... ) { va_list args; char f[512 + sizeof(format)]; // This has to suffice... - char stamp[32]; + char stamp[64]; // Just in case... struct timespec ts; int res; |