diff options
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 2 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 880 | ||||
-rw-r--r-- | erts/emulator/test/socket_SUITE.erl | 29 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 70284 -> 21788 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 201 |
5 files changed, 599 insertions, 513 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index fe02e0b051..38c28a6de5 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -388,5 +388,7 @@ GLOBAL_ERROR_REASON_ATOM_DEFS #define REALLOC_BIN(SZ, BP) enif_realloc_binary((SZ), (BP)) #define FREE_BIN(BP) enif_release_binary((BP)) +/* Copy term T into environment E */ +#define CP_TERM(E, T) enif_make_copy((E), (T)) #endif // SOCKET_INT_H__ diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 3faa9ac96d..ac1beba344 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -654,7 +654,9 @@ typedef union { * * * =================================================================== */ +/* Global socket debug */ #define SGDBG( proto ) ESOCK_DBG_PRINTF( data.dbg , proto ) +/* Socket specific debug */ #define SSDBG( __D__ , proto ) ESOCK_DBG_PRINTF( (__D__)->dbg , proto ) @@ -759,14 +761,21 @@ static unsigned long one_value = 1; typedef struct { - int is_active; ErlNifMonitor mon; + BOOLEAN_T isActive; } ESockMonitor; typedef struct { - ErlNifPid pid; // PID of the requesting process - ESockMonitor mon; // Monitor to the requesting process - ERL_NIF_TERM ref; // The (unique) reference (ID) of the request + ErlNifPid pid; // PID of the requesting process + ESockMonitor mon; // Monitor to the requesting process + + /* We need an environment for the copy of the ref we store here. + * We will also use this environment for any messages we send + * (with the ref in it). Such as the select message (used in the + * select call) or the abort message. + */ + ErlNifEnv* env; + ERL_NIF_TERM ref; // The (unique) reference (ID) of the request } ESockRequestor; typedef struct esock_request_queue_element { @@ -794,8 +803,6 @@ typedef struct { ESockAddress remote; unsigned int addrLen; - ErlNifEnv* env; - /* +++ Controller (owner) process +++ */ ErlNifPid ctrlPid; ESockMonitor ctrlMon; @@ -985,7 +992,8 @@ static ERL_NIF_TERM nbind(ErlNifEnv* env, ESockAddress* sockAddrP, unsigned int addrLen); static ERL_NIF_TERM nconnect(ErlNifEnv* env, - ESockDescriptor* descP); + ESockDescriptor* descP, + ERL_NIF_TERM sockRef); static ERL_NIF_TERM nlisten(ErlNifEnv* env, ESockDescriptor* descP, int backlog); @@ -995,10 +1003,12 @@ static ERL_NIF_TERM naccept(ErlNifEnv* env, ERL_NIF_TERM ref); static ERL_NIF_TERM naccept_listening(ErlNifEnv* env, ESockDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM ref); static ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env, ESockDescriptor* descP, - ERL_NIF_TERM ref, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef, ErlNifPid caller, int save_errno); static ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env, @@ -1030,7 +1040,8 @@ static ERL_NIF_TERM naccept_accepting_other(ErlNifEnv* env, ErlNifPid caller); static ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env, ESockDescriptor* descP, - ERL_NIF_TERM ref, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef, ErlNifPid* pid, unsigned int nextState); static BOOLEAN_T naccept_accepted(ErlNifEnv* env, @@ -2084,6 +2095,7 @@ static ERL_NIF_TERM send_check_fail(ErlNifEnv* env, static ERL_NIF_TERM send_check_retry(ErlNifEnv* env, ESockDescriptor* descP, ssize_t written, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef); static BOOLEAN_T recv_check_reader(ErlNifEnv* env, ESockDescriptor* descP, @@ -2153,9 +2165,11 @@ static ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env, ESockDescriptor* descP, int read, ErlNifBinary* bufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM recv_check_retry(ErlNifEnv* env, ESockDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM recv_check_fail_gen(ErlNifEnv* env, ESockDescriptor* descP, @@ -2323,9 +2337,15 @@ static BOOLEAN_T decode_native_get_opt(ErlNifEnv* env, // static void encode_bool(BOOLEAN_T val, ERL_NIF_TERM* eVal); static ERL_NIF_TERM encode_ip_tos(ErlNifEnv* env, int val); +static void socket_stop_handle_current(ErlNifEnv* env, + const char* role, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ESockRequestor* reqP); static void inform_waiting_procs(ErlNifEnv* env, - char* role, + const char* role, ESockDescriptor* descP, + ERL_NIF_TERM sockRef, ESockRequestQueue* q, BOOLEAN_T free, ERL_NIF_TERM reason); @@ -2389,12 +2409,6 @@ static void dec_socket(int domain, int type, int protocol); ACTIVATE_NEXT_FUNCS_DEFS #undef ACTIVATE_NEXT_FUNC_DEF -static BOOLEAN_T activate_next(ErlNifEnv* env, - ESockDescriptor* descP, - ESockRequestor* reqP, - ESockRequestQueue* q, - ERL_NIF_TERM sockRef); - /* *** acceptor_search4pid | writer_search4pid | reader_search4pid *** * *** acceptor_push | writer_push | reader_push *** * *** acceptor_pop | writer_pop | reader_pop *** @@ -2487,37 +2501,48 @@ static void socket_down_reader(ErlNifEnv* env, const ErlNifPid* pid); static char* esock_send_close_msg(ErlNifEnv* env, - ESockDescriptor* descP); - + ESockDescriptor* descP, + ErlNifPid* pid); static char* esock_send_abort_msg(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, + ErlNifEnv* msgEnv, ERL_NIF_TERM reason, ErlNifPid* pid); -static char* esock_send_socket_msg(ErlNifEnv* env, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM tag, - ERL_NIF_TERM info, - ErlNifPid* pid, - ErlNifEnv* msg_env); static char* esock_send_msg(ErlNifEnv* env, - ERL_NIF_TERM msg, ErlNifPid* pid, - ErlNifEnv* msg_env); - -static ERL_NIF_TERM make_socket_record(ErlNifEnv* env, - ERL_NIF_TERM sockRef); + ERL_NIF_TERM msg, + ErlNifEnv* msgEnv); + +static ERL_NIF_TERM mk_abort_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM opRef, + ERL_NIF_TERM reason); +static ERL_NIF_TERM mk_close_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM closeRef); +static ERL_NIF_TERM mk_select_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM selectRef); +static ERL_NIF_TERM mk_socket_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM tag, + ERL_NIF_TERM info); +static ERL_NIF_TERM mk_socket(ErlNifEnv* env, + ERL_NIF_TERM sockRef); static int esock_select_read(ErlNifEnv* env, ErlNifEvent event, void* obj, const ErlNifPid* pid, - ERL_NIF_TERM ref); + ERL_NIF_TERM sockRef, + ERL_NIF_TERM selectRef); static int esock_select_write(ErlNifEnv* env, ErlNifEvent event, void* obj, const ErlNifPid* pid, - ERL_NIF_TERM ref); + ERL_NIF_TERM sockRef, + ERL_NIF_TERM selectRef); static int esock_select_stop(ErlNifEnv* env, ErlNifEvent event, void* obj); @@ -2869,6 +2894,27 @@ static ErlNifResourceTypeInit socketInit = { static ESockData data; +/* These two (inline) functions are primarily intended for debugging, + * that is, to make it easy to add debug printouts. + */ +static inline void esock_free_env(const char* slogan, ErlNifEnv* env) +{ + SGDBG( ("SOCKET", "env free - %s: 0x%lX\r\n", slogan, env) ); + + if (env != NULL) enif_free_env(env); +} + + +static inline ErlNifEnv* esock_alloc_env(const char* slogan) +{ + ErlNifEnv* env = enif_alloc_env(); + + SGDBG( ("SOCKET", "env alloc - %s: 0x%lX\r\n", slogan, env) ); + + return env; +} + + /* ---------------------------------------------------------------------- * N I F F u n c t i o n s * ---------------------------------------------------------------------- @@ -4684,15 +4730,16 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, return enif_raise_exception(env, MKA(env, "notsup")); #else ESockDescriptor* descP; - ERL_NIF_TERM res, eSockAddr; + ERL_NIF_TERM res, eSockAddr, sockRef; char* xres; SGDBG( ("SOCKET", "nif_connect -> entry with argc: %d\r\n", argc) ); /* Extract arguments and perform preliminary validation */ + sockRef = argv[0]; if ((argc != 2) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP)) { + !enif_get_resource(env, sockRef, sockets, (void**) &descP)) { return enif_make_badarg(env); } eSockAddr = argv[1]; @@ -4704,16 +4751,24 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, "\r\n", descP->sock, argv[0], eSockAddr) ); if ((xres = esock_decode_sockaddr(env, eSockAddr, - &descP->remote, &descP->addrLen)) != NULL) { + &descP->remote, + &descP->addrLen)) != NULL) { return esock_make_error_str(env, xres); } + /* Only a *!%&$*# would send an opened but non-connected socket + * somewhere (before its actually usable), but just to be on the + * safe side we do the best we can to avoid complications... + */ + MLOCK(descP->readMtx); MLOCK(descP->writeMtx); + MLOCK(descP->cfgMtx); - res = nconnect(env, descP); + res = nconnect(env, descP, sockRef); + MUNLOCK(descP->cfgMtx); MUNLOCK(descP->writeMtx); MUNLOCK(descP->readMtx); @@ -4726,7 +4781,8 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, #if !defined(__WIN32__) static ERL_NIF_TERM nconnect(ErlNifEnv* env, - ESockDescriptor* descP) + ESockDescriptor* descP, + ERL_NIF_TERM sockRef) { ERL_NIF_TERM res, ref; int code, sres, save_errno = 0; @@ -4771,7 +4827,8 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, (save_errno == EINPROGRESS))) { /* Unix & OSE!! */ ref = MKREF(env); descP->state = SOCKET_STATE_CONNECTING; - if ((sres = esock_select_write(env, descP->sock, descP, NULL, ref)) < 0) { + if ((sres = esock_select_write(env, descP->sock, descP, NULL, + sockRef, ref)) < 0) { res = esock_make_error(env, MKT2(env, esock_atom_select_failed, @@ -4801,6 +4858,9 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, * * Description: * Make socket ready for input and output. + * This function is called if we where made to wait when we called the + * nif_connect function (we made a select, and the select message has + * now been received). * * Arguments: * Socket (ref) - Points to the socket descriptor. @@ -5056,7 +5116,7 @@ ERL_NIF_TERM naccept(ErlNifEnv* env, switch (descP->state) { case SOCKET_STATE_LISTENING: - res = naccept_listening(env, descP, ref); + res = naccept_listening(env, descP, sockRef, ref); break; case SOCKET_STATE_ACCEPTING: @@ -5074,13 +5134,15 @@ ERL_NIF_TERM naccept(ErlNifEnv* env, /* *** naccept_listening *** - * We have no active acceptor (and no acceptors in queue). + * + * We have no active acceptor (and therefor no acceptors in queue). */ #if !defined(__WIN32__) static ERL_NIF_TERM naccept_listening(ErlNifEnv* env, ESockDescriptor* descP, - ERL_NIF_TERM ref) + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef) { ESockAddress remote; unsigned int n; @@ -5106,7 +5168,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, ("SOCKET", "naccept_listening -> accept failed (%d)\r\n", save_errno) ); - res = naccept_listening_error(env, descP, ref, caller, save_errno); + res = naccept_listening_error(env, descP, sockRef, accRef, + caller, save_errno); } else { @@ -5125,6 +5188,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, /* *** naccept_listening_error *** + * * The accept call resultet in an error - handle it. * There are only two cases: * 1) BLOCK => Attempt a "retry" @@ -5133,7 +5197,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, static ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env, ESockDescriptor* descP, - ERL_NIF_TERM ref, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef, ErlNifPid caller, int save_errno) { @@ -5142,21 +5207,26 @@ ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env, if (save_errno == ERRNO_BLOCK) { /* *** Try again later *** */ - SSDBG( descP, ("SOCKET", "naccept_listening_error -> would block\r\n") ); + + SSDBG( descP, + ("SOCKET", "naccept_listening_error -> would block\r\n") ); descP->currentAcceptor.pid = caller; if (MONP("naccept_listening -> current acceptor", env, descP, &descP->currentAcceptor.pid, - &descP->currentAcceptor.mon) != 0) - return esock_make_error(env, atom_exmon); - - descP->currentAcceptor.ref = enif_make_copy(descP->env, ref); - descP->currentAcceptorP = &descP->currentAcceptor; - - res = naccept_busy_retry(env, descP, ref, NULL, SOCKET_STATE_ACCEPTING); - - + &descP->currentAcceptor.mon) != 0) { + enif_set_pid_undefined(&descP->currentAcceptor.pid); + res = esock_make_error(env, atom_exmon); + } else { + descP->currentAcceptor.env = esock_alloc_env("current acceptor"); + descP->currentAcceptor.ref = CP_TERM(descP->currentAcceptor.env, + accRef); + descP->currentAcceptorP = &descP->currentAcceptor; + res = naccept_busy_retry(env, descP, + sockRef, descP->currentAcceptor.ref, + NULL, SOCKET_STATE_ACCEPTING); + } } else { SSDBG( descP, ("SOCKET", @@ -5169,6 +5239,7 @@ ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env, /* *** naccept_listening_accept *** + * * The accept call was successful (accepted) - handle the new connection. */ static @@ -5189,6 +5260,7 @@ ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env, /* *** naccept_accepting *** + * * We have an active acceptor and possibly acceptors waiting in queue. * If the pid of the calling process is not the pid of the "current process", * push the requester onto the (acceptor) queue. @@ -5298,14 +5370,6 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, if (naccept_accepted(env, descP, accSock, descP->currentAcceptor.pid, remote, &res)) { - /* We should really go through the queue until we succeed to activate - * a waiting acceptor. For now we just pop once and hope for the best... - * This will leave any remaining acceptors *hanging*... - * - * We need a "activate-next" function. - * - */ - if (!activate_next_acceptor(env, descP, sockRef)) { SSDBG( descP, @@ -5318,7 +5382,11 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, descP->currentAcceptorP = NULL; descP->currentAcceptor.ref = esock_atom_undefined; enif_set_pid_undefined(&descP->currentAcceptor.pid); - esock_monitor_init(&descP->currentAcceptor.mon); + esock_free_env("naccept_accepting_current_accept - " + "current-accept-env", + descP->currentAcceptor.env); + descP->currentAcceptor.env = NULL; + MON_INIT(&descP->currentAcceptor.mon); } } @@ -5354,7 +5422,8 @@ ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env, "naccept_accepting_current_error -> " "would block: try again\r\n") ); - res = naccept_busy_retry(env, descP, opRef, &descP->currentAcceptor.pid, + res = naccept_busy_retry(env, descP, sockRef, opRef, + &descP->currentAcceptor.pid, /* No state change */ descP->state); @@ -5367,7 +5436,8 @@ ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "naccept_accepting_current_error -> abort %T\r\n", req.pid) ); - esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid); + esock_send_abort_msg(env, sockRef, req.ref, req.env, + reason, &req.pid); DEMONP("naccept_accepting_current_error -> pop'ed writer", env, descP, &req.mon); } @@ -5403,25 +5473,28 @@ ERL_NIF_TERM naccept_accepting_other(ErlNifEnv* env, /* *** naccept_busy_retry *** + * * Perform a retry select. If successful, set nextState. */ #if !defined(__WIN32__) static ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env, ESockDescriptor* descP, - ERL_NIF_TERM ref, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef, // Not needed ErlNifPid* pid, unsigned int nextState) { int sres; ERL_NIF_TERM res, reason; - if ((sres = esock_select_read(env, descP->sock, descP, pid, ref)) < 0) { + if ((sres = esock_select_read(env, descP->sock, descP, pid, + sockRef, accRef)) < 0) { reason = MKT2(env, esock_atom_select_failed, MKI(env, sres)); res = esock_make_error(env, reason); } else { descP->state = nextState; - res = esock_make_error(env, esock_atom_eagain); + res = esock_make_error(env, esock_atom_eagain); // OK!! } return res; @@ -5430,6 +5503,7 @@ ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env, /* *** naccept_accepted *** + * * Generic function handling a successful accept. */ static @@ -5469,7 +5543,7 @@ BOOLEAN_T naccept_accepted(ErlNifEnv* env, accDescP->rBufSz = descP->rBufSz; // Inherit buffer size accDescP->rNum = descP->rNum; // Inherit buffer uses accDescP->rNumCnt = 0; - accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer siez + accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer size accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size accRef = enif_make_resource(env, accDescP); @@ -5481,6 +5555,7 @@ BOOLEAN_T naccept_accepted(ErlNifEnv* env, &accDescP->ctrlPid, &accDescP->ctrlMon) != 0) { sock_close(accSock); + enif_set_pid_undefined(&descP->ctrlPid); *result = esock_make_error(env, atom_exmon); return FALSE; } @@ -6668,8 +6743,8 @@ BOOLEAN_T nclose_check(ErlNifEnv* env, } else { - /* Monitor the caller, since we should complete this operation even if - * the caller dies (for whatever reason). + /* Monitor the caller, since we should complete this + * operation even if the caller dies (for whatever reason). * * <KOLLA> * @@ -6719,8 +6794,9 @@ ERL_NIF_TERM nclose_do(ErlNifEnv* env, int sres; ERL_NIF_TERM reply, reason; - descP->closeEnv = enif_alloc_env(); + descP->closeEnv = esock_alloc_env("nclose-do - close-env"); descP->closeRef = MKREF(descP->closeEnv); + sres = esock_select_stop(env, descP->sock, descP); if (sres & ERL_NIF_SELECT_STOP_CALLED) { @@ -13883,7 +13959,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") ); - res = send_check_retry(env, descP, written, sendRef); + res = send_check_retry(env, descP, written, sockRef, sendRef); } @@ -13896,7 +13972,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, "not entire package written (%d of %d)\r\n", written, dataSize) ); - res = send_check_retry(env, descP, written, sendRef); + res = send_check_retry(env, descP, written, sockRef, sendRef); } @@ -13920,18 +13996,23 @@ ERL_NIF_TERM send_check_ok(ErlNifEnv* env, cnt_inc(&descP->writePkgCnt, 1); cnt_inc(&descP->writeByteCnt, written); - if (descP->currentWriterP != NULL) + if (descP->currentWriterP != NULL) { DEMONP("send_check_ok -> current writer", env, descP, &descP->currentWriter.mon); + esock_free_env("send_check_ok", descP->currentWriter.env); + } SSDBG( descP, ("SOCKET", "send_check_ok -> " "everything written (%d,%d) - done\r\n", dataSize, written) ); - /* Ok, this write is done maybe activate the next (if any) */ + /* + * Ok, this write is done maybe activate the next (if any) + */ if (!activate_next_writer(env, descP, sockRef)) { descP->currentWriterP = NULL; + descP->currentWriter.env = NULL; descP->currentWriter.ref = esock_atom_undefined; enif_set_pid_undefined(&descP->currentWriter.pid); esock_monitor_init(&descP->currentWriter.mon); @@ -13970,7 +14051,8 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env, while (writer_pop(env, descP, &req)) { SSDBG( descP, ("SOCKET", "send_check_fail -> abort %T\r\n", req.pid) ); - esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid); + esock_send_abort_msg(env, sockRef, req.ref, req.env, + reason, &req.pid); DEMONP("send_check_fail -> pop'ed writer", env, descP, &req.mon); } } @@ -13992,6 +14074,7 @@ static ERL_NIF_TERM send_check_retry(ErlNifEnv* env, ESockDescriptor* descP, ssize_t written, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef) { int sres; @@ -14007,17 +14090,19 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env, if (MONP("send_check_retry -> current writer", env, descP, &descP->currentWriter.pid, - &descP->currentWriter.mon) != 0) + &descP->currentWriter.mon) != 0) { + enif_set_pid_undefined(&descP->currentWriter.pid); return esock_make_error(env, atom_exmon); - - descP->currentWriter.ref = enif_make_copy(descP->env, sendRef); - descP->currentWriterP = &descP->currentWriter; - + } else { + descP->currentWriter.env = esock_alloc_env("current-writer"); + descP->currentWriter.ref = CP_TERM(descP->currentWriter.env, sendRef); + descP->currentWriterP = &descP->currentWriter; + } } cnt_inc(&descP->writeWaits, 1); - sres = esock_select_write(env, descP->sock, descP, NULL, sendRef); + sres = esock_select_write(env, descP->sock, descP, NULL, sockRef, sendRef); if (written >= 0) { @@ -14062,6 +14147,9 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env, * Checks if we have a current reader and if that is us. If not, * then we must be made to wait for our turn. This is done by pushing * us unto the reader queue. + * Note that we do *not* actually initiate the currentReader structure + * here, since we do not actually know yet if we need to! We do that in + * the [recv|recvfrom|recvmsg]_check_result function. */ static BOOLEAN_T recv_check_reader(ErlNifEnv* env, @@ -14132,10 +14220,26 @@ char* recv_init_current_reader(ErlNifEnv* env, env, descP, &descP->currentReader.pid, &descP->currentReader.mon) != 0) { + enif_set_pid_undefined(&descP->currentReader.pid); return str_exmon; + } else { + + descP->currentReader.env = esock_alloc_env("current-reader"); + descP->currentReader.ref = CP_TERM(descP->currentReader.env, + recvRef); + descP->currentReaderP = &descP->currentReader; } - descP->currentReader.ref = enif_make_copy(descP->env, recvRef); - descP->currentReaderP = &descP->currentReader; + } else { + + /* + * This is a retry: + * We have done, for instance, recv(Sock, X), but only received Y < X. + * We then call recv again with size = X-Y. So, we then get a new ref. + * + * Make use of the existing environment + */ + + descP->currentReader.ref = CP_TERM(descP->currentReader.env, recvRef); } return NULL; @@ -14159,9 +14263,13 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, if (descP->currentReaderP != NULL) { - DEMONP("recv_update_current_reader -> current reader", + DEMONP("recv_update_current_reader", env, descP, &descP->currentReader.mon); - + + esock_free_env("recv_update_current_reader - current-read-env", + descP->currentReader.env); + descP->currentReader.env = NULL; + if (!activate_next_reader(env, descP, sockRef)) { SSDBG( descP, @@ -14205,7 +14313,8 @@ void recv_error_current_reader(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_error_current_reader -> abort %T\r\n", req.pid) ); - esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid); + esock_send_abort_msg(env, sockRef, req.ref, req.env, + reason, &req.pid); DEMONP("recv_error_current_reader -> pop'ed reader", env, descP, &req.mon); } @@ -14317,7 +14426,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * This function is called if we filled the allocated buffer. * But are we done yet? * - * toRead = 0 means: Give me everything you have + * toRead = 0 means: Give me everything you have => maybe + * toRead > 0 means: Yes */ static ERL_NIF_TERM recv_check_full(ErlNifEnv* env, @@ -14442,7 +14552,7 @@ ERL_NIF_TERM recv_check_full_maybe_done(ErlNifEnv* env, /* *** recv_check_full_done *** * - * A successful and full (that is, the buffer was filled) recv. + * A successful recv and we filled the buffer. */ static ERL_NIF_TERM recv_check_full_done(ErlNifEnv* env, @@ -14498,7 +14608,7 @@ ERL_NIF_TERM recv_check_fail(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_fail -> eagain\r\n") ); - res = recv_check_retry(env, descP, recvRef); + res = recv_check_retry(env, descP, sockRef, recvRef); } else { @@ -14565,11 +14675,12 @@ ERL_NIF_TERM recv_check_fail_closed(ErlNifEnv* env, static ERL_NIF_TERM recv_check_retry(ErlNifEnv* env, ESockDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { int sres; char* xres; - ERL_NIF_TERM res; + ERL_NIF_TERM reason; descP->rNumCnt = 0; if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) @@ -14577,16 +14688,17 @@ ERL_NIF_TERM recv_check_retry(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_retry -> SELECT for more\r\n") ); - if ((sres = esock_select_read(env, descP->sock, descP, - NULL, recvRef)) < 0) { - res = esock_make_error(env, - MKT2(env, - esock_atom_select_failed, MKI(env, sres))); + if ((sres = esock_select_read(env, descP->sock, descP, NULL, + sockRef, descP->currentReader.ref)) < 0) { + /* Ouch + * Now what? We have copied ref into *its own* environment! + */ + reason = MKT2(env, esock_atom_select_failed, MKI(env, sres)); } else { - res = esock_make_error(env, esock_atom_eagain); + reason = esock_atom_eagain; } - return res; + return esock_make_error(env, reason); } @@ -14644,7 +14756,7 @@ ERL_NIF_TERM recv_check_partial(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_partial -> [%d] " "only part of message - expect more\r\n", toRead) ); - res = recv_check_partial_part(env, descP, read, bufP, recvRef); + res = recv_check_partial_part(env, descP, read, bufP, sockRef, recvRef); } return res; @@ -14695,6 +14807,7 @@ ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env, ESockDescriptor* descP, int read, ErlNifBinary* bufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { ERL_NIF_TERM res, reason, data; @@ -14713,7 +14826,8 @@ ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env, /* SELECT for more data */ - sres = esock_select_read(env, descP->sock, descP, NULL, recvRef); + sres = esock_select_read(env, descP->sock, descP, NULL, + sockRef, descP->currentReader.ref); if (sres < 0) { /* Result: {error, Reason} * Reason: {select_failed, sres, data} @@ -16591,10 +16705,14 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) char buf[64]; /* Buffer used for building the mutex name */ // This needs to be released when the socket is closed! - descP->env = enif_alloc_env(); + // descP->env = enif_alloc_env(); sprintf(buf, "esock[w,%d]", sock); descP->writeMtx = MCREATE(buf); + enif_set_pid_undefined(&descP->currentWriter.pid); + MON_INIT(&descP->currentWriter.mon); + descP->currentWriter.env = NULL; + descP->currentWriter.ref = esock_atom_undefined; descP->currentWriterP = NULL; // currentWriter not used descP->writersQ.first = NULL; descP->writersQ.last = NULL; @@ -16607,6 +16725,10 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) sprintf(buf, "esock[r,%d]", sock); descP->readMtx = MCREATE(buf); + enif_set_pid_undefined(&descP->currentReader.pid); + MON_INIT(&descP->currentReader.mon); + descP->currentReader.env = NULL; + descP->currentReader.ref = esock_atom_undefined; descP->currentReaderP = NULL; // currentReader not used descP->readersQ.first = NULL; descP->readersQ.last = NULL; @@ -16618,6 +16740,10 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) sprintf(buf, "esock[acc,%d]", sock); descP->accMtx = MCREATE(buf); + enif_set_pid_undefined(&descP->currentAcceptor.pid); + MON_INIT(&descP->currentAcceptor.mon); + descP->currentAcceptor.env = NULL; + descP->currentAcceptor.ref = esock_atom_undefined; descP->currentAcceptorP = NULL; // currentAcceptor not used descP->acceptorsQ.first = NULL; descP->acceptorsQ.last = NULL; @@ -16626,6 +16752,8 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->closeMtx = MCREATE(buf); descP->closeEnv = NULL; descP->closeRef = esock_atom_undefined; + enif_set_pid_undefined(&descP->closerPid); + MON_INIT(&descP->closerMon); sprintf(buf, "esock[cfg,%d]", sock); descP->cfgMtx = MCREATE(buf); @@ -16640,11 +16768,9 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->sock = sock; descP->event = event; - MON_INIT(&descP->currentWriter.mon); - MON_INIT(&descP->currentReader.mon); - MON_INIT(&descP->currentAcceptor.mon); + enif_set_pid_undefined(&descP->ctrlPid); MON_INIT(&descP->ctrlMon); - MON_INIT(&descP->closerMon); + } return descP; @@ -17166,21 +17292,23 @@ size_t my_strnlen(const char *s, size_t maxlen) */ static char* esock_send_close_msg(ErlNifEnv* env, - ESockDescriptor* descP) + ESockDescriptor* descP, + ErlNifPid* pid) { - ERL_NIF_TERM sr = ((descP->closeEnv != NULL) ? - enif_make_copy(descP->closeEnv, sockRef) : - sockRef); - char* res = esock_send_socket_msg(env, - sr, - esock_atom_close, - descP->closeRef, - &descP->closerPid, - descP->closeEnv); - - descP->closeEnv = NULL; - - return res; + ERL_NIF_TERM sockRef, msg; + ErlNifEnv* menv; + + if (descP->closeEnv != NULL) { + sockRef = enif_make_resource(descP->closeEnv, descP); + msg = mk_close_msg(descP->closeEnv, sockRef, descP->closeRef); + menv = descP->closeEnv; + } else { + sockRef = enif_make_resource(env, descP); + msg = mk_close_msg(env, sockRef, descP->closeRef); + menv = NULL; // This has the effect that the message will be copied + } + + return esock_send_msg(env, pid, msg, menv); } @@ -17195,93 +17323,126 @@ char* esock_send_close_msg(ErlNifEnv* env, static char* esock_send_abort_msg(ErlNifEnv* env, ERL_NIF_TERM sockRef, - ERL_NIF_TERM recvRef, + ERL_NIF_TERM opRef, + ErlNifEnv* msgEnv, ERL_NIF_TERM reason, ErlNifPid* pid) { - ErlNifEnv* msg_env = enif_alloc_env(); - ERL_NIF_TERM info = MKT2(msg_env, - enif_make_copy(msg_env, recvRef), - enif_make_copy(msg_env, reason)); + ERL_NIF_TERM msg = mk_abort_msg(msgEnv, + /* sockRef not in msgEnv so copy */ + CP_TERM(msgEnv, sockRef), + opRef, reason); + + return esock_send_msg(env, pid, msg, msgEnv); +} + + +/* Send a message to the specified process. + */ +static +char* esock_send_msg(ErlNifEnv* env, + ErlNifPid* pid, + ERL_NIF_TERM msg, + ErlNifEnv* msgEnv) +{ + int res = enif_send(env, pid, msgEnv, msg); + if (msgEnv) + esock_free_env("esock_msg_send - msg-env", msgEnv); - return esock_send_socket_msg(env, sockRef, esock_atom_abort, info, pid, - msg_env); + if (!res) + return str_exsend; + else + return NULL; } +#endif // #if defined(__WIN32__) + -/* *** esock_send_socket_msg *** +/* *** mk_abort_msg *** * - * This function sends a general purpose socket message to an erlang - * process. A general 'socket' message has the ("erlang") form: + * Create the abort message, which has the following form: * - * {'$socket', Socket, Tag, Info} + * {'$socket', Socket, abort, {OpRef, Reason}} * - * Where + * This message is for processes that are waiting in the + * erlang API functions for a select (or this) message. + */ +static +ERL_NIF_TERM mk_abort_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM opRef, + ERL_NIF_TERM reason) +{ + ERL_NIF_TERM info = MKT2(env, opRef, reason); + + return mk_socket_msg(env, sockRef, esock_atom_abort, info); +} + + +/* *** mk_close_msg *** * - * Socket: #socket{ref = SockRef} ({socket, SockRef}) - * SockRef: reference() - * Tag: atom() - * Info: term() + * Construct a close (socket) message. It has the form: + * + * {'$socket', Socket, close, closeRef} * */ - static -char* esock_send_socket_msg(ErlNifEnv* env, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM tag, - ERL_NIF_TERM info, - ErlNifPid* pid, - ErlNifEnv* msgEnv) +ERL_NIF_TERM mk_close_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM closeRef) { - ErlNifEnv* menv; - ERL_NIF_TERM msg, msock, mtag, minfo; + return mk_socket_msg(env, sockRef, esock_atom_close, closeRef); +} - if (msgEnv == NULL) { - menv = enif_alloc_env(); - msock = make_socket_record(menv, enif_make_copy(menv, sockRef)); - mtag = enif_make_copy(menv, tag); - minfo = enif_make_copy(menv, info); - } else { - menv = msgEnv; - msock = make_socket_record(menv, sockRef); - mtag = tag; - minfo = info; - } - msg = MKT4(menv, esock_atom_socket_tag, msock, mtag, minfo); - return esock_send_msg(env, msg, pid, menv); +/* *** mk_select_msg *** + * + * Construct a select (socket) message. It has the form: + * + * {'$socket', Socket, select, selectRef} + * + */ +static +ERL_NIF_TERM mk_select_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM selectRef) +{ + return mk_socket_msg(env, sockRef, atom_select, selectRef); } -/* Send a message to the specified process. +/* *** mk_socket_msg *** + * + * Construct the socket message: + * + * {'$socket', Socket, Tag, Info} + * + * Socket :: socket() (#socket{}) + * Tag :: atom() + * Info :: term() + * */ static -char* esock_send_msg(ErlNifEnv* env, - ERL_NIF_TERM msg, - ErlNifPid* pid, - ErlNifEnv* msg_env) +ERL_NIF_TERM mk_socket_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM tag, + ERL_NIF_TERM info) { - int res = enif_send(env, pid, msg_env, msg); - if (msg_env) - enif_free_env(msg_env); + ERL_NIF_TERM socket = mk_socket(env, sockRef); - if (!res) - return str_exsend; - else - return NULL; + return MKT4(env, esock_atom_socket_tag, socket, tag, info); } -#endif // #if defined(__WIN32__) -/* *** make_socket_record *** +/* *** mk_socket *** * * Simple utility function that construct the socket resord: * * #socket{ref = SockRef} => {socket, SockRef :: reference()} */ static -ERL_NIF_TERM make_socket_record(ErlNifEnv* env, - ERL_NIF_TERM sockRef) +ERL_NIF_TERM mk_socket(ErlNifEnv* env, + ERL_NIF_TERM sockRef) { return MKT2(env, esock_atom_socket, sockRef); } @@ -17293,25 +17454,53 @@ ERL_NIF_TERM make_socket_record(ErlNifEnv* env, * ---------------------------------------------------------------------- */ +/* *** esock_select_read *** + * + * Perform a read select. When the select is triggered, a 'select' + * message (see mk_select_msg) will be sent. + * + * There are two ways to handle the select message: + * 1) Create "your own" environment and create the message using it + * and then pass it on to the select function. + * 2) Or, to create the message using any available environment, + * and then pass a NULL pointer to the select function. + * This will have the effect that the select function will + * create its own environment and then copy the message to it. + * We choose the second alternative. + */ static int esock_select_read(ErlNifEnv* env, - ErlNifEvent event, - void* obj, - const ErlNifPid* pid, - ERL_NIF_TERM ref) + ErlNifEvent event, // The file descriptor + void* obj, // The socket descriptor object + const ErlNifPid* pid, // Destination + ERL_NIF_TERM sockRef, // Socket + ERL_NIF_TERM selectRef) // "ID" of the operation { - return enif_select(env, event, (ERL_NIF_SELECT_READ), obj, pid, ref); + ERL_NIF_TERM selectMsg = mk_select_msg(env, sockRef, selectRef); + + return enif_select_read(env, event, obj, pid, selectMsg, NULL); + } +/* *** esock_select_write *** + * + * Perform a write select. When the select is triggered, a 'select' + * message (see mk_select_msg) will be sent. + * The sockRef is copied to the msgEnv when the socket message is created, + * so no need to do that here, but the selectRef needs to be copied. + */ static int esock_select_write(ErlNifEnv* env, - ErlNifEvent event, - void* obj, - const ErlNifPid* pid, - ERL_NIF_TERM ref) + ErlNifEvent event, // The file descriptor + void* obj, // The socket descriptor + const ErlNifPid* pid, // Destination + ERL_NIF_TERM sockRef, // Socket + ERL_NIF_TERM selectRef) // "ID" of the operation { - return enif_select(env, event, (ERL_NIF_SELECT_WRITE), obj, pid, ref); + ERL_NIF_TERM selectMsg = mk_select_msg(env, sockRef, selectRef); + + return enif_select_write(env, event, obj, pid, selectMsg, NULL); } @@ -17354,84 +17543,78 @@ int esock_select_cancel(ErlNifEnv* env, ACTIVATE_NEXT_FUNC_DECL(writer, write, currentWriter, writersQ) \ ACTIVATE_NEXT_FUNC_DECL(reader, read, currentReader, readersQ) -#define ACTIVATE_NEXT_FUNC_DECL(F, R, Q) \ - static \ - BOOLEAN_T activate_next_##F(ErlNifEnv* env, \ - ESockDescriptor* descP, \ - ERL_NIF_TERM sockRef) \ - { \ - return activate_next(env, descP, \ - &descP->R, &descP->Q, \ - sockRef); \ +#define ACTIVATE_NEXT_FUNC_DECL(F, S, R, Q) \ + static \ + BOOLEAN_T activate_next_##F(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ERL_NIF_TERM sockRef) \ + { \ + BOOLEAN_T popped, activated; \ + int sres; \ + ERL_NIF_TERM reason; \ + ESockRequestor* reqP = &descP->R; \ + ESockRequestQueue* q = &descP->Q; \ + \ + popped = FALSE; \ + do { \ + \ + if (requestor_pop(q, reqP)) { \ + \ + /* There was another one */ \ + \ + SSDBG( descP, \ + ("SOCKET", \ + "activate_next_" #F " -> new (active) requestor: " \ + "\r\n pid: %T" \ + "\r\n ref: %T" \ + "\r\n", reqP->pid, reqP->ref) ); \ + \ + if ((sres = esock_select_##S(env, descP->sock, descP, \ + &reqP->pid, sockRef, \ + reqP->ref)) < 0) { \ + \ + /* We need to inform this process, reqP->pid, */ \ + /* that we failed to select, so we don't leave */ \ + /* it hanging. */ \ + /* => send abort */ \ + \ + reason = MKT2(env, \ + esock_atom_select_failed, \ + MKI(env, sres)); \ + esock_send_abort_msg(env, sockRef, \ + reqP->ref, reqP->env, \ + reason, &reqP->pid); \ + \ + } else { \ + \ + /* Success: New requestor selected */ \ + popped = TRUE; \ + activated = FALSE; \ + \ + } \ + \ + } else { \ + \ + SSDBG( descP, \ + ("SOCKET", \ + "activate_next_" #F " -> no more requestors\r\n") ); \ + \ + popped = TRUE; \ + activated = FALSE; \ + } \ + \ + } while (!popped); \ + \ + SSDBG( descP, \ + ("SOCKET", "activate_next_" #F " -> " \ + "done with %s\r\n", B2S(activated)) ); \ + \ + return activated; \ } ACTIVATE_NEXT_FUNCS #undef ACTIVATE_NEXT_FUNC_DECL -/* *** activate_next *** - * - * This functions pops the requestor queue and then selects until it - * manages to successfully activate a new requestor or the queue is empty. - * Return value indicates if a new requestor was activated or not. - */ - -static -BOOLEAN_T activate_next(ErlNifEnv* env, - ESockDescriptor* descP, - ESockRequestor* reqP, - ESockRequestQueue* q, - ERL_NIF_TERM sockRef) -{ - BOOLEAN_T popped, activated; - int sres; - - popped = FALSE; - do { - - if (requestor_pop(q, reqP)) { - - /* There was another one */ - - SSDBG( descP, - ("SOCKET", "activate_next -> new (active) requestor: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", reqP->pid, reqP->ref) ); - - if ((sres = esock_select_read(env, descP->sock, descP, - &reqP->pid, reqP->ref)) < 0) { - /* We need to inform this process, reqP->pid, that we - * failed to select, so we don't leave it hanging. - * => send abort - */ - - esock_send_abort_msg(env, sockRef, reqP->ref, sres, &reqP->pid); - - } else { - - /* Success: New requestor selected */ - popped = TRUE; - activated = FALSE; - - } - - } else { - - SSDBG( descP, - ("SOCKET", "send_activate_next -> no more requestors\r\n") ); - - popped = TRUE; - activated = FALSE; - } - - } while (!popped); - - SSDBG( descP, - ("SOCKET", "activate_next -> " - "done with %s\r\n", B2S(activated)) ); - - return activated; -} @@ -17497,13 +17680,13 @@ REQ_SEARCH4PID_FUNCS ESockRequestor* reqP = &e->data; \ \ reqP->pid = pid; \ - reqP->ref = enif_make_copy(descP->env, ref); \ - \ if (MONP("reader_push -> " #F " request", \ env, descP, &pid, &reqP->mon) != 0) { \ FREE(reqP); \ return esock_make_error(env, atom_exmon); \ } \ + reqP->env = esock_alloc_env(#F "_push"); \ + reqP->ref = enif_make_copy(reqP->env, ref); \ \ qpush(&descP->Q, e); \ \ @@ -17579,13 +17762,15 @@ BOOLEAN_T requestor_pop(ESockRequestQueue* q, if (e != NULL) { reqP->pid = e->data.pid; reqP->mon = e->data.mon; + reqP->env = e->data.env; reqP->ref = e->data.ref; FREE(e); return TRUE; } else { - /* (writers) Queue was empty */ + /* Queue was empty */ enif_set_pid_undefined(&reqP->pid); - // *reqP->mon = NULL; we have no null value for monitors + MON_INIT(&reqP->mon); + reqP->env = NULL; reqP->ref = esock_atom_undefined; // Just in case return FALSE; } @@ -17763,10 +17948,10 @@ int esock_monitor(const char* slogan, res = enif_monitor_process(env, descP, pid, &monP->mon); if (res != 0) { - monP->is_active = 0; + monP->isActive = FALSE; SSDBG( descP, ("SOCKET", "[%d] monitor failed: %d\r\n", descP->sock, res) ); } else { - monP->is_active = 1; + monP->isActive = TRUE; } return res; @@ -17781,7 +17966,7 @@ int esock_demonitor(const char* slogan, { int res; - if (!monP->is_active) + if (!monP->isActive) return 1; SSDBG( descP, ("SOCKET", "[%d] %s: try demonitor\r\n", descP->sock, slogan) ); @@ -17802,7 +17987,7 @@ int esock_demonitor(const char* slogan, static void esock_monitor_init(ESockMonitor* monP) { - monP->is_active = 0; + monP->isActive = FALSE; } #endif // if !defined(__WIN32__) @@ -17824,10 +18009,6 @@ void socket_dtor(ErlNifEnv* env, void* obj) #if !defined(__WIN32__) ESockDescriptor* descP = (ESockDescriptor*) obj; - enif_clear_env(descP->env); - enif_free_env(descP->env); - descP->env = NULL; - MDESTROY(descP->writeMtx); MDESTROY(descP->readMtx); MDESTROY(descP->accMtx); @@ -17920,37 +18101,19 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) */ if (descP->currentWriterP != NULL) { + /* We have a (current) writer and *may* therefor also have * writers waiting. */ - DEMONP("socket_stop -> current writer", - env, descP, &descP->currentWriter.mon); + socket_stop_handle_current(env, + "writer", + descP, sockRef, &descP->currentWriter); - SSDBG( descP, ("SOCKET", "socket_stop -> handle current writer\r\n") ); - if (COMPARE_PIDS(&descP->closerPid, &descP->currentWriter.pid) != 0) { - SSDBG( descP, ("SOCKET", "socket_stop -> " - "send abort message to current writer %T\r\n", - descP->currentWriter.pid) ); - if (esock_send_abort_msg(env, - sockRef, - descP->currentWriter.ref, - atom_closed, - &descP->currentWriter.pid) != NULL) { - /* Shall we really do this? - * This happens if the controlling process has been killed! - */ - esock_warning_msg("Failed sending abort (%T) message to " - "current writer %T\r\n", - descP->currentWriter.ref, - descP->currentWriter.pid); - } - } - /* And also deal with the waiting writers (in the same way) */ SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting writer(s)\r\n") ); inform_waiting_procs(env, "writer", - descP, &descP->writersQ, TRUE, atom_closed); + descP, sockRef, &descP->writersQ, TRUE, atom_closed); } @@ -17967,38 +18130,14 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * readers waiting. */ - DEMONP("socket_stop -> current reader", - env, descP, &descP->currentReader.mon); + socket_stop_handle_current(env, + "reader", + descP, sockRef, &descP->currentReader); - SSDBG( descP, ("SOCKET", "socket_stop -> handle current reader\r\n") ); - if (COMPARE_PIDS(&descP->closerPid, &descP->currentReader.pid) != 0) { - SSDBG( descP, ("SOCKET", "socket_stop -> " - "send abort message to current reader %T\r\n", - descP->currentReader.pid) ); - /* - esock_dbg_printf("SOCKET", "socket_stop -> " - "send abort message to current reader %T\r\n", - descP->currentReader.pid); - */ - if (esock_send_abort_msg(env, - sockRef, - descP->currentReader.ref, - atom_closed, - &descP->currentReader.pid) != NULL) { - /* Shall we really do this? - * This happens if the controlling process has been killed! - */ - esock_warning_msg("Failed sending abort (%T) message to " - "current reader %T\r\n", - descP->currentReader.ref, - descP->currentReader.pid); - } - } - /* And also deal with the waiting readers (in the same way) */ SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting reader(s)\r\n") ); inform_waiting_procs(env, "reader", - descP, &descP->readersQ, TRUE, atom_closed); + descP, sockRef, &descP->readersQ, TRUE, atom_closed); } @@ -18011,37 +18150,19 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) */ if (descP->currentAcceptorP != NULL) { + /* We have a (current) acceptor and *may* therefor also have * acceptors waiting. */ - - DEMONP("socket_stop -> current acceptor", - env, descP, &descP->currentAcceptor.mon); - - SSDBG( descP, ("SOCKET", "socket_stop -> handle current acceptor\r\n") ); - if (COMPARE_PIDS(&descP->closerPid, &descP->currentAcceptor.pid) != 0) { - SSDBG( descP, ("SOCKET", "socket_stop -> " - "send abort message to current acceptor %T\r\n", - descP->currentAcceptor.pid) ); - if (esock_send_abort_msg(env, - sockRef, - descP->currentAcceptor.ref, - atom_closed, - &descP->currentAcceptor.pid) != NULL) { - /* Shall we really do this? - * This happens if the controlling process has been killed! - */ - esock_warning_msg("Failed sending abort (%T) message to " - "current acceptor %T\r\n", - descP->currentAcceptor.ref, - descP->currentAcceptor.pid); - } - } - + + socket_stop_handle_current(env, + "acceptor", + descP, sockRef, &descP->currentAcceptor); + /* And also deal with the waiting acceptors (in the same way) */ SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting acceptor(s)\r\n") ); inform_waiting_procs(env, "acceptor", - descP, &descP->acceptorsQ, TRUE, atom_closed); + descP, sockRef, &descP->acceptorsQ, TRUE, atom_closed); } @@ -18054,15 +18175,15 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) */ if (descP->sock != INVALID_SOCKET) { - + if (descP->closeLocal) { if (!is_direct_call) { /* +++ send close message to the waiting process +++ */ - esock_send_close_msg(env, descP, sockRef); - + esock_send_close_msg(env, descP, &descP->closerPid); + DEMONP("socket_stop -> closer", env, descP, &descP->closerMon); } else { @@ -18071,11 +18192,8 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * since the message send takes care of it if scheduled. */ - if (descP->closeEnv != NULL) { - enif_clear_env(descP->closeEnv); - enif_free_env(descP->closeEnv); - descP->closeEnv = NULL; - } + if (descP->closeEnv != NULL) + esock_free_env("socket_stop - close-env", descP->closeEnv); } } @@ -18098,29 +18216,59 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) +/* *** socket_stop_handle_current *** + * + * Handle current requestor (reader, writer or acceptor) during + * socket stop. + */ +static +void socket_stop_handle_current(ErlNifEnv* env, + const char* role, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ESockRequestor* reqP) +{ + SSDBG( descP, ("SOCKET", "socket_stop -> handle current %s\r\n", role) ); + + DEMONP("socket_stop_handle_current", env, descP, &reqP->mon); + + if (COMPARE_PIDS(&descP->closerPid, &reqP->pid) != 0) { + + SSDBG( descP, ("SOCKET", "socket_stop_handle_current -> " + "send abort message to current %s %T\r\n", + role, reqP->pid) ); + + if (esock_send_abort_msg(env, sockRef, reqP->ref, reqP->env, + atom_closed, &reqP->pid) != NULL) { + + esock_warning_msg("Failed sending abort (%T) message to " + "current %s %T\r\n", + reqP->ref, role, reqP->pid); + } + } +} + + + /* This function traverse the queue and sends the specified * nif_abort message with the specified reason to each member, * and if the 'free' argument is TRUE, the queue will be emptied. */ #if !defined(__WIN32__) static void inform_waiting_procs(ErlNifEnv* env, - char* role, + const char* role, ESockDescriptor* descP, + ERL_NIF_TERM sockRef, ESockRequestQueue* q, BOOLEAN_T free, ERL_NIF_TERM reason) { ESockRequestQueueElement* currentP = q->first; ESockRequestQueueElement* nextP; - ERL_NIF_TERM sockRef = enif_make_resource(env, descP); - /* - esock_dbg_printf("SOCKET", "inform_waiting_procs -> entry with: " - "\r\n role: %s" - "\r\n free: %s" - "\r\n reason: %T" - "\r\n", role, B2S(free), reason); - */ + SSDBG( descP, + ("SOCKET", + "inform_waiting_procs -> handle waiting %s(s)\r\n", role) ); while (currentP != NULL) { @@ -18134,18 +18282,14 @@ static void inform_waiting_procs(ErlNifEnv* env, */ SSDBG( descP, - ("SOCKET", "inform_waiting_procs -> abort request %T (from %T)\r\n", + ("SOCKET", + "inform_waiting_procs -> abort request %T (from %T)\r\n", currentP->data.ref, currentP->data.pid) ); - /* - esock_dbg_printf("SOCKET", "inform_waiting_procs -> " - "try sending abort to %s %T " - "\r\n", role, currentP->data.pid); - */ - if (esock_send_abort_msg(env, sockRef, currentP->data.ref, + currentP->data.env, reason, ¤tP->data.pid) != NULL) { @@ -18211,7 +18355,7 @@ void socket_down(ErlNifEnv* env, descP->closeLocal = TRUE; descP->closerPid = *pid; MON_INIT(&descP->closerMon); - + sres = esock_select_stop(env, descP->sock, descP); if (sres & ERL_NIF_SELECT_STOP_CALLED) { diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 2e3f40a350..2bd4163628 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -28,10 +28,14 @@ %% ESOCK_TEST_TRAFFIC: include %% ESOCK_TEST_TTEST: exclude %% +%% Variable that controls "verbosity" of the test case(s): +%% +%% ESOCK_TEST_QUIET: true (default) | false +%% %% Defines the runtime of the ttest cases %% (This is the time during which "measurement" is performed. %% the actual time it takes for the test case to complete -%% will be longer) +%% will be longer; setup, completion, ...) %% %% ESOCK_TEST_TTEST_RUNTIME: 10 seconds %% Format of values: <integer>[<unit>] @@ -5591,7 +5595,7 @@ sc_lc_receive_response_tcp(InitState) -> State1 = maps:remove(sock, State), {ok, State1}; {error, Reason} = ERROR -> - ?SEV_EPRINT("Unexpected read faulure: " + ?SEV_EPRINT("Unexpected read failure: " "~n ~p", [Reason]), ERROR end @@ -8987,6 +8991,7 @@ traffic_send_and_recv_chunks_tcp(InitState) -> end}, #{desc => "recv (one big)", cmd => fun(#{tester := Tester, csock := Sock, size := Size} = _State) -> + %% socket:setopt(Sock, otp, debug, true), case socket:recv(Sock, Size) of {ok, Data} -> ?SEV_ANNOUNCE_READY(Tester, @@ -11044,7 +11049,7 @@ tpp_tcp_client_msg_exchange_loop(Sock, _Send, _Recv, _Msg, end; tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Data, Num, N, Sent, Received, Start) -> - %% d("tpp_tcp_client_msg_exchange_loop(~w,~w) try send", [Num,N]), + %% d("tpp_tcp_client_msg_exchange_loop(~w,~w) try send ~w", [Num,N,size(Data)]), case tpp_tcp_send_req(Sock, Send, Data) of {ok, SendSz} -> %% d("tpp_tcp_client_msg_exchange_loop(~w,~w) sent - " @@ -11057,11 +11062,13 @@ tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Data, Received+RecvSz, Start); {error, RReason} -> - ?SEV_EPRINT("recv (~w of ~w): ~p", [N, Num, RReason]), + ?SEV_EPRINT("recv (~w of ~w): ~p: " + "~n ~p", [N, Num, RReason, mq()]), exit({recv, RReason, N}) end; {error, SReason} -> - ?SEV_EPRINT("send (~w of ~w): ~p", [N, Num, SReason]), + ?SEV_EPRINT("send (~w of ~w): ~p" + "~n ~p", [N, Num, SReason, mq()]), exit({send, SReason, N}) end. @@ -11121,7 +11128,7 @@ tpp_tcp_recv(Sock, Recv, Tag) -> tpp_tcp_recv(Sock, Recv, Tag, Remains, size(Msg), [Data]); {ok, <<Tag:32/integer, _/binary>>} -> {error, {invalid_msg_tag, Tag}}; - {error, _} = ERROR -> + {error, _R} = ERROR -> ERROR end. @@ -11135,7 +11142,7 @@ tpp_tcp_recv(Sock, Recv, Tag, Remaining, AccSz, Acc) -> tpp_tcp_recv(Sock, Recv, Tag, Remaining - size(Data), AccSz + size(Data), [Data | Acc]); - {error, _} = ERROR -> + {error, _R} = ERROR -> ERROR end. @@ -11173,6 +11180,14 @@ tpp_tcp_send_msg(Sock, Send, Msg, AccSz) when is_binary(Msg) -> %% size_of_iovec([B|IOVec], Sz) -> %% size_of_iovec(IOVec, Sz+size(B)). +mq() -> + mq(self()). + +mq(Pid) when is_pid(Pid) -> + Tag = messages, + {Tag, Msgs} = process_info(Pid, Tag), + Msgs. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex 23b3269990..593bd7a797 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 5687b067f3..126db66cdd 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -520,7 +520,7 @@ %% necessary adapt (increase) the buffer size until all of %% it fits. %% -%% Note that not all of these flags is useful for every recv function! +%% Note that not all of these flags are useful for every recv function! %% -type recv_flags() :: [recv_flag()]. -type recv_flag() :: cmsg_cloexec | @@ -531,7 +531,6 @@ -type shutdown_how() :: read | write | read_write. -%% These are just place-holder(s) - used by the sendmsg/recvmsg functions... -type msghdr_flag() :: ctrunc | eor | errqueue | oob | trunc. -type msghdr_flags() :: [msghdr_flag()]. -type msghdr() :: #{ @@ -588,7 +587,7 @@ %% This is used in messages sent from the nif-code to erlang processes: %% -%% {?SOCKET_TAG, Socket, Tag, Info} +%% {?SOCKET_TAG, Socket :: socket(), Tag :: atom(), Info :: term()} %% -define(SOCKET_TAG, '$socket'). @@ -955,77 +954,50 @@ supports(_Key1, _Key2, _Key3) -> %% =========================================================================== %% -%% open - create an endpoint for communication -%% -%% Extra: netns -%% %% <KOLLA> %% %% How do we handle the case when an fd has been created (somehow) %% and we shall create a socket "from it". %% Can we figure out Domain, Type and Protocol from fd? -%% Yes we can: SO_DOMAIN, SO_PROTOCOL, SO_TYPE -%% But does that work on all platforms? Or shall we require that the -%% caller provide this explicitly? -%% +%% No we can't: For instance, its not possible to 'get' domain on FreeBSD. +%% +%% Instead, require: open(Domain, Stream, Proto, #{fd => FD}). +%% The last argument, Extra, is used to provide the fd. +%% %% </KOLLA> %% %% %% <KOLLA> %% -%% Start a controller process here, *before* the nif_open call. -%% If that call is successful, update with owner process (controlling -%% process) and SockRef. If the open fails, kill the process. -%% "Register" the process on success: -%% -%% nif_register(SockRef, self()). -%% -%% <ALSO> -%% -%% Maybe register the process under a name? -%% Something like: -%% -%% list_to_atom(lists:flatten(io_lib:format("socket-~p", [SockRef]))). -%% -%% </ALSO> +%% Possibly add a "registry" in the nif, allowing the user processes to +%% "register" themselves. +%% The point of this would be to ensure that these processes are +%% informed if the socket "terminates". Could possibly be used for +%% other things? If gen_tcp implements the active feature using +%% a reader process, the nif may need to know about this process, +%% since its probably "hidden" from the socket "owner" (someone +%% needs to handle it if it dies). +%% Register under a name? %% %% The nif sets up a monitor to this process, and if it dies the socket %% is closed. It is also used if someone wants to monitor the socket. %% -%% We therefor need monitor function(s): +%% We may therefor need monitor function(s): %% %% socket:monitor(Socket) %% socket:demonitor(Socket) %% -%% These are basically used to monitor the controller process. -%% Should the socket record therefor contain the pid of the controller process? -%% %% </KOLLA> %% -%% -spec open(FD) -> {ok, Socket} | {error, Reason} when -%% Socket :: socket(), -%% Reason :: term(). - -%% open(FD) -> -%% try -%% begin -%% case nif_open(FD) of -%% {ok, {SockRef, Domain, Type, Protocol}} -> -%% SocketInfo = #{domain => Domain, -%% type => Type, -%% protocol => Protocol}, -%% Socket = #socket{info = SocketInfo, -%% ref = SockRef}, -%% {ok, Socket}; -%% {error, _} = ERROR -> -%% ERROR -%% end -%% end -%% catch -%% _:_ -> % This must be improved!! -%% {error, einval} -%% end. + + +%% =========================================================================== +%% +%% open - create an endpoint for communication +%% +%% Extra: Currently only used for netns +%% -spec open(Domain, Type) -> {ok, Socket} | {error, Reason} when Domain :: domain(), @@ -1251,21 +1223,7 @@ connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout) %% Connecting... NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, Ref, ready_output} -> - %% <KOLLA> - %% - %% See open above!! - %% - %% * Here we should start and *register* the reader process - %% (This will cause the nif code to create a monitor to - %% the process) - %% * The reader is basically used to implement the active-X - %% feature! - %% * If the reader dies for whatever reason, then the socket - %% (resource) closes and the owner (controlling) process - %% is informed (closed message). - %% - %% </KOLLA> + {?SOCKET_TAG, #socket{ref = SockRef}, select, Ref} -> nif_finalize_connection(SockRef) after NewTimeout -> cancel(SockRef, connect, Ref), @@ -1331,16 +1289,6 @@ do_accept(LSockRef, Timeout) -> AccRef = make_ref(), case nif_accept(LSockRef, AccRef) of {ok, SockRef} -> - %% <KOLLA> - %% - %% * Here we should start and *register* the reader process - %% (This will cause the nif code to create a monitor to the process) - %% * The reader is basically used to implement the active-X feature! - %% * If the reader dies for whatever reason, then the socket (resource) - %% closes and the owner (controlling) process is informed (closed - %% message). - %% - %% </KOLLA> Socket = #socket{ref = SockRef}, {ok, Socket}; @@ -1350,10 +1298,10 @@ do_accept(LSockRef, Timeout) -> %% the receive. NewTimeout = next_timeout(TS, Timeout), receive - {select, LSockRef, AccRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = LSockRef}, select, AccRef} -> do_accept(LSockRef, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {AccRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {AccRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1422,15 +1370,17 @@ do_send(SockRef, Data, EFlags, Timeout) -> NewTimeout = next_timeout(TS, Timeout), %% We are partially done, wait for continuation receive - {select, SockRef, SendRef, ready_output} when (Written > 0) -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} + when (Written > 0) -> <<_:Written/binary, Rest/binary>> = Data, do_send(SockRef, Rest, EFlags, next_timeout(TS, Timeout)); - {select, SockRef, SendRef, ready_output} -> + + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_send(SockRef, Data, EFlags, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {SendRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1439,11 +1389,11 @@ do_send(SockRef, Data, EFlags, Timeout) -> end; {error, eagain} -> receive - {select, SockRef, SendRef, ready_output} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_send(SockRef, Data, EFlags, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {SendRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1527,15 +1477,17 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> {ok, Written} -> %% We are partially done, wait for continuation receive - {select, SockRef, SendRef, ready_output} when (Written > 0) -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} + when (Written > 0) -> <<_:Written/binary, Rest/binary>> = Data, do_sendto(SockRef, Rest, Dest, EFlags, next_timeout(TS, Timeout)); - {select, SockRef, SendRef, ready_output} -> + + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_sendto(SockRef, Data, Dest, EFlags, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {SendRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1545,11 +1497,11 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> {error, eagain} -> receive - {select, SockRef, SendRef, ready_output} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_sendto(SockRef, Data, Dest, EFlags, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {SendRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1599,7 +1551,8 @@ sendmsg(Socket, MsgHdr, Timeout) sendmsg(Socket, MsgHdr, ?SOCKET_SENDMSG_FLAGS_DEFAULT, Timeout). --spec sendmsg(Socket, MsgHdr, Flags, Timeout) -> ok | {ok, Remaining} | {error, Reason} when +-spec sendmsg(Socket, MsgHdr, Flags, Timeout) -> + ok | {ok, Remaining} | {error, Reason} when Socket :: socket(), MsgHdr :: msghdr(), Flags :: send_flags(), @@ -1631,7 +1584,6 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> ok; {ok, Written} when is_integer(Written) andalso (Written > 0) -> - %% We should not retry here since the protocol may not %% be able to handle a message being split. Leave it to %% the caller to figure out (call again with the rest). @@ -1644,9 +1596,10 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> {error, eagain} -> receive - {select, SockRef, SendRef, ready_output} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_sendmsg(SockRef, MsgHdr, EFlags, next_timeout(TS, Timeout)) + after Timeout -> cancel(SockRef, sendmsg, SendRef), {error, timeout} @@ -1674,13 +1627,6 @@ ensure_msghdr(_) -> %% =========================================================================== %% -%% writev - write data into multiple buffers -%% - - - -%% =========================================================================== -%% %% recv, recvfrom, recvmsg - receive a message from a socket %% %% Description: @@ -1763,14 +1709,10 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) (is_integer(Timeout) andalso (Timeout > 0)) -> TS = timestamp(Timeout), RecvRef = make_ref(), - %% p("do_recv -> try read with" - %% "~n Length: ~p", [Length]), case nif_recv(SockRef, RecvRef, Length, EFlags) of {ok, true = _Complete, Bin} when (size(Acc) =:= 0) -> - %% p("do_recv -> complete success: ~w", [size(Bin)]), {ok, Bin}; {ok, true = _Complete, Bin} -> - %% p("do_recv -> completed success: ~w (~w)", [size(Bin), size(Acc)]), {ok, <<Acc/binary, Bin/binary>>}; %% It depends on the amount of bytes we tried to read: @@ -1779,7 +1721,6 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) %% > 0 - We got a part of the message and we will be notified %% when there is more to read (a select message) {ok, false = _Complete, Bin} when (Length =:= 0) -> - %% p("do_recv -> partial success: ~w", [size(Bin)]), do_recv(SockRef, RecvRef, Length, EFlags, <<Acc/binary, Bin/binary>>, @@ -1789,17 +1730,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) %% We got the first chunk of it. %% We will be notified (select message) when there %% is more to read. - %% p("do_recv -> partial success(~w): ~w" - %% "~n ~p", [Length, size(Bin), Bin]), NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recv(SockRef, RecvRef, Length-size(Bin), EFlags, Bin, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1809,17 +1748,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {ok, false = _Completed, Bin} -> %% We got a chunk of it! - %% p("do_recv -> partial success(~w): ~w (~w)", - %% [Length, size(Bin), size(Acc)]), NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recv(SockRef, RecvRef, Length-size(Bin), EFlags, <<Acc/binary, Bin/binary>>, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1835,16 +1772,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {error, eagain} -> %% There is nothing just now, but we will be notified when there %% is something to read (a select message). - %% p("do_recv -> eagain(~w): ~w", [Length, size(Acc)]), NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recv(SockRef, RecvRef, Length, EFlags, Acc, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1893,7 +1829,7 @@ do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) -> %% It may be impossible to know what (buffer) size is appropriate %% "in advance", and in those cases it may be convenient to use the %% (recv) 'peek' flag. When this flag is provided the message is *not* -%% "consumed" from the underlying buffers, so another recvfrom call +%% "consumed" from the underlying (OS) buffers, so another recvfrom call %% is needed, possibly with a then adjusted buffer size. %% @@ -1979,11 +1915,11 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> %% is something to read (a select message). NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recvfrom(SockRef, BufSz, EFlags, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1996,13 +1932,6 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> end. -%% pi(Item) -> -%% pi(self(), Item). - -%% pi(Pid, Item) -> -%% {Item, Info} = process_info(Pid, Item), -%% Info. - %% --------------------------------------------------------------------------- %% @@ -2083,11 +2012,11 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> %% is something to read (a select message). NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -2106,12 +2035,6 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> -%% =========================================================================== -%% -%% readv - read data into multiple buffers -%% - - %% =========================================================================== %% @@ -2127,7 +2050,6 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> %% Before we call the socket close function, we set the socket %% BLOCKING. Thereby linger is handled properly. - -spec close(Socket) -> ok | {error, Reason} when Socket :: socket(), Reason :: term(). @@ -2387,6 +2309,8 @@ which_protocol(SockRef) -> end. + + %% =========================================================================== %% %% sockname - return the current address of the socket. @@ -3505,7 +3429,7 @@ cancel(SockRef, Op, OpRef) -> flush_select_msgs(SockRef, Ref) -> receive - {select, SockRef, Ref, _} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, Ref} -> flush_select_msgs(SockRef, Ref) after 0 -> ok @@ -3574,8 +3498,9 @@ tdiff(T1, T2) -> %% p(undefined, F, A) -> %% p("***", F, A); %% p(SName, F, A) -> -%% io:format(user,"[~s,~p] " ++ F ++ "~n", [SName, self()|A]), -%% io:format("[~s,~p] " ++ F ++ "~n", [SName, self()|A]). +%% TS = formated_timestamp(), +%% io:format(user,"[~s][~s,~p] " ++ F ++ "~n", [TS, SName, self()|A]), +%% io:format("[~s][~s,~p] " ++ F ++ "~n", [TS, SName, self()|A]). |