diff options
author | Micael Karlberg <[email protected]> | 2019-03-14 17:22:48 +0100 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2019-04-17 16:56:33 +0200 |
commit | 1854d05fb73fb7a787a37e9ada9e4cafdc7a7e84 (patch) | |
tree | 9d1fa8b5ee97d205bc1b0c4880aa7f2aa59b6336 | |
parent | 6a5eda1dc70bbfa8f53b2e2c3e79db1748a01bfd (diff) | |
download | otp-1854d05fb73fb7a787a37e9ada9e4cafdc7a7e84.tar.gz otp-1854d05fb73fb7a787a37e9ada9e4cafdc7a7e84.tar.bz2 otp-1854d05fb73fb7a787a37e9ada9e4cafdc7a7e84.zip |
[socket] Make use of the new select (read|write) functions
Make use of the new select functions; enif_select_[read|write],
for read and write select. These functions allows us to construct
the select message ourseves:
{'$socket', Socket, select, Ref}
This is in preparations for when we introduce the 'nowait'
(or something similar) value for the timeout argument (in accept,
read and write funcions).
It also solves (we hope) the term leakage problems (it was difficult
to free the environment when there was only one/socket).
OTP-15496
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 2 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 880 | ||||
-rw-r--r-- | erts/emulator/test/socket_SUITE.erl | 29 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 70284 -> 21788 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 201 |
5 files changed, 599 insertions, 513 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index fe02e0b051..38c28a6de5 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -388,5 +388,7 @@ GLOBAL_ERROR_REASON_ATOM_DEFS #define REALLOC_BIN(SZ, BP) enif_realloc_binary((SZ), (BP)) #define FREE_BIN(BP) enif_release_binary((BP)) +/* Copy term T into environment E */ +#define CP_TERM(E, T) enif_make_copy((E), (T)) #endif // SOCKET_INT_H__ diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 3faa9ac96d..ac1beba344 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -654,7 +654,9 @@ typedef union { * * * =================================================================== */ +/* Global socket debug */ #define SGDBG( proto ) ESOCK_DBG_PRINTF( data.dbg , proto ) +/* Socket specific debug */ #define SSDBG( __D__ , proto ) ESOCK_DBG_PRINTF( (__D__)->dbg , proto ) @@ -759,14 +761,21 @@ static unsigned long one_value = 1; typedef struct { - int is_active; ErlNifMonitor mon; + BOOLEAN_T isActive; } ESockMonitor; typedef struct { - ErlNifPid pid; // PID of the requesting process - ESockMonitor mon; // Monitor to the requesting process - ERL_NIF_TERM ref; // The (unique) reference (ID) of the request + ErlNifPid pid; // PID of the requesting process + ESockMonitor mon; // Monitor to the requesting process + + /* We need an environment for the copy of the ref we store here. + * We will also use this environment for any messages we send + * (with the ref in it). Such as the select message (used in the + * select call) or the abort message. + */ + ErlNifEnv* env; + ERL_NIF_TERM ref; // The (unique) reference (ID) of the request } ESockRequestor; typedef struct esock_request_queue_element { @@ -794,8 +803,6 @@ typedef struct { ESockAddress remote; unsigned int addrLen; - ErlNifEnv* env; - /* +++ Controller (owner) process +++ */ ErlNifPid ctrlPid; ESockMonitor ctrlMon; @@ -985,7 +992,8 @@ static ERL_NIF_TERM nbind(ErlNifEnv* env, ESockAddress* sockAddrP, unsigned int addrLen); static ERL_NIF_TERM nconnect(ErlNifEnv* env, - ESockDescriptor* descP); + ESockDescriptor* descP, + ERL_NIF_TERM sockRef); static ERL_NIF_TERM nlisten(ErlNifEnv* env, ESockDescriptor* descP, int backlog); @@ -995,10 +1003,12 @@ static ERL_NIF_TERM naccept(ErlNifEnv* env, ERL_NIF_TERM ref); static ERL_NIF_TERM naccept_listening(ErlNifEnv* env, ESockDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM ref); static ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env, ESockDescriptor* descP, - ERL_NIF_TERM ref, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef, ErlNifPid caller, int save_errno); static ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env, @@ -1030,7 +1040,8 @@ static ERL_NIF_TERM naccept_accepting_other(ErlNifEnv* env, ErlNifPid caller); static ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env, ESockDescriptor* descP, - ERL_NIF_TERM ref, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef, ErlNifPid* pid, unsigned int nextState); static BOOLEAN_T naccept_accepted(ErlNifEnv* env, @@ -2084,6 +2095,7 @@ static ERL_NIF_TERM send_check_fail(ErlNifEnv* env, static ERL_NIF_TERM send_check_retry(ErlNifEnv* env, ESockDescriptor* descP, ssize_t written, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef); static BOOLEAN_T recv_check_reader(ErlNifEnv* env, ESockDescriptor* descP, @@ -2153,9 +2165,11 @@ static ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env, ESockDescriptor* descP, int read, ErlNifBinary* bufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM recv_check_retry(ErlNifEnv* env, ESockDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM recv_check_fail_gen(ErlNifEnv* env, ESockDescriptor* descP, @@ -2323,9 +2337,15 @@ static BOOLEAN_T decode_native_get_opt(ErlNifEnv* env, // static void encode_bool(BOOLEAN_T val, ERL_NIF_TERM* eVal); static ERL_NIF_TERM encode_ip_tos(ErlNifEnv* env, int val); +static void socket_stop_handle_current(ErlNifEnv* env, + const char* role, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ESockRequestor* reqP); static void inform_waiting_procs(ErlNifEnv* env, - char* role, + const char* role, ESockDescriptor* descP, + ERL_NIF_TERM sockRef, ESockRequestQueue* q, BOOLEAN_T free, ERL_NIF_TERM reason); @@ -2389,12 +2409,6 @@ static void dec_socket(int domain, int type, int protocol); ACTIVATE_NEXT_FUNCS_DEFS #undef ACTIVATE_NEXT_FUNC_DEF -static BOOLEAN_T activate_next(ErlNifEnv* env, - ESockDescriptor* descP, - ESockRequestor* reqP, - ESockRequestQueue* q, - ERL_NIF_TERM sockRef); - /* *** acceptor_search4pid | writer_search4pid | reader_search4pid *** * *** acceptor_push | writer_push | reader_push *** * *** acceptor_pop | writer_pop | reader_pop *** @@ -2487,37 +2501,48 @@ static void socket_down_reader(ErlNifEnv* env, const ErlNifPid* pid); static char* esock_send_close_msg(ErlNifEnv* env, - ESockDescriptor* descP); - + ESockDescriptor* descP, + ErlNifPid* pid); static char* esock_send_abort_msg(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, + ErlNifEnv* msgEnv, ERL_NIF_TERM reason, ErlNifPid* pid); -static char* esock_send_socket_msg(ErlNifEnv* env, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM tag, - ERL_NIF_TERM info, - ErlNifPid* pid, - ErlNifEnv* msg_env); static char* esock_send_msg(ErlNifEnv* env, - ERL_NIF_TERM msg, ErlNifPid* pid, - ErlNifEnv* msg_env); - -static ERL_NIF_TERM make_socket_record(ErlNifEnv* env, - ERL_NIF_TERM sockRef); + ERL_NIF_TERM msg, + ErlNifEnv* msgEnv); + +static ERL_NIF_TERM mk_abort_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM opRef, + ERL_NIF_TERM reason); +static ERL_NIF_TERM mk_close_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM closeRef); +static ERL_NIF_TERM mk_select_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM selectRef); +static ERL_NIF_TERM mk_socket_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM tag, + ERL_NIF_TERM info); +static ERL_NIF_TERM mk_socket(ErlNifEnv* env, + ERL_NIF_TERM sockRef); static int esock_select_read(ErlNifEnv* env, ErlNifEvent event, void* obj, const ErlNifPid* pid, - ERL_NIF_TERM ref); + ERL_NIF_TERM sockRef, + ERL_NIF_TERM selectRef); static int esock_select_write(ErlNifEnv* env, ErlNifEvent event, void* obj, const ErlNifPid* pid, - ERL_NIF_TERM ref); + ERL_NIF_TERM sockRef, + ERL_NIF_TERM selectRef); static int esock_select_stop(ErlNifEnv* env, ErlNifEvent event, void* obj); @@ -2869,6 +2894,27 @@ static ErlNifResourceTypeInit socketInit = { static ESockData data; +/* These two (inline) functions are primarily intended for debugging, + * that is, to make it easy to add debug printouts. + */ +static inline void esock_free_env(const char* slogan, ErlNifEnv* env) +{ + SGDBG( ("SOCKET", "env free - %s: 0x%lX\r\n", slogan, env) ); + + if (env != NULL) enif_free_env(env); +} + + +static inline ErlNifEnv* esock_alloc_env(const char* slogan) +{ + ErlNifEnv* env = enif_alloc_env(); + + SGDBG( ("SOCKET", "env alloc - %s: 0x%lX\r\n", slogan, env) ); + + return env; +} + + /* ---------------------------------------------------------------------- * N I F F u n c t i o n s * ---------------------------------------------------------------------- @@ -4684,15 +4730,16 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, return enif_raise_exception(env, MKA(env, "notsup")); #else ESockDescriptor* descP; - ERL_NIF_TERM res, eSockAddr; + ERL_NIF_TERM res, eSockAddr, sockRef; char* xres; SGDBG( ("SOCKET", "nif_connect -> entry with argc: %d\r\n", argc) ); /* Extract arguments and perform preliminary validation */ + sockRef = argv[0]; if ((argc != 2) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP)) { + !enif_get_resource(env, sockRef, sockets, (void**) &descP)) { return enif_make_badarg(env); } eSockAddr = argv[1]; @@ -4704,16 +4751,24 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, "\r\n", descP->sock, argv[0], eSockAddr) ); if ((xres = esock_decode_sockaddr(env, eSockAddr, - &descP->remote, &descP->addrLen)) != NULL) { + &descP->remote, + &descP->addrLen)) != NULL) { return esock_make_error_str(env, xres); } + /* Only a *!%&$*# would send an opened but non-connected socket + * somewhere (before its actually usable), but just to be on the + * safe side we do the best we can to avoid complications... + */ + MLOCK(descP->readMtx); MLOCK(descP->writeMtx); + MLOCK(descP->cfgMtx); - res = nconnect(env, descP); + res = nconnect(env, descP, sockRef); + MUNLOCK(descP->cfgMtx); MUNLOCK(descP->writeMtx); MUNLOCK(descP->readMtx); @@ -4726,7 +4781,8 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, #if !defined(__WIN32__) static ERL_NIF_TERM nconnect(ErlNifEnv* env, - ESockDescriptor* descP) + ESockDescriptor* descP, + ERL_NIF_TERM sockRef) { ERL_NIF_TERM res, ref; int code, sres, save_errno = 0; @@ -4771,7 +4827,8 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, (save_errno == EINPROGRESS))) { /* Unix & OSE!! */ ref = MKREF(env); descP->state = SOCKET_STATE_CONNECTING; - if ((sres = esock_select_write(env, descP->sock, descP, NULL, ref)) < 0) { + if ((sres = esock_select_write(env, descP->sock, descP, NULL, + sockRef, ref)) < 0) { res = esock_make_error(env, MKT2(env, esock_atom_select_failed, @@ -4801,6 +4858,9 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, * * Description: * Make socket ready for input and output. + * This function is called if we where made to wait when we called the + * nif_connect function (we made a select, and the select message has + * now been received). * * Arguments: * Socket (ref) - Points to the socket descriptor. @@ -5056,7 +5116,7 @@ ERL_NIF_TERM naccept(ErlNifEnv* env, switch (descP->state) { case SOCKET_STATE_LISTENING: - res = naccept_listening(env, descP, ref); + res = naccept_listening(env, descP, sockRef, ref); break; case SOCKET_STATE_ACCEPTING: @@ -5074,13 +5134,15 @@ ERL_NIF_TERM naccept(ErlNifEnv* env, /* *** naccept_listening *** - * We have no active acceptor (and no acceptors in queue). + * + * We have no active acceptor (and therefor no acceptors in queue). */ #if !defined(__WIN32__) static ERL_NIF_TERM naccept_listening(ErlNifEnv* env, ESockDescriptor* descP, - ERL_NIF_TERM ref) + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef) { ESockAddress remote; unsigned int n; @@ -5106,7 +5168,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, ("SOCKET", "naccept_listening -> accept failed (%d)\r\n", save_errno) ); - res = naccept_listening_error(env, descP, ref, caller, save_errno); + res = naccept_listening_error(env, descP, sockRef, accRef, + caller, save_errno); } else { @@ -5125,6 +5188,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, /* *** naccept_listening_error *** + * * The accept call resultet in an error - handle it. * There are only two cases: * 1) BLOCK => Attempt a "retry" @@ -5133,7 +5197,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, static ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env, ESockDescriptor* descP, - ERL_NIF_TERM ref, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef, ErlNifPid caller, int save_errno) { @@ -5142,21 +5207,26 @@ ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env, if (save_errno == ERRNO_BLOCK) { /* *** Try again later *** */ - SSDBG( descP, ("SOCKET", "naccept_listening_error -> would block\r\n") ); + + SSDBG( descP, + ("SOCKET", "naccept_listening_error -> would block\r\n") ); descP->currentAcceptor.pid = caller; if (MONP("naccept_listening -> current acceptor", env, descP, &descP->currentAcceptor.pid, - &descP->currentAcceptor.mon) != 0) - return esock_make_error(env, atom_exmon); - - descP->currentAcceptor.ref = enif_make_copy(descP->env, ref); - descP->currentAcceptorP = &descP->currentAcceptor; - - res = naccept_busy_retry(env, descP, ref, NULL, SOCKET_STATE_ACCEPTING); - - + &descP->currentAcceptor.mon) != 0) { + enif_set_pid_undefined(&descP->currentAcceptor.pid); + res = esock_make_error(env, atom_exmon); + } else { + descP->currentAcceptor.env = esock_alloc_env("current acceptor"); + descP->currentAcceptor.ref = CP_TERM(descP->currentAcceptor.env, + accRef); + descP->currentAcceptorP = &descP->currentAcceptor; + res = naccept_busy_retry(env, descP, + sockRef, descP->currentAcceptor.ref, + NULL, SOCKET_STATE_ACCEPTING); + } } else { SSDBG( descP, ("SOCKET", @@ -5169,6 +5239,7 @@ ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env, /* *** naccept_listening_accept *** + * * The accept call was successful (accepted) - handle the new connection. */ static @@ -5189,6 +5260,7 @@ ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env, /* *** naccept_accepting *** + * * We have an active acceptor and possibly acceptors waiting in queue. * If the pid of the calling process is not the pid of the "current process", * push the requester onto the (acceptor) queue. @@ -5298,14 +5370,6 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, if (naccept_accepted(env, descP, accSock, descP->currentAcceptor.pid, remote, &res)) { - /* We should really go through the queue until we succeed to activate - * a waiting acceptor. For now we just pop once and hope for the best... - * This will leave any remaining acceptors *hanging*... - * - * We need a "activate-next" function. - * - */ - if (!activate_next_acceptor(env, descP, sockRef)) { SSDBG( descP, @@ -5318,7 +5382,11 @@ ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, descP->currentAcceptorP = NULL; descP->currentAcceptor.ref = esock_atom_undefined; enif_set_pid_undefined(&descP->currentAcceptor.pid); - esock_monitor_init(&descP->currentAcceptor.mon); + esock_free_env("naccept_accepting_current_accept - " + "current-accept-env", + descP->currentAcceptor.env); + descP->currentAcceptor.env = NULL; + MON_INIT(&descP->currentAcceptor.mon); } } @@ -5354,7 +5422,8 @@ ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env, "naccept_accepting_current_error -> " "would block: try again\r\n") ); - res = naccept_busy_retry(env, descP, opRef, &descP->currentAcceptor.pid, + res = naccept_busy_retry(env, descP, sockRef, opRef, + &descP->currentAcceptor.pid, /* No state change */ descP->state); @@ -5367,7 +5436,8 @@ ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "naccept_accepting_current_error -> abort %T\r\n", req.pid) ); - esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid); + esock_send_abort_msg(env, sockRef, req.ref, req.env, + reason, &req.pid); DEMONP("naccept_accepting_current_error -> pop'ed writer", env, descP, &req.mon); } @@ -5403,25 +5473,28 @@ ERL_NIF_TERM naccept_accepting_other(ErlNifEnv* env, /* *** naccept_busy_retry *** + * * Perform a retry select. If successful, set nextState. */ #if !defined(__WIN32__) static ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env, ESockDescriptor* descP, - ERL_NIF_TERM ref, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM accRef, // Not needed ErlNifPid* pid, unsigned int nextState) { int sres; ERL_NIF_TERM res, reason; - if ((sres = esock_select_read(env, descP->sock, descP, pid, ref)) < 0) { + if ((sres = esock_select_read(env, descP->sock, descP, pid, + sockRef, accRef)) < 0) { reason = MKT2(env, esock_atom_select_failed, MKI(env, sres)); res = esock_make_error(env, reason); } else { descP->state = nextState; - res = esock_make_error(env, esock_atom_eagain); + res = esock_make_error(env, esock_atom_eagain); // OK!! } return res; @@ -5430,6 +5503,7 @@ ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env, /* *** naccept_accepted *** + * * Generic function handling a successful accept. */ static @@ -5469,7 +5543,7 @@ BOOLEAN_T naccept_accepted(ErlNifEnv* env, accDescP->rBufSz = descP->rBufSz; // Inherit buffer size accDescP->rNum = descP->rNum; // Inherit buffer uses accDescP->rNumCnt = 0; - accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer siez + accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer size accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size accRef = enif_make_resource(env, accDescP); @@ -5481,6 +5555,7 @@ BOOLEAN_T naccept_accepted(ErlNifEnv* env, &accDescP->ctrlPid, &accDescP->ctrlMon) != 0) { sock_close(accSock); + enif_set_pid_undefined(&descP->ctrlPid); *result = esock_make_error(env, atom_exmon); return FALSE; } @@ -6668,8 +6743,8 @@ BOOLEAN_T nclose_check(ErlNifEnv* env, } else { - /* Monitor the caller, since we should complete this operation even if - * the caller dies (for whatever reason). + /* Monitor the caller, since we should complete this + * operation even if the caller dies (for whatever reason). * * <KOLLA> * @@ -6719,8 +6794,9 @@ ERL_NIF_TERM nclose_do(ErlNifEnv* env, int sres; ERL_NIF_TERM reply, reason; - descP->closeEnv = enif_alloc_env(); + descP->closeEnv = esock_alloc_env("nclose-do - close-env"); descP->closeRef = MKREF(descP->closeEnv); + sres = esock_select_stop(env, descP->sock, descP); if (sres & ERL_NIF_SELECT_STOP_CALLED) { @@ -13883,7 +13959,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") ); - res = send_check_retry(env, descP, written, sendRef); + res = send_check_retry(env, descP, written, sockRef, sendRef); } @@ -13896,7 +13972,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, "not entire package written (%d of %d)\r\n", written, dataSize) ); - res = send_check_retry(env, descP, written, sendRef); + res = send_check_retry(env, descP, written, sockRef, sendRef); } @@ -13920,18 +13996,23 @@ ERL_NIF_TERM send_check_ok(ErlNifEnv* env, cnt_inc(&descP->writePkgCnt, 1); cnt_inc(&descP->writeByteCnt, written); - if (descP->currentWriterP != NULL) + if (descP->currentWriterP != NULL) { DEMONP("send_check_ok -> current writer", env, descP, &descP->currentWriter.mon); + esock_free_env("send_check_ok", descP->currentWriter.env); + } SSDBG( descP, ("SOCKET", "send_check_ok -> " "everything written (%d,%d) - done\r\n", dataSize, written) ); - /* Ok, this write is done maybe activate the next (if any) */ + /* + * Ok, this write is done maybe activate the next (if any) + */ if (!activate_next_writer(env, descP, sockRef)) { descP->currentWriterP = NULL; + descP->currentWriter.env = NULL; descP->currentWriter.ref = esock_atom_undefined; enif_set_pid_undefined(&descP->currentWriter.pid); esock_monitor_init(&descP->currentWriter.mon); @@ -13970,7 +14051,8 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env, while (writer_pop(env, descP, &req)) { SSDBG( descP, ("SOCKET", "send_check_fail -> abort %T\r\n", req.pid) ); - esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid); + esock_send_abort_msg(env, sockRef, req.ref, req.env, + reason, &req.pid); DEMONP("send_check_fail -> pop'ed writer", env, descP, &req.mon); } } @@ -13992,6 +14074,7 @@ static ERL_NIF_TERM send_check_retry(ErlNifEnv* env, ESockDescriptor* descP, ssize_t written, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef) { int sres; @@ -14007,17 +14090,19 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env, if (MONP("send_check_retry -> current writer", env, descP, &descP->currentWriter.pid, - &descP->currentWriter.mon) != 0) + &descP->currentWriter.mon) != 0) { + enif_set_pid_undefined(&descP->currentWriter.pid); return esock_make_error(env, atom_exmon); - - descP->currentWriter.ref = enif_make_copy(descP->env, sendRef); - descP->currentWriterP = &descP->currentWriter; - + } else { + descP->currentWriter.env = esock_alloc_env("current-writer"); + descP->currentWriter.ref = CP_TERM(descP->currentWriter.env, sendRef); + descP->currentWriterP = &descP->currentWriter; + } } cnt_inc(&descP->writeWaits, 1); - sres = esock_select_write(env, descP->sock, descP, NULL, sendRef); + sres = esock_select_write(env, descP->sock, descP, NULL, sockRef, sendRef); if (written >= 0) { @@ -14062,6 +14147,9 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env, * Checks if we have a current reader and if that is us. If not, * then we must be made to wait for our turn. This is done by pushing * us unto the reader queue. + * Note that we do *not* actually initiate the currentReader structure + * here, since we do not actually know yet if we need to! We do that in + * the [recv|recvfrom|recvmsg]_check_result function. */ static BOOLEAN_T recv_check_reader(ErlNifEnv* env, @@ -14132,10 +14220,26 @@ char* recv_init_current_reader(ErlNifEnv* env, env, descP, &descP->currentReader.pid, &descP->currentReader.mon) != 0) { + enif_set_pid_undefined(&descP->currentReader.pid); return str_exmon; + } else { + + descP->currentReader.env = esock_alloc_env("current-reader"); + descP->currentReader.ref = CP_TERM(descP->currentReader.env, + recvRef); + descP->currentReaderP = &descP->currentReader; } - descP->currentReader.ref = enif_make_copy(descP->env, recvRef); - descP->currentReaderP = &descP->currentReader; + } else { + + /* + * This is a retry: + * We have done, for instance, recv(Sock, X), but only received Y < X. + * We then call recv again with size = X-Y. So, we then get a new ref. + * + * Make use of the existing environment + */ + + descP->currentReader.ref = CP_TERM(descP->currentReader.env, recvRef); } return NULL; @@ -14159,9 +14263,13 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, if (descP->currentReaderP != NULL) { - DEMONP("recv_update_current_reader -> current reader", + DEMONP("recv_update_current_reader", env, descP, &descP->currentReader.mon); - + + esock_free_env("recv_update_current_reader - current-read-env", + descP->currentReader.env); + descP->currentReader.env = NULL; + if (!activate_next_reader(env, descP, sockRef)) { SSDBG( descP, @@ -14205,7 +14313,8 @@ void recv_error_current_reader(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_error_current_reader -> abort %T\r\n", req.pid) ); - esock_send_abort_msg(env, sockRef, req.ref, reason, &req.pid); + esock_send_abort_msg(env, sockRef, req.ref, req.env, + reason, &req.pid); DEMONP("recv_error_current_reader -> pop'ed reader", env, descP, &req.mon); } @@ -14317,7 +14426,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * This function is called if we filled the allocated buffer. * But are we done yet? * - * toRead = 0 means: Give me everything you have + * toRead = 0 means: Give me everything you have => maybe + * toRead > 0 means: Yes */ static ERL_NIF_TERM recv_check_full(ErlNifEnv* env, @@ -14442,7 +14552,7 @@ ERL_NIF_TERM recv_check_full_maybe_done(ErlNifEnv* env, /* *** recv_check_full_done *** * - * A successful and full (that is, the buffer was filled) recv. + * A successful recv and we filled the buffer. */ static ERL_NIF_TERM recv_check_full_done(ErlNifEnv* env, @@ -14498,7 +14608,7 @@ ERL_NIF_TERM recv_check_fail(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_fail -> eagain\r\n") ); - res = recv_check_retry(env, descP, recvRef); + res = recv_check_retry(env, descP, sockRef, recvRef); } else { @@ -14565,11 +14675,12 @@ ERL_NIF_TERM recv_check_fail_closed(ErlNifEnv* env, static ERL_NIF_TERM recv_check_retry(ErlNifEnv* env, ESockDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { int sres; char* xres; - ERL_NIF_TERM res; + ERL_NIF_TERM reason; descP->rNumCnt = 0; if ((xres = recv_init_current_reader(env, descP, recvRef)) != NULL) @@ -14577,16 +14688,17 @@ ERL_NIF_TERM recv_check_retry(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_retry -> SELECT for more\r\n") ); - if ((sres = esock_select_read(env, descP->sock, descP, - NULL, recvRef)) < 0) { - res = esock_make_error(env, - MKT2(env, - esock_atom_select_failed, MKI(env, sres))); + if ((sres = esock_select_read(env, descP->sock, descP, NULL, + sockRef, descP->currentReader.ref)) < 0) { + /* Ouch + * Now what? We have copied ref into *its own* environment! + */ + reason = MKT2(env, esock_atom_select_failed, MKI(env, sres)); } else { - res = esock_make_error(env, esock_atom_eagain); + reason = esock_atom_eagain; } - return res; + return esock_make_error(env, reason); } @@ -14644,7 +14756,7 @@ ERL_NIF_TERM recv_check_partial(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_partial -> [%d] " "only part of message - expect more\r\n", toRead) ); - res = recv_check_partial_part(env, descP, read, bufP, recvRef); + res = recv_check_partial_part(env, descP, read, bufP, sockRef, recvRef); } return res; @@ -14695,6 +14807,7 @@ ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env, ESockDescriptor* descP, int read, ErlNifBinary* bufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { ERL_NIF_TERM res, reason, data; @@ -14713,7 +14826,8 @@ ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env, /* SELECT for more data */ - sres = esock_select_read(env, descP->sock, descP, NULL, recvRef); + sres = esock_select_read(env, descP->sock, descP, NULL, + sockRef, descP->currentReader.ref); if (sres < 0) { /* Result: {error, Reason} * Reason: {select_failed, sres, data} @@ -16591,10 +16705,14 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) char buf[64]; /* Buffer used for building the mutex name */ // This needs to be released when the socket is closed! - descP->env = enif_alloc_env(); + // descP->env = enif_alloc_env(); sprintf(buf, "esock[w,%d]", sock); descP->writeMtx = MCREATE(buf); + enif_set_pid_undefined(&descP->currentWriter.pid); + MON_INIT(&descP->currentWriter.mon); + descP->currentWriter.env = NULL; + descP->currentWriter.ref = esock_atom_undefined; descP->currentWriterP = NULL; // currentWriter not used descP->writersQ.first = NULL; descP->writersQ.last = NULL; @@ -16607,6 +16725,10 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) sprintf(buf, "esock[r,%d]", sock); descP->readMtx = MCREATE(buf); + enif_set_pid_undefined(&descP->currentReader.pid); + MON_INIT(&descP->currentReader.mon); + descP->currentReader.env = NULL; + descP->currentReader.ref = esock_atom_undefined; descP->currentReaderP = NULL; // currentReader not used descP->readersQ.first = NULL; descP->readersQ.last = NULL; @@ -16618,6 +16740,10 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) sprintf(buf, "esock[acc,%d]", sock); descP->accMtx = MCREATE(buf); + enif_set_pid_undefined(&descP->currentAcceptor.pid); + MON_INIT(&descP->currentAcceptor.mon); + descP->currentAcceptor.env = NULL; + descP->currentAcceptor.ref = esock_atom_undefined; descP->currentAcceptorP = NULL; // currentAcceptor not used descP->acceptorsQ.first = NULL; descP->acceptorsQ.last = NULL; @@ -16626,6 +16752,8 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->closeMtx = MCREATE(buf); descP->closeEnv = NULL; descP->closeRef = esock_atom_undefined; + enif_set_pid_undefined(&descP->closerPid); + MON_INIT(&descP->closerMon); sprintf(buf, "esock[cfg,%d]", sock); descP->cfgMtx = MCREATE(buf); @@ -16640,11 +16768,9 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->sock = sock; descP->event = event; - MON_INIT(&descP->currentWriter.mon); - MON_INIT(&descP->currentReader.mon); - MON_INIT(&descP->currentAcceptor.mon); + enif_set_pid_undefined(&descP->ctrlPid); MON_INIT(&descP->ctrlMon); - MON_INIT(&descP->closerMon); + } return descP; @@ -17166,21 +17292,23 @@ size_t my_strnlen(const char *s, size_t maxlen) */ static char* esock_send_close_msg(ErlNifEnv* env, - ESockDescriptor* descP) + ESockDescriptor* descP, + ErlNifPid* pid) { - ERL_NIF_TERM sr = ((descP->closeEnv != NULL) ? - enif_make_copy(descP->closeEnv, sockRef) : - sockRef); - char* res = esock_send_socket_msg(env, - sr, - esock_atom_close, - descP->closeRef, - &descP->closerPid, - descP->closeEnv); - - descP->closeEnv = NULL; - - return res; + ERL_NIF_TERM sockRef, msg; + ErlNifEnv* menv; + + if (descP->closeEnv != NULL) { + sockRef = enif_make_resource(descP->closeEnv, descP); + msg = mk_close_msg(descP->closeEnv, sockRef, descP->closeRef); + menv = descP->closeEnv; + } else { + sockRef = enif_make_resource(env, descP); + msg = mk_close_msg(env, sockRef, descP->closeRef); + menv = NULL; // This has the effect that the message will be copied + } + + return esock_send_msg(env, pid, msg, menv); } @@ -17195,93 +17323,126 @@ char* esock_send_close_msg(ErlNifEnv* env, static char* esock_send_abort_msg(ErlNifEnv* env, ERL_NIF_TERM sockRef, - ERL_NIF_TERM recvRef, + ERL_NIF_TERM opRef, + ErlNifEnv* msgEnv, ERL_NIF_TERM reason, ErlNifPid* pid) { - ErlNifEnv* msg_env = enif_alloc_env(); - ERL_NIF_TERM info = MKT2(msg_env, - enif_make_copy(msg_env, recvRef), - enif_make_copy(msg_env, reason)); + ERL_NIF_TERM msg = mk_abort_msg(msgEnv, + /* sockRef not in msgEnv so copy */ + CP_TERM(msgEnv, sockRef), + opRef, reason); + + return esock_send_msg(env, pid, msg, msgEnv); +} + + +/* Send a message to the specified process. + */ +static +char* esock_send_msg(ErlNifEnv* env, + ErlNifPid* pid, + ERL_NIF_TERM msg, + ErlNifEnv* msgEnv) +{ + int res = enif_send(env, pid, msgEnv, msg); + if (msgEnv) + esock_free_env("esock_msg_send - msg-env", msgEnv); - return esock_send_socket_msg(env, sockRef, esock_atom_abort, info, pid, - msg_env); + if (!res) + return str_exsend; + else + return NULL; } +#endif // #if defined(__WIN32__) + -/* *** esock_send_socket_msg *** +/* *** mk_abort_msg *** * - * This function sends a general purpose socket message to an erlang - * process. A general 'socket' message has the ("erlang") form: + * Create the abort message, which has the following form: * - * {'$socket', Socket, Tag, Info} + * {'$socket', Socket, abort, {OpRef, Reason}} * - * Where + * This message is for processes that are waiting in the + * erlang API functions for a select (or this) message. + */ +static +ERL_NIF_TERM mk_abort_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM opRef, + ERL_NIF_TERM reason) +{ + ERL_NIF_TERM info = MKT2(env, opRef, reason); + + return mk_socket_msg(env, sockRef, esock_atom_abort, info); +} + + +/* *** mk_close_msg *** * - * Socket: #socket{ref = SockRef} ({socket, SockRef}) - * SockRef: reference() - * Tag: atom() - * Info: term() + * Construct a close (socket) message. It has the form: + * + * {'$socket', Socket, close, closeRef} * */ - static -char* esock_send_socket_msg(ErlNifEnv* env, - ERL_NIF_TERM sockRef, - ERL_NIF_TERM tag, - ERL_NIF_TERM info, - ErlNifPid* pid, - ErlNifEnv* msgEnv) +ERL_NIF_TERM mk_close_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM closeRef) { - ErlNifEnv* menv; - ERL_NIF_TERM msg, msock, mtag, minfo; + return mk_socket_msg(env, sockRef, esock_atom_close, closeRef); +} - if (msgEnv == NULL) { - menv = enif_alloc_env(); - msock = make_socket_record(menv, enif_make_copy(menv, sockRef)); - mtag = enif_make_copy(menv, tag); - minfo = enif_make_copy(menv, info); - } else { - menv = msgEnv; - msock = make_socket_record(menv, sockRef); - mtag = tag; - minfo = info; - } - msg = MKT4(menv, esock_atom_socket_tag, msock, mtag, minfo); - return esock_send_msg(env, msg, pid, menv); +/* *** mk_select_msg *** + * + * Construct a select (socket) message. It has the form: + * + * {'$socket', Socket, select, selectRef} + * + */ +static +ERL_NIF_TERM mk_select_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM selectRef) +{ + return mk_socket_msg(env, sockRef, atom_select, selectRef); } -/* Send a message to the specified process. +/* *** mk_socket_msg *** + * + * Construct the socket message: + * + * {'$socket', Socket, Tag, Info} + * + * Socket :: socket() (#socket{}) + * Tag :: atom() + * Info :: term() + * */ static -char* esock_send_msg(ErlNifEnv* env, - ERL_NIF_TERM msg, - ErlNifPid* pid, - ErlNifEnv* msg_env) +ERL_NIF_TERM mk_socket_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM tag, + ERL_NIF_TERM info) { - int res = enif_send(env, pid, msg_env, msg); - if (msg_env) - enif_free_env(msg_env); + ERL_NIF_TERM socket = mk_socket(env, sockRef); - if (!res) - return str_exsend; - else - return NULL; + return MKT4(env, esock_atom_socket_tag, socket, tag, info); } -#endif // #if defined(__WIN32__) -/* *** make_socket_record *** +/* *** mk_socket *** * * Simple utility function that construct the socket resord: * * #socket{ref = SockRef} => {socket, SockRef :: reference()} */ static -ERL_NIF_TERM make_socket_record(ErlNifEnv* env, - ERL_NIF_TERM sockRef) +ERL_NIF_TERM mk_socket(ErlNifEnv* env, + ERL_NIF_TERM sockRef) { return MKT2(env, esock_atom_socket, sockRef); } @@ -17293,25 +17454,53 @@ ERL_NIF_TERM make_socket_record(ErlNifEnv* env, * ---------------------------------------------------------------------- */ +/* *** esock_select_read *** + * + * Perform a read select. When the select is triggered, a 'select' + * message (see mk_select_msg) will be sent. + * + * There are two ways to handle the select message: + * 1) Create "your own" environment and create the message using it + * and then pass it on to the select function. + * 2) Or, to create the message using any available environment, + * and then pass a NULL pointer to the select function. + * This will have the effect that the select function will + * create its own environment and then copy the message to it. + * We choose the second alternative. + */ static int esock_select_read(ErlNifEnv* env, - ErlNifEvent event, - void* obj, - const ErlNifPid* pid, - ERL_NIF_TERM ref) + ErlNifEvent event, // The file descriptor + void* obj, // The socket descriptor object + const ErlNifPid* pid, // Destination + ERL_NIF_TERM sockRef, // Socket + ERL_NIF_TERM selectRef) // "ID" of the operation { - return enif_select(env, event, (ERL_NIF_SELECT_READ), obj, pid, ref); + ERL_NIF_TERM selectMsg = mk_select_msg(env, sockRef, selectRef); + + return enif_select_read(env, event, obj, pid, selectMsg, NULL); + } +/* *** esock_select_write *** + * + * Perform a write select. When the select is triggered, a 'select' + * message (see mk_select_msg) will be sent. + * The sockRef is copied to the msgEnv when the socket message is created, + * so no need to do that here, but the selectRef needs to be copied. + */ static int esock_select_write(ErlNifEnv* env, - ErlNifEvent event, - void* obj, - const ErlNifPid* pid, - ERL_NIF_TERM ref) + ErlNifEvent event, // The file descriptor + void* obj, // The socket descriptor + const ErlNifPid* pid, // Destination + ERL_NIF_TERM sockRef, // Socket + ERL_NIF_TERM selectRef) // "ID" of the operation { - return enif_select(env, event, (ERL_NIF_SELECT_WRITE), obj, pid, ref); + ERL_NIF_TERM selectMsg = mk_select_msg(env, sockRef, selectRef); + + return enif_select_write(env, event, obj, pid, selectMsg, NULL); } @@ -17354,84 +17543,78 @@ int esock_select_cancel(ErlNifEnv* env, ACTIVATE_NEXT_FUNC_DECL(writer, write, currentWriter, writersQ) \ ACTIVATE_NEXT_FUNC_DECL(reader, read, currentReader, readersQ) -#define ACTIVATE_NEXT_FUNC_DECL(F, R, Q) \ - static \ - BOOLEAN_T activate_next_##F(ErlNifEnv* env, \ - ESockDescriptor* descP, \ - ERL_NIF_TERM sockRef) \ - { \ - return activate_next(env, descP, \ - &descP->R, &descP->Q, \ - sockRef); \ +#define ACTIVATE_NEXT_FUNC_DECL(F, S, R, Q) \ + static \ + BOOLEAN_T activate_next_##F(ErlNifEnv* env, \ + ESockDescriptor* descP, \ + ERL_NIF_TERM sockRef) \ + { \ + BOOLEAN_T popped, activated; \ + int sres; \ + ERL_NIF_TERM reason; \ + ESockRequestor* reqP = &descP->R; \ + ESockRequestQueue* q = &descP->Q; \ + \ + popped = FALSE; \ + do { \ + \ + if (requestor_pop(q, reqP)) { \ + \ + /* There was another one */ \ + \ + SSDBG( descP, \ + ("SOCKET", \ + "activate_next_" #F " -> new (active) requestor: " \ + "\r\n pid: %T" \ + "\r\n ref: %T" \ + "\r\n", reqP->pid, reqP->ref) ); \ + \ + if ((sres = esock_select_##S(env, descP->sock, descP, \ + &reqP->pid, sockRef, \ + reqP->ref)) < 0) { \ + \ + /* We need to inform this process, reqP->pid, */ \ + /* that we failed to select, so we don't leave */ \ + /* it hanging. */ \ + /* => send abort */ \ + \ + reason = MKT2(env, \ + esock_atom_select_failed, \ + MKI(env, sres)); \ + esock_send_abort_msg(env, sockRef, \ + reqP->ref, reqP->env, \ + reason, &reqP->pid); \ + \ + } else { \ + \ + /* Success: New requestor selected */ \ + popped = TRUE; \ + activated = FALSE; \ + \ + } \ + \ + } else { \ + \ + SSDBG( descP, \ + ("SOCKET", \ + "activate_next_" #F " -> no more requestors\r\n") ); \ + \ + popped = TRUE; \ + activated = FALSE; \ + } \ + \ + } while (!popped); \ + \ + SSDBG( descP, \ + ("SOCKET", "activate_next_" #F " -> " \ + "done with %s\r\n", B2S(activated)) ); \ + \ + return activated; \ } ACTIVATE_NEXT_FUNCS #undef ACTIVATE_NEXT_FUNC_DECL -/* *** activate_next *** - * - * This functions pops the requestor queue and then selects until it - * manages to successfully activate a new requestor or the queue is empty. - * Return value indicates if a new requestor was activated or not. - */ - -static -BOOLEAN_T activate_next(ErlNifEnv* env, - ESockDescriptor* descP, - ESockRequestor* reqP, - ESockRequestQueue* q, - ERL_NIF_TERM sockRef) -{ - BOOLEAN_T popped, activated; - int sres; - - popped = FALSE; - do { - - if (requestor_pop(q, reqP)) { - - /* There was another one */ - - SSDBG( descP, - ("SOCKET", "activate_next -> new (active) requestor: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", reqP->pid, reqP->ref) ); - - if ((sres = esock_select_read(env, descP->sock, descP, - &reqP->pid, reqP->ref)) < 0) { - /* We need to inform this process, reqP->pid, that we - * failed to select, so we don't leave it hanging. - * => send abort - */ - - esock_send_abort_msg(env, sockRef, reqP->ref, sres, &reqP->pid); - - } else { - - /* Success: New requestor selected */ - popped = TRUE; - activated = FALSE; - - } - - } else { - - SSDBG( descP, - ("SOCKET", "send_activate_next -> no more requestors\r\n") ); - - popped = TRUE; - activated = FALSE; - } - - } while (!popped); - - SSDBG( descP, - ("SOCKET", "activate_next -> " - "done with %s\r\n", B2S(activated)) ); - - return activated; -} @@ -17497,13 +17680,13 @@ REQ_SEARCH4PID_FUNCS ESockRequestor* reqP = &e->data; \ \ reqP->pid = pid; \ - reqP->ref = enif_make_copy(descP->env, ref); \ - \ if (MONP("reader_push -> " #F " request", \ env, descP, &pid, &reqP->mon) != 0) { \ FREE(reqP); \ return esock_make_error(env, atom_exmon); \ } \ + reqP->env = esock_alloc_env(#F "_push"); \ + reqP->ref = enif_make_copy(reqP->env, ref); \ \ qpush(&descP->Q, e); \ \ @@ -17579,13 +17762,15 @@ BOOLEAN_T requestor_pop(ESockRequestQueue* q, if (e != NULL) { reqP->pid = e->data.pid; reqP->mon = e->data.mon; + reqP->env = e->data.env; reqP->ref = e->data.ref; FREE(e); return TRUE; } else { - /* (writers) Queue was empty */ + /* Queue was empty */ enif_set_pid_undefined(&reqP->pid); - // *reqP->mon = NULL; we have no null value for monitors + MON_INIT(&reqP->mon); + reqP->env = NULL; reqP->ref = esock_atom_undefined; // Just in case return FALSE; } @@ -17763,10 +17948,10 @@ int esock_monitor(const char* slogan, res = enif_monitor_process(env, descP, pid, &monP->mon); if (res != 0) { - monP->is_active = 0; + monP->isActive = FALSE; SSDBG( descP, ("SOCKET", "[%d] monitor failed: %d\r\n", descP->sock, res) ); } else { - monP->is_active = 1; + monP->isActive = TRUE; } return res; @@ -17781,7 +17966,7 @@ int esock_demonitor(const char* slogan, { int res; - if (!monP->is_active) + if (!monP->isActive) return 1; SSDBG( descP, ("SOCKET", "[%d] %s: try demonitor\r\n", descP->sock, slogan) ); @@ -17802,7 +17987,7 @@ int esock_demonitor(const char* slogan, static void esock_monitor_init(ESockMonitor* monP) { - monP->is_active = 0; + monP->isActive = FALSE; } #endif // if !defined(__WIN32__) @@ -17824,10 +18009,6 @@ void socket_dtor(ErlNifEnv* env, void* obj) #if !defined(__WIN32__) ESockDescriptor* descP = (ESockDescriptor*) obj; - enif_clear_env(descP->env); - enif_free_env(descP->env); - descP->env = NULL; - MDESTROY(descP->writeMtx); MDESTROY(descP->readMtx); MDESTROY(descP->accMtx); @@ -17920,37 +18101,19 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) */ if (descP->currentWriterP != NULL) { + /* We have a (current) writer and *may* therefor also have * writers waiting. */ - DEMONP("socket_stop -> current writer", - env, descP, &descP->currentWriter.mon); + socket_stop_handle_current(env, + "writer", + descP, sockRef, &descP->currentWriter); - SSDBG( descP, ("SOCKET", "socket_stop -> handle current writer\r\n") ); - if (COMPARE_PIDS(&descP->closerPid, &descP->currentWriter.pid) != 0) { - SSDBG( descP, ("SOCKET", "socket_stop -> " - "send abort message to current writer %T\r\n", - descP->currentWriter.pid) ); - if (esock_send_abort_msg(env, - sockRef, - descP->currentWriter.ref, - atom_closed, - &descP->currentWriter.pid) != NULL) { - /* Shall we really do this? - * This happens if the controlling process has been killed! - */ - esock_warning_msg("Failed sending abort (%T) message to " - "current writer %T\r\n", - descP->currentWriter.ref, - descP->currentWriter.pid); - } - } - /* And also deal with the waiting writers (in the same way) */ SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting writer(s)\r\n") ); inform_waiting_procs(env, "writer", - descP, &descP->writersQ, TRUE, atom_closed); + descP, sockRef, &descP->writersQ, TRUE, atom_closed); } @@ -17967,38 +18130,14 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * readers waiting. */ - DEMONP("socket_stop -> current reader", - env, descP, &descP->currentReader.mon); + socket_stop_handle_current(env, + "reader", + descP, sockRef, &descP->currentReader); - SSDBG( descP, ("SOCKET", "socket_stop -> handle current reader\r\n") ); - if (COMPARE_PIDS(&descP->closerPid, &descP->currentReader.pid) != 0) { - SSDBG( descP, ("SOCKET", "socket_stop -> " - "send abort message to current reader %T\r\n", - descP->currentReader.pid) ); - /* - esock_dbg_printf("SOCKET", "socket_stop -> " - "send abort message to current reader %T\r\n", - descP->currentReader.pid); - */ - if (esock_send_abort_msg(env, - sockRef, - descP->currentReader.ref, - atom_closed, - &descP->currentReader.pid) != NULL) { - /* Shall we really do this? - * This happens if the controlling process has been killed! - */ - esock_warning_msg("Failed sending abort (%T) message to " - "current reader %T\r\n", - descP->currentReader.ref, - descP->currentReader.pid); - } - } - /* And also deal with the waiting readers (in the same way) */ SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting reader(s)\r\n") ); inform_waiting_procs(env, "reader", - descP, &descP->readersQ, TRUE, atom_closed); + descP, sockRef, &descP->readersQ, TRUE, atom_closed); } @@ -18011,37 +18150,19 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) */ if (descP->currentAcceptorP != NULL) { + /* We have a (current) acceptor and *may* therefor also have * acceptors waiting. */ - - DEMONP("socket_stop -> current acceptor", - env, descP, &descP->currentAcceptor.mon); - - SSDBG( descP, ("SOCKET", "socket_stop -> handle current acceptor\r\n") ); - if (COMPARE_PIDS(&descP->closerPid, &descP->currentAcceptor.pid) != 0) { - SSDBG( descP, ("SOCKET", "socket_stop -> " - "send abort message to current acceptor %T\r\n", - descP->currentAcceptor.pid) ); - if (esock_send_abort_msg(env, - sockRef, - descP->currentAcceptor.ref, - atom_closed, - &descP->currentAcceptor.pid) != NULL) { - /* Shall we really do this? - * This happens if the controlling process has been killed! - */ - esock_warning_msg("Failed sending abort (%T) message to " - "current acceptor %T\r\n", - descP->currentAcceptor.ref, - descP->currentAcceptor.pid); - } - } - + + socket_stop_handle_current(env, + "acceptor", + descP, sockRef, &descP->currentAcceptor); + /* And also deal with the waiting acceptors (in the same way) */ SSDBG( descP, ("SOCKET", "socket_stop -> handle waiting acceptor(s)\r\n") ); inform_waiting_procs(env, "acceptor", - descP, &descP->acceptorsQ, TRUE, atom_closed); + descP, sockRef, &descP->acceptorsQ, TRUE, atom_closed); } @@ -18054,15 +18175,15 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) */ if (descP->sock != INVALID_SOCKET) { - + if (descP->closeLocal) { if (!is_direct_call) { /* +++ send close message to the waiting process +++ */ - esock_send_close_msg(env, descP, sockRef); - + esock_send_close_msg(env, descP, &descP->closerPid); + DEMONP("socket_stop -> closer", env, descP, &descP->closerMon); } else { @@ -18071,11 +18192,8 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * since the message send takes care of it if scheduled. */ - if (descP->closeEnv != NULL) { - enif_clear_env(descP->closeEnv); - enif_free_env(descP->closeEnv); - descP->closeEnv = NULL; - } + if (descP->closeEnv != NULL) + esock_free_env("socket_stop - close-env", descP->closeEnv); } } @@ -18098,29 +18216,59 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) +/* *** socket_stop_handle_current *** + * + * Handle current requestor (reader, writer or acceptor) during + * socket stop. + */ +static +void socket_stop_handle_current(ErlNifEnv* env, + const char* role, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ESockRequestor* reqP) +{ + SSDBG( descP, ("SOCKET", "socket_stop -> handle current %s\r\n", role) ); + + DEMONP("socket_stop_handle_current", env, descP, &reqP->mon); + + if (COMPARE_PIDS(&descP->closerPid, &reqP->pid) != 0) { + + SSDBG( descP, ("SOCKET", "socket_stop_handle_current -> " + "send abort message to current %s %T\r\n", + role, reqP->pid) ); + + if (esock_send_abort_msg(env, sockRef, reqP->ref, reqP->env, + atom_closed, &reqP->pid) != NULL) { + + esock_warning_msg("Failed sending abort (%T) message to " + "current %s %T\r\n", + reqP->ref, role, reqP->pid); + } + } +} + + + /* This function traverse the queue and sends the specified * nif_abort message with the specified reason to each member, * and if the 'free' argument is TRUE, the queue will be emptied. */ #if !defined(__WIN32__) static void inform_waiting_procs(ErlNifEnv* env, - char* role, + const char* role, ESockDescriptor* descP, + ERL_NIF_TERM sockRef, ESockRequestQueue* q, BOOLEAN_T free, ERL_NIF_TERM reason) { ESockRequestQueueElement* currentP = q->first; ESockRequestQueueElement* nextP; - ERL_NIF_TERM sockRef = enif_make_resource(env, descP); - /* - esock_dbg_printf("SOCKET", "inform_waiting_procs -> entry with: " - "\r\n role: %s" - "\r\n free: %s" - "\r\n reason: %T" - "\r\n", role, B2S(free), reason); - */ + SSDBG( descP, + ("SOCKET", + "inform_waiting_procs -> handle waiting %s(s)\r\n", role) ); while (currentP != NULL) { @@ -18134,18 +18282,14 @@ static void inform_waiting_procs(ErlNifEnv* env, */ SSDBG( descP, - ("SOCKET", "inform_waiting_procs -> abort request %T (from %T)\r\n", + ("SOCKET", + "inform_waiting_procs -> abort request %T (from %T)\r\n", currentP->data.ref, currentP->data.pid) ); - /* - esock_dbg_printf("SOCKET", "inform_waiting_procs -> " - "try sending abort to %s %T " - "\r\n", role, currentP->data.pid); - */ - if (esock_send_abort_msg(env, sockRef, currentP->data.ref, + currentP->data.env, reason, ¤tP->data.pid) != NULL) { @@ -18211,7 +18355,7 @@ void socket_down(ErlNifEnv* env, descP->closeLocal = TRUE; descP->closerPid = *pid; MON_INIT(&descP->closerMon); - + sres = esock_select_stop(env, descP->sock, descP); if (sres & ERL_NIF_SELECT_STOP_CALLED) { diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 2e3f40a350..2bd4163628 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -28,10 +28,14 @@ %% ESOCK_TEST_TRAFFIC: include %% ESOCK_TEST_TTEST: exclude %% +%% Variable that controls "verbosity" of the test case(s): +%% +%% ESOCK_TEST_QUIET: true (default) | false +%% %% Defines the runtime of the ttest cases %% (This is the time during which "measurement" is performed. %% the actual time it takes for the test case to complete -%% will be longer) +%% will be longer; setup, completion, ...) %% %% ESOCK_TEST_TTEST_RUNTIME: 10 seconds %% Format of values: <integer>[<unit>] @@ -5591,7 +5595,7 @@ sc_lc_receive_response_tcp(InitState) -> State1 = maps:remove(sock, State), {ok, State1}; {error, Reason} = ERROR -> - ?SEV_EPRINT("Unexpected read faulure: " + ?SEV_EPRINT("Unexpected read failure: " "~n ~p", [Reason]), ERROR end @@ -8987,6 +8991,7 @@ traffic_send_and_recv_chunks_tcp(InitState) -> end}, #{desc => "recv (one big)", cmd => fun(#{tester := Tester, csock := Sock, size := Size} = _State) -> + %% socket:setopt(Sock, otp, debug, true), case socket:recv(Sock, Size) of {ok, Data} -> ?SEV_ANNOUNCE_READY(Tester, @@ -11044,7 +11049,7 @@ tpp_tcp_client_msg_exchange_loop(Sock, _Send, _Recv, _Msg, end; tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Data, Num, N, Sent, Received, Start) -> - %% d("tpp_tcp_client_msg_exchange_loop(~w,~w) try send", [Num,N]), + %% d("tpp_tcp_client_msg_exchange_loop(~w,~w) try send ~w", [Num,N,size(Data)]), case tpp_tcp_send_req(Sock, Send, Data) of {ok, SendSz} -> %% d("tpp_tcp_client_msg_exchange_loop(~w,~w) sent - " @@ -11057,11 +11062,13 @@ tpp_tcp_client_msg_exchange_loop(Sock, Send, Recv, Data, Received+RecvSz, Start); {error, RReason} -> - ?SEV_EPRINT("recv (~w of ~w): ~p", [N, Num, RReason]), + ?SEV_EPRINT("recv (~w of ~w): ~p: " + "~n ~p", [N, Num, RReason, mq()]), exit({recv, RReason, N}) end; {error, SReason} -> - ?SEV_EPRINT("send (~w of ~w): ~p", [N, Num, SReason]), + ?SEV_EPRINT("send (~w of ~w): ~p" + "~n ~p", [N, Num, SReason, mq()]), exit({send, SReason, N}) end. @@ -11121,7 +11128,7 @@ tpp_tcp_recv(Sock, Recv, Tag) -> tpp_tcp_recv(Sock, Recv, Tag, Remains, size(Msg), [Data]); {ok, <<Tag:32/integer, _/binary>>} -> {error, {invalid_msg_tag, Tag}}; - {error, _} = ERROR -> + {error, _R} = ERROR -> ERROR end. @@ -11135,7 +11142,7 @@ tpp_tcp_recv(Sock, Recv, Tag, Remaining, AccSz, Acc) -> tpp_tcp_recv(Sock, Recv, Tag, Remaining - size(Data), AccSz + size(Data), [Data | Acc]); - {error, _} = ERROR -> + {error, _R} = ERROR -> ERROR end. @@ -11173,6 +11180,14 @@ tpp_tcp_send_msg(Sock, Send, Msg, AccSz) when is_binary(Msg) -> %% size_of_iovec([B|IOVec], Sz) -> %% size_of_iovec(IOVec, Sz+size(B)). +mq() -> + mq(self()). + +mq(Pid) when is_pid(Pid) -> + Tag = messages, + {Tag, Msgs} = process_info(Pid, Tag), + Msgs. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex 23b3269990..593bd7a797 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 5687b067f3..126db66cdd 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -520,7 +520,7 @@ %% necessary adapt (increase) the buffer size until all of %% it fits. %% -%% Note that not all of these flags is useful for every recv function! +%% Note that not all of these flags are useful for every recv function! %% -type recv_flags() :: [recv_flag()]. -type recv_flag() :: cmsg_cloexec | @@ -531,7 +531,6 @@ -type shutdown_how() :: read | write | read_write. -%% These are just place-holder(s) - used by the sendmsg/recvmsg functions... -type msghdr_flag() :: ctrunc | eor | errqueue | oob | trunc. -type msghdr_flags() :: [msghdr_flag()]. -type msghdr() :: #{ @@ -588,7 +587,7 @@ %% This is used in messages sent from the nif-code to erlang processes: %% -%% {?SOCKET_TAG, Socket, Tag, Info} +%% {?SOCKET_TAG, Socket :: socket(), Tag :: atom(), Info :: term()} %% -define(SOCKET_TAG, '$socket'). @@ -955,77 +954,50 @@ supports(_Key1, _Key2, _Key3) -> %% =========================================================================== %% -%% open - create an endpoint for communication -%% -%% Extra: netns -%% %% <KOLLA> %% %% How do we handle the case when an fd has been created (somehow) %% and we shall create a socket "from it". %% Can we figure out Domain, Type and Protocol from fd? -%% Yes we can: SO_DOMAIN, SO_PROTOCOL, SO_TYPE -%% But does that work on all platforms? Or shall we require that the -%% caller provide this explicitly? -%% +%% No we can't: For instance, its not possible to 'get' domain on FreeBSD. +%% +%% Instead, require: open(Domain, Stream, Proto, #{fd => FD}). +%% The last argument, Extra, is used to provide the fd. +%% %% </KOLLA> %% %% %% <KOLLA> %% -%% Start a controller process here, *before* the nif_open call. -%% If that call is successful, update with owner process (controlling -%% process) and SockRef. If the open fails, kill the process. -%% "Register" the process on success: -%% -%% nif_register(SockRef, self()). -%% -%% <ALSO> -%% -%% Maybe register the process under a name? -%% Something like: -%% -%% list_to_atom(lists:flatten(io_lib:format("socket-~p", [SockRef]))). -%% -%% </ALSO> +%% Possibly add a "registry" in the nif, allowing the user processes to +%% "register" themselves. +%% The point of this would be to ensure that these processes are +%% informed if the socket "terminates". Could possibly be used for +%% other things? If gen_tcp implements the active feature using +%% a reader process, the nif may need to know about this process, +%% since its probably "hidden" from the socket "owner" (someone +%% needs to handle it if it dies). +%% Register under a name? %% %% The nif sets up a monitor to this process, and if it dies the socket %% is closed. It is also used if someone wants to monitor the socket. %% -%% We therefor need monitor function(s): +%% We may therefor need monitor function(s): %% %% socket:monitor(Socket) %% socket:demonitor(Socket) %% -%% These are basically used to monitor the controller process. -%% Should the socket record therefor contain the pid of the controller process? -%% %% </KOLLA> %% -%% -spec open(FD) -> {ok, Socket} | {error, Reason} when -%% Socket :: socket(), -%% Reason :: term(). - -%% open(FD) -> -%% try -%% begin -%% case nif_open(FD) of -%% {ok, {SockRef, Domain, Type, Protocol}} -> -%% SocketInfo = #{domain => Domain, -%% type => Type, -%% protocol => Protocol}, -%% Socket = #socket{info = SocketInfo, -%% ref = SockRef}, -%% {ok, Socket}; -%% {error, _} = ERROR -> -%% ERROR -%% end -%% end -%% catch -%% _:_ -> % This must be improved!! -%% {error, einval} -%% end. + + +%% =========================================================================== +%% +%% open - create an endpoint for communication +%% +%% Extra: Currently only used for netns +%% -spec open(Domain, Type) -> {ok, Socket} | {error, Reason} when Domain :: domain(), @@ -1251,21 +1223,7 @@ connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout) %% Connecting... NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, Ref, ready_output} -> - %% <KOLLA> - %% - %% See open above!! - %% - %% * Here we should start and *register* the reader process - %% (This will cause the nif code to create a monitor to - %% the process) - %% * The reader is basically used to implement the active-X - %% feature! - %% * If the reader dies for whatever reason, then the socket - %% (resource) closes and the owner (controlling) process - %% is informed (closed message). - %% - %% </KOLLA> + {?SOCKET_TAG, #socket{ref = SockRef}, select, Ref} -> nif_finalize_connection(SockRef) after NewTimeout -> cancel(SockRef, connect, Ref), @@ -1331,16 +1289,6 @@ do_accept(LSockRef, Timeout) -> AccRef = make_ref(), case nif_accept(LSockRef, AccRef) of {ok, SockRef} -> - %% <KOLLA> - %% - %% * Here we should start and *register* the reader process - %% (This will cause the nif code to create a monitor to the process) - %% * The reader is basically used to implement the active-X feature! - %% * If the reader dies for whatever reason, then the socket (resource) - %% closes and the owner (controlling) process is informed (closed - %% message). - %% - %% </KOLLA> Socket = #socket{ref = SockRef}, {ok, Socket}; @@ -1350,10 +1298,10 @@ do_accept(LSockRef, Timeout) -> %% the receive. NewTimeout = next_timeout(TS, Timeout), receive - {select, LSockRef, AccRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = LSockRef}, select, AccRef} -> do_accept(LSockRef, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {AccRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {AccRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1422,15 +1370,17 @@ do_send(SockRef, Data, EFlags, Timeout) -> NewTimeout = next_timeout(TS, Timeout), %% We are partially done, wait for continuation receive - {select, SockRef, SendRef, ready_output} when (Written > 0) -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} + when (Written > 0) -> <<_:Written/binary, Rest/binary>> = Data, do_send(SockRef, Rest, EFlags, next_timeout(TS, Timeout)); - {select, SockRef, SendRef, ready_output} -> + + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_send(SockRef, Data, EFlags, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {SendRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1439,11 +1389,11 @@ do_send(SockRef, Data, EFlags, Timeout) -> end; {error, eagain} -> receive - {select, SockRef, SendRef, ready_output} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_send(SockRef, Data, EFlags, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {SendRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1527,15 +1477,17 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> {ok, Written} -> %% We are partially done, wait for continuation receive - {select, SockRef, SendRef, ready_output} when (Written > 0) -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} + when (Written > 0) -> <<_:Written/binary, Rest/binary>> = Data, do_sendto(SockRef, Rest, Dest, EFlags, next_timeout(TS, Timeout)); - {select, SockRef, SendRef, ready_output} -> + + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_sendto(SockRef, Data, Dest, EFlags, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {SendRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1545,11 +1497,11 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> {error, eagain} -> receive - {select, SockRef, SendRef, ready_output} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_sendto(SockRef, Data, Dest, EFlags, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {SendRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1599,7 +1551,8 @@ sendmsg(Socket, MsgHdr, Timeout) sendmsg(Socket, MsgHdr, ?SOCKET_SENDMSG_FLAGS_DEFAULT, Timeout). --spec sendmsg(Socket, MsgHdr, Flags, Timeout) -> ok | {ok, Remaining} | {error, Reason} when +-spec sendmsg(Socket, MsgHdr, Flags, Timeout) -> + ok | {ok, Remaining} | {error, Reason} when Socket :: socket(), MsgHdr :: msghdr(), Flags :: send_flags(), @@ -1631,7 +1584,6 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> ok; {ok, Written} when is_integer(Written) andalso (Written > 0) -> - %% We should not retry here since the protocol may not %% be able to handle a message being split. Leave it to %% the caller to figure out (call again with the rest). @@ -1644,9 +1596,10 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> {error, eagain} -> receive - {select, SockRef, SendRef, ready_output} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, SendRef} -> do_sendmsg(SockRef, MsgHdr, EFlags, next_timeout(TS, Timeout)) + after Timeout -> cancel(SockRef, sendmsg, SendRef), {error, timeout} @@ -1674,13 +1627,6 @@ ensure_msghdr(_) -> %% =========================================================================== %% -%% writev - write data into multiple buffers -%% - - - -%% =========================================================================== -%% %% recv, recvfrom, recvmsg - receive a message from a socket %% %% Description: @@ -1763,14 +1709,10 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) (is_integer(Timeout) andalso (Timeout > 0)) -> TS = timestamp(Timeout), RecvRef = make_ref(), - %% p("do_recv -> try read with" - %% "~n Length: ~p", [Length]), case nif_recv(SockRef, RecvRef, Length, EFlags) of {ok, true = _Complete, Bin} when (size(Acc) =:= 0) -> - %% p("do_recv -> complete success: ~w", [size(Bin)]), {ok, Bin}; {ok, true = _Complete, Bin} -> - %% p("do_recv -> completed success: ~w (~w)", [size(Bin), size(Acc)]), {ok, <<Acc/binary, Bin/binary>>}; %% It depends on the amount of bytes we tried to read: @@ -1779,7 +1721,6 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) %% > 0 - We got a part of the message and we will be notified %% when there is more to read (a select message) {ok, false = _Complete, Bin} when (Length =:= 0) -> - %% p("do_recv -> partial success: ~w", [size(Bin)]), do_recv(SockRef, RecvRef, Length, EFlags, <<Acc/binary, Bin/binary>>, @@ -1789,17 +1730,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) %% We got the first chunk of it. %% We will be notified (select message) when there %% is more to read. - %% p("do_recv -> partial success(~w): ~w" - %% "~n ~p", [Length, size(Bin), Bin]), NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recv(SockRef, RecvRef, Length-size(Bin), EFlags, Bin, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1809,17 +1748,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {ok, false = _Completed, Bin} -> %% We got a chunk of it! - %% p("do_recv -> partial success(~w): ~w (~w)", - %% [Length, size(Bin), size(Acc)]), NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recv(SockRef, RecvRef, Length-size(Bin), EFlags, <<Acc/binary, Bin/binary>>, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1835,16 +1772,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {error, eagain} -> %% There is nothing just now, but we will be notified when there %% is something to read (a select message). - %% p("do_recv -> eagain(~w): ~w", [Length, size(Acc)]), NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recv(SockRef, RecvRef, Length, EFlags, Acc, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1893,7 +1829,7 @@ do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) -> %% It may be impossible to know what (buffer) size is appropriate %% "in advance", and in those cases it may be convenient to use the %% (recv) 'peek' flag. When this flag is provided the message is *not* -%% "consumed" from the underlying buffers, so another recvfrom call +%% "consumed" from the underlying (OS) buffers, so another recvfrom call %% is needed, possibly with a then adjusted buffer size. %% @@ -1979,11 +1915,11 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> %% is something to read (a select message). NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recvfrom(SockRef, BufSz, EFlags, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1996,13 +1932,6 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> end. -%% pi(Item) -> -%% pi(self(), Item). - -%% pi(Pid, Item) -> -%% {Item, Info} = process_info(Pid, Item), -%% Info. - %% --------------------------------------------------------------------------- %% @@ -2083,11 +2012,11 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> %% is something to read (a select message). NewTimeout = next_timeout(TS, Timeout), receive - {select, SockRef, RecvRef, ready_input} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, RecvRef} -> do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, next_timeout(TS, Timeout)); - {?SOCKET_TAG, _, abort, {RecvRef, Reason}} -> + {?SOCKET_TAG, _Socket, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -2106,12 +2035,6 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> -%% =========================================================================== -%% -%% readv - read data into multiple buffers -%% - - %% =========================================================================== %% @@ -2127,7 +2050,6 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> %% Before we call the socket close function, we set the socket %% BLOCKING. Thereby linger is handled properly. - -spec close(Socket) -> ok | {error, Reason} when Socket :: socket(), Reason :: term(). @@ -2387,6 +2309,8 @@ which_protocol(SockRef) -> end. + + %% =========================================================================== %% %% sockname - return the current address of the socket. @@ -3505,7 +3429,7 @@ cancel(SockRef, Op, OpRef) -> flush_select_msgs(SockRef, Ref) -> receive - {select, SockRef, Ref, _} -> + {?SOCKET_TAG, #socket{ref = SockRef}, select, Ref} -> flush_select_msgs(SockRef, Ref) after 0 -> ok @@ -3574,8 +3498,9 @@ tdiff(T1, T2) -> %% p(undefined, F, A) -> %% p("***", F, A); %% p(SName, F, A) -> -%% io:format(user,"[~s,~p] " ++ F ++ "~n", [SName, self()|A]), -%% io:format("[~s,~p] " ++ F ++ "~n", [SName, self()|A]). +%% TS = formated_timestamp(), +%% io:format(user,"[~s][~s,~p] " ++ F ++ "~n", [TS, SName, self()|A]), +%% io:format("[~s][~s,~p] " ++ F ++ "~n", [TS, SName, self()|A]). |