diff options
author | Micael Karlberg <[email protected]> | 2019-07-08 16:41:28 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2019-07-08 16:41:28 +0200 |
commit | 1e2343e748ba212536a568a5d40357d75ccb6b7a (patch) | |
tree | 17d7e6702b4bd39887d4c78a4fcfbf09e73d312d /erts | |
parent | f8bd1d27e86b338693b72fd1bd6938876660dfcf (diff) | |
parent | a492dd84c72ca8bad5e9a52e658c9448643a9cfe (diff) | |
download | otp-1e2343e748ba212536a568a5d40357d75ccb6b7a.tar.gz otp-1e2343e748ba212536a568a5d40357d75ccb6b7a.tar.bz2 otp-1e2343e748ba212536a568a5d40357d75ccb6b7a.zip |
Merge branch 'bmk/erts/esock/20190626/finalize_counters/OTP-15818' into maint
Diffstat (limited to 'erts')
-rw-r--r-- | erts/doc/src/socket.xml | 23 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 3 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 446 | ||||
-rw-r--r-- | erts/emulator/test/socket_SUITE.erl | 1922 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 75584 -> 76064 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 38 |
6 files changed, 2365 insertions, 67 deletions
diff --git a/erts/doc/src/socket.xml b/erts/doc/src/socket.xml index b4e22e86a8..da049b1a7f 100644 --- a/erts/doc/src/socket.xml +++ b/erts/doc/src/socket.xml @@ -112,6 +112,15 @@ <name name="select_info"/> </datatype> <datatype> + <name name="socket_counters"/> + </datatype> + <datatype> + <name name="socket_counter"/> + </datatype> + <datatype> + <name name="socket_info"/> + </datatype> + <datatype> <name name="ip4_address"/> </datatype> <datatype> @@ -456,6 +465,20 @@ </func> <func> + <name name="info" arity="1" since="OTP @OTP-15818@"/> + <fsummary>Get miscellaneous socket info.</fsummary> + <desc> + <p>Get miscellaneous info about the socket.</p> + <p>The function returns a map with each info item as a key-value + binding. It reflects the "current" state of the socket. </p> + <note> + <p>In order to ensure data integrity, mutex'es are taken when + needed. So, do not call this function often. </p> + </note> + </desc> + </func> + + <func> <name name="listen" arity="1" since="OTP 22.0"/> <name name="listen" arity="2" since="OTP 22.0"/> <fsummary>Listen for connections on a socket.</fsummary> diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index ac689e82b1..4161775a04 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -102,6 +102,9 @@ typedef unsigned int BOOLEAN_T; /* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ * "Global" atoms + * + * Note that when an (global) atom is added here, it must also be added + * in the socket_nif.c file! */ #define GLOBAL_ATOM_DEFS \ diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 8b0d7ce4a9..211f21cb40 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -673,7 +673,7 @@ typedef union { /* =================================================================== * * * - * Various enif macros * + * Various esockmacros * * * * =================================================================== */ @@ -682,6 +682,12 @@ typedef union { /* Socket specific debug */ #define SSDBG( __D__ , proto ) ESOCK_DBG_PRINTF( (__D__)->dbg , proto ) +#define SOCK_CNT_INC( __E__, __D__, SF, ACNT, CNT, INC) \ + { \ + if (cnt_inc(CNT, INC) && (__D__)->iow) { \ + esock_send_wrap_msg(__E__, __D__, SF, ACNT); \ + } \ + } /* =================================================================== * @@ -866,6 +872,7 @@ typedef struct { Uint32 readByteCnt; Uint32 readTries; Uint32 readWaits; + Uint32 readFails; /* +++ Accept stuff +++ */ ErlNifMutex* accMtx; @@ -1009,6 +1016,7 @@ ESOCK_NIF_FUNCS #if !defined(__WIN32__) /* And here comes the functions that does the actual work (for the most part) */ + static BOOLEAN_T ecommand2command(ErlNifEnv* env, ERL_NIF_TERM ecommand, Uint16* command, @@ -1018,6 +1026,28 @@ static ERL_NIF_TERM ncommand(ErlNifEnv* env, ERL_NIF_TERM ecdata); static ERL_NIF_TERM ncommand_debug(ErlNifEnv* env, ERL_NIF_TERM ecdata); +static ERL_NIF_TERM esock_global_info(ErlNifEnv* env); +static ERL_NIF_TERM esock_socket_info(ErlNifEnv* env, + ESockDescriptor* descP); +static ERL_NIF_TERM esock_socket_info_counters(ErlNifEnv* env, + ESockDescriptor* descP); +#define ESOCK_SOCKET_INFO_REQ_FUNCS \ + ESOCK_SOCKET_INFO_REQ_FUNC_DEF(readers); \ + ESOCK_SOCKET_INFO_REQ_FUNC_DEF(writers); \ + ESOCK_SOCKET_INFO_REQ_FUNC_DEF(acceptors); + +#define ESOCK_SOCKET_INFO_REQ_FUNC_DEF(F) \ + static ERL_NIF_TERM esock_socket_info_##F(ErlNifEnv* env, \ + ESockDescriptor* descP); +ESOCK_SOCKET_INFO_REQ_FUNCS +#undef ESOCK_SOCKET_INFO_REQ_FUNC_DEF + +static ERL_NIF_TERM socket_info_reqs(ErlNifEnv* env, + ESockDescriptor* descP, + ErlNifMutex* mtx, + ESockRequestor* crp, + ESockRequestQueue* q); + static ERL_NIF_TERM nsupports(ErlNifEnv* env, int key); static ERL_NIF_TERM nsupports_options(ErlNifEnv* env); static ERL_NIF_TERM nsupports_options_socket(ErlNifEnv* env); @@ -2555,6 +2585,10 @@ static void socket_down_reader(ErlNifEnv* env, ERL_NIF_TERM sockRef, const ErlNifPid* pid); +static char* esock_send_wrap_msg(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM cnt); static char* esock_send_close_msg(ErlNifEnv* env, ESockDescriptor* descP, ErlNifPid* pid); @@ -2573,6 +2607,9 @@ static ERL_NIF_TERM mk_abort_msg(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM opRef, ERL_NIF_TERM reason); +static ERL_NIF_TERM mk_wrap_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM cnt); static ERL_NIF_TERM mk_close_msg(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM closeRef); @@ -2647,7 +2684,10 @@ static char str_exsend[] = "exsend"; // failed send -/* *** Global atoms *** */ +/* *** Global atoms *** + * Note that when an (global) atom is added here, it must also be added + * in the socket_int.h file! + */ #define GLOBAL_ATOMS \ GLOBAL_ATOM_DECL(abort); \ GLOBAL_ATOM_DECL(accept); \ @@ -2871,6 +2911,8 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket') LOCAL_ATOM_DECL(closed); \ LOCAL_ATOM_DECL(closing); \ LOCAL_ATOM_DECL(cookie_life); \ + LOCAL_ATOM_DECL(counter_wrap); \ + LOCAL_ATOM_DECL(counters); \ LOCAL_ATOM_DECL(data_in); \ LOCAL_ATOM_DECL(do); \ LOCAL_ATOM_DECL(dont); \ @@ -2894,6 +2936,7 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket') LOCAL_ATOM_DECL(mode); \ LOCAL_ATOM_DECL(multiaddr); \ LOCAL_ATOM_DECL(null); \ + LOCAL_ATOM_DECL(num_acceptors); \ LOCAL_ATOM_DECL(num_dinet); \ LOCAL_ATOM_DECL(num_dinet6); \ LOCAL_ATOM_DECL(num_dlocal); \ @@ -2903,14 +2946,21 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket') LOCAL_ATOM_DECL(num_psctp); \ LOCAL_ATOM_DECL(num_ptcp); \ LOCAL_ATOM_DECL(num_pudp); \ + LOCAL_ATOM_DECL(num_readers); \ LOCAL_ATOM_DECL(num_sockets); \ LOCAL_ATOM_DECL(num_tdgrams); \ LOCAL_ATOM_DECL(num_tseqpkgs); \ LOCAL_ATOM_DECL(num_tstreams); \ + LOCAL_ATOM_DECL(num_writers); \ LOCAL_ATOM_DECL(partial_delivery); \ LOCAL_ATOM_DECL(peer_error); \ LOCAL_ATOM_DECL(peer_rwnd); \ LOCAL_ATOM_DECL(probe); \ + LOCAL_ATOM_DECL(read_byte); \ + LOCAL_ATOM_DECL(read_fails); \ + LOCAL_ATOM_DECL(read_pkg); \ + LOCAL_ATOM_DECL(read_tries); \ + LOCAL_ATOM_DECL(read_waits); \ LOCAL_ATOM_DECL(select); \ LOCAL_ATOM_DECL(sender_dry); \ LOCAL_ATOM_DECL(send_failure); \ @@ -2919,7 +2969,12 @@ ERL_NIF_TERM esock_atom_socket_tag; // This has a "special" name ('$socket') LOCAL_ATOM_DECL(sourceaddr); \ LOCAL_ATOM_DECL(timeout); \ LOCAL_ATOM_DECL(true); \ - LOCAL_ATOM_DECL(want); + LOCAL_ATOM_DECL(want); \ + LOCAL_ATOM_DECL(write_byte); \ + LOCAL_ATOM_DECL(write_fails); \ + LOCAL_ATOM_DECL(write_pkg); \ + LOCAL_ATOM_DECL(write_tries); \ + LOCAL_ATOM_DECL(write_waits); /* Local error reason atoms */ #define LOCAL_ERROR_REASON_ATOMS \ @@ -3022,7 +3077,7 @@ static ESOCK_INLINE ErlNifEnv* esock_alloc_env(const char* slogan) * Description: * This is currently just a placeholder... */ -#define MKCT(E, T, C) MKT2((E), (T), MKI((E), (C))) +#define MKCT(E, T, C) MKT2((E), (T), MKUI((E), (C))) static ERL_NIF_TERM nif_info(ErlNifEnv* env, @@ -3032,41 +3087,166 @@ ERL_NIF_TERM nif_info(ErlNifEnv* env, #if defined(__WIN32__) return enif_raise_exception(env, MKA(env, "notsup")); #else - if (argc != 0) { + ERL_NIF_TERM info; + + SGDBG( ("SOCKET", "nif_info -> entry with %d args\r\n", argc) ); + + switch (argc) { + case 0: + info = esock_global_info(env); + break; + + case 1: + { + ESockDescriptor* descP; + + if (!enif_get_resource(env, argv[0], sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + SSDBG( descP, ("SOCKET", "nif_info -> get socket info\r\n") ); + info = esock_socket_info(env, descP); + } + break; + + default: return enif_make_badarg(env); - } else { - ERL_NIF_TERM numSockets = MKCT(env, atom_num_sockets, data.numSockets); - ERL_NIF_TERM numTypeDGrams = MKCT(env, atom_num_tdgrams, data.numTypeDGrams); - ERL_NIF_TERM numTypeStreams = MKCT(env, atom_num_tstreams, data.numTypeStreams); - ERL_NIF_TERM numTypeSeqPkgs = MKCT(env, atom_num_tseqpkgs, data.numTypeSeqPkgs); - ERL_NIF_TERM numDomLocal = MKCT(env, atom_num_dlocal, data.numDomainLocal); - ERL_NIF_TERM numDomInet = MKCT(env, atom_num_dinet, data.numDomainInet); - ERL_NIF_TERM numDomInet6 = MKCT(env, atom_num_dinet6, data.numDomainInet6); - ERL_NIF_TERM numProtoIP = MKCT(env, atom_num_pip, data.numProtoIP); - ERL_NIF_TERM numProtoTCP = MKCT(env, atom_num_ptcp, data.numProtoTCP); - ERL_NIF_TERM numProtoUDP = MKCT(env, atom_num_pudp, data.numProtoUDP); - ERL_NIF_TERM numProtoSCTP = MKCT(env, atom_num_psctp, data.numProtoSCTP); - ERL_NIF_TERM gcnt[] = {numSockets, - numTypeDGrams, numTypeStreams, numTypeSeqPkgs, - numDomLocal, numDomInet, numDomInet6, - numProtoIP, numProtoTCP, numProtoUDP, numProtoSCTP}; - unsigned int lenGCnt = sizeof(gcnt) / sizeof(ERL_NIF_TERM); - ERL_NIF_TERM lgcnt = MKLA(env, gcnt, lenGCnt); - ERL_NIF_TERM keys[] = {esock_atom_debug, atom_iow, atom_global_counters}; - ERL_NIF_TERM vals[] = {BOOL2ATOM(data.dbg), BOOL2ATOM(data.iow), lgcnt}; - ERL_NIF_TERM info; - unsigned int numKeys = sizeof(keys) / sizeof(ERL_NIF_TERM); - unsigned int numVals = sizeof(vals) / sizeof(ERL_NIF_TERM); + } - ESOCK_ASSERT( (numKeys == numVals) ); + return info; + +#endif +} + + +/* + * This function return a property list containing "global" info. + */ +#if !defined(__WIN32__) +static +ERL_NIF_TERM esock_global_info(ErlNifEnv* env) +{ + ERL_NIF_TERM numSockets = MKCT(env, atom_num_sockets, data.numSockets); + ERL_NIF_TERM numTypeDGrams = MKCT(env, atom_num_tdgrams, data.numTypeDGrams); + ERL_NIF_TERM numTypeStreams = MKCT(env, atom_num_tstreams, data.numTypeStreams); + ERL_NIF_TERM numTypeSeqPkgs = MKCT(env, atom_num_tseqpkgs, data.numTypeSeqPkgs); + ERL_NIF_TERM numDomLocal = MKCT(env, atom_num_dlocal, data.numDomainLocal); + ERL_NIF_TERM numDomInet = MKCT(env, atom_num_dinet, data.numDomainInet); + ERL_NIF_TERM numDomInet6 = MKCT(env, atom_num_dinet6, data.numDomainInet6); + ERL_NIF_TERM numProtoIP = MKCT(env, atom_num_pip, data.numProtoIP); + ERL_NIF_TERM numProtoTCP = MKCT(env, atom_num_ptcp, data.numProtoTCP); + ERL_NIF_TERM numProtoUDP = MKCT(env, atom_num_pudp, data.numProtoUDP); + ERL_NIF_TERM numProtoSCTP = MKCT(env, atom_num_psctp, data.numProtoSCTP); + ERL_NIF_TERM gcnt[] = {numSockets, + numTypeDGrams, numTypeStreams, numTypeSeqPkgs, + numDomLocal, numDomInet, numDomInet6, + numProtoIP, numProtoTCP, numProtoUDP, numProtoSCTP}; + unsigned int lenGCnt = sizeof(gcnt) / sizeof(ERL_NIF_TERM); + ERL_NIF_TERM lgcnt = MKLA(env, gcnt, lenGCnt); + ERL_NIF_TERM keys[] = {esock_atom_debug, atom_iow, atom_global_counters}; + ERL_NIF_TERM vals[] = {BOOL2ATOM(data.dbg), BOOL2ATOM(data.iow), lgcnt}; + ERL_NIF_TERM info; + unsigned int numKeys = sizeof(keys) / sizeof(ERL_NIF_TERM); + unsigned int numVals = sizeof(vals) / sizeof(ERL_NIF_TERM); + + ESOCK_ASSERT( (numKeys == numVals) ); + + if (!MKMA(env, keys, vals, numKeys, &info)) + return enif_make_badarg(env); - if (!MKMA(env, keys, vals, numKeys, &info)) - return enif_make_badarg(env); + return info; +} + + + +/* + * This function return a property *map*. The properties are: + * counters: A list of each socket counter and there current values + * readers: The number of current and waiting readers + * writers: The number of current and waiting writers + * acceptors: The number of current and waiting acceptors + */ +static +ERL_NIF_TERM esock_socket_info(ErlNifEnv* env, + ESockDescriptor* descP) +{ + ERL_NIF_TERM counters = esock_socket_info_counters(env, descP); + ERL_NIF_TERM readers = esock_socket_info_readers(env, descP); + ERL_NIF_TERM writers = esock_socket_info_writers(env, descP); + ERL_NIF_TERM acceptors = esock_socket_info_acceptors(env, descP); + ERL_NIF_TERM keys[] = {atom_counters, atom_num_readers, + atom_num_writers, atom_num_acceptors}; + ERL_NIF_TERM vals[] = {counters, readers, writers, acceptors}; + ERL_NIF_TERM info; + unsigned int numKeys = sizeof(keys) / sizeof(ERL_NIF_TERM); + unsigned int numVals = sizeof(vals) / sizeof(ERL_NIF_TERM); + + SSDBG( descP, ("SOCKET", "esock_socket_info -> " + "\r\n numKeys: %d" + "\r\n numVals: %d" + "\r\n", numKeys, numVals) ); + + ESOCK_ASSERT( (numKeys == numVals) ); + + if (!MKMA(env, keys, vals, numKeys, &info)) + return enif_make_badarg(env); + + SSDBG( descP, ("SOCKET", "esock_socket_info -> done with" + "\r\n info: %T" + "\r\n", info) ); + + return info; - return info; +} + + +/* + * Collect all counters for a socket. + */ +static +ERL_NIF_TERM esock_socket_info_counters(ErlNifEnv* env, + ESockDescriptor* descP) +{ + ERL_NIF_TERM info; + + MLOCK(descP->writeMtx); + MLOCK(descP->readMtx); + + { + ERL_NIF_TERM readByteCnt = MKCT(env, atom_read_byte, descP->readByteCnt); + ERL_NIF_TERM readFails = MKCT(env, atom_read_fails, descP->readFails); + ERL_NIF_TERM readPkgCnt = MKCT(env, atom_read_pkg, descP->readPkgCnt); + ERL_NIF_TERM readTries = MKCT(env, atom_read_tries, descP->readTries); + ERL_NIF_TERM readWaits = MKCT(env, atom_read_waits, descP->readWaits); + ERL_NIF_TERM writeByteCnt = MKCT(env, atom_write_byte, descP->writeByteCnt); + ERL_NIF_TERM writeFails = MKCT(env, atom_write_fails, descP->writeFails); + ERL_NIF_TERM writePkgCnt = MKCT(env, atom_write_pkg, descP->writePkgCnt); + ERL_NIF_TERM writeTries = MKCT(env, atom_write_tries, descP->writeTries); + ERL_NIF_TERM writeWaits = MKCT(env, atom_write_waits, descP->writeWaits); + ERL_NIF_TERM acnt[] = {readByteCnt, readFails, readPkgCnt, + readTries, readWaits, + writeByteCnt, writeFails, writePkgCnt, + writeTries, writeWaits}; + unsigned int lenACnt = sizeof(acnt) / sizeof(ERL_NIF_TERM); + + info = MKLA(env, acnt, lenACnt); + + SSDBG( descP, ("SOCKET", "esock_socket_info_counters -> " + "\r\n lenACnt: %d" + "\r\n info: %T" + "\r\n", lenACnt, info) ); + } -#endif + + MUNLOCK(descP->readMtx); + MUNLOCK(descP->writeMtx); + + SSDBG( descP, ("SOCKET", "esock_socket_info_counters -> done with" + "\r\n info: %T" + "\r\n", info) ); + + return info; } +#endif /* ---------------------------------------------------------------------- @@ -3176,6 +3356,70 @@ ERL_NIF_TERM ncommand_debug(ErlNifEnv* env, ERL_NIF_TERM ecdata) #endif +/* *** esock_socket_info_readers *** + * *** esock_socket_info_writers *** + * *** esock_socket_info_acceptors *** + * + * Calculate how many readers | writers | acceptors we have for this socket. + * Current requestor + any waiting requestors (of the type). + * + */ + +#if !defined(__WIN32__) +#define SOCKET_INFO_REQ_FUNCS \ + SOCKET_INFO_REQ_FUNC_DECL(readers, readMtx, currentReaderP, readersQ) \ + SOCKET_INFO_REQ_FUNC_DECL(writers, writeMtx, currentWriterP, writersQ) \ + SOCKET_INFO_REQ_FUNC_DECL(acceptors, accMtx, currentAcceptorP, acceptorsQ) + +#define SOCKET_INFO_REQ_FUNC_DECL(F, MTX, CRP, Q) \ + static \ + ERL_NIF_TERM esock_socket_info_##F(ErlNifEnv* env, \ + ESockDescriptor* descP) \ + { \ + return socket_info_reqs(env, descP, descP->MTX, descP->CRP, &descP->Q); \ + } +SOCKET_INFO_REQ_FUNCS +#undef SOCKET_INFO_REQ_FUNC_DECL + + +static +ERL_NIF_TERM socket_info_reqs(ErlNifEnv* env, + ESockDescriptor* descP, + ErlNifMutex* mtx, + ESockRequestor* crp, + ESockRequestQueue* q) +{ + ESockRequestQueueElement* tmp; + ERL_NIF_TERM info; + unsigned int cnt = 0; + + MLOCK(mtx); + + if (crp != NULL) { + // We have an active requestor! + cnt++; + + // And add all the waiting requestors + tmp = q->first; + while (tmp != NULL) { + cnt++; + tmp = tmp->nextP; + } + } + + MUNLOCK(mtx); + + info = MKUI(env, cnt); + + SSDBG( descP, ("SOCKET", "socket_info_reqs -> done with" + "\r\n info: %T" + "\r\n", info) ); + + return info; +} +#endif + + /* ---------------------------------------------------------------------- * nif_supports * @@ -6031,7 +6275,8 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, /* We ignore the wrap for the moment. * Maybe we should issue a wrap-message to controlling process... */ - cnt_inc(&descP->writeTries, 1); + // cnt_inc(&descP->writeTries, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_write_tries, &descP->writeTries, 1); written = sock_send(descP->sock, sndDataP->data, sndDataP->size, flags); if (IS_SOCKET_ERROR(written)) @@ -6164,7 +6409,8 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env, /* We ignore the wrap for the moment. * Maybe we should issue a wrap-message to controlling process... */ - cnt_inc(&descP->writeTries, 1); + // cnt_inc(&descP->writeTries, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_write_tries, &descP->writeTries, 1); if (toAddrP != NULL) { written = sock_sendto(descP->sock, @@ -6393,7 +6639,8 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env, /* We ignore the wrap for the moment. * Maybe we should issue a wrap-message to controlling process... */ - cnt_inc(&descP->writeTries, 1); + // cnt_inc(&descP->writeTries, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_write_tries, &descP->writeTries, 1); /* And now, finally, try to send the message */ written = sock_sendmsg(descP->sock, &msgHdr, flags); @@ -6591,10 +6838,8 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, if (!ALLOC_BIN(bufSz, &buf)) return esock_make_error(env, atom_exalloc); - /* We ignore the wrap for the moment. - * Maybe we should issue a wrap-message to controlling process... - */ - cnt_inc(&descP->readTries, 1); + // cnt_inc(&descP->readTries, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_read_tries, &descP->readTries, 1); // If it fails (read = -1), we need errno... SSDBG( descP, ("SOCKET", "nrecv -> try read (%d)\r\n", buf.size) ); @@ -6756,10 +7001,8 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, if (!ALLOC_BIN(bufSz, &buf)) return esock_make_error(env, atom_exalloc); - /* We ignore the wrap for the moment. - * Maybe we should issue a wrap-message to controlling process... - */ - cnt_inc(&descP->readTries, 1); + // cnt_inc(&descP->readTries, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_read_tries, &descP->readTries, 1); addrLen = sizeof(fromAddr); sys_memzero((char*) &fromAddr, addrLen); @@ -6946,10 +7189,8 @@ ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, if (!ALLOC_BIN(ctrlSz, &ctrl)) return esock_make_error(env, atom_exalloc); - /* We ignore the wrap for the moment. - * Maybe we should issue a wrap-message to controlling process... - */ - cnt_inc(&descP->readTries, 1); + // cnt_inc(&descP->readTries, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_read_tries, &descP->readTries, 1); addrLen = sizeof(addr); sys_memzero((char*) &addr, addrLen); @@ -14421,8 +14662,12 @@ ERL_NIF_TERM send_check_ok(ErlNifEnv* env, ssize_t dataSize, ERL_NIF_TERM sockRef) { - cnt_inc(&descP->writePkgCnt, 1); - cnt_inc(&descP->writeByteCnt, written); + // cnt_inc(&descP->writePkgCnt, 1); + SOCK_CNT_INC(env, descP, sockRef, + atom_write_pkg, &descP->writePkgCnt, 1); + // cnt_inc(&descP->writeByteCnt, written); + SOCK_CNT_INC(env, descP, sockRef, + atom_write_byte, &descP->writeByteCnt, written); if (descP->currentWriterP != NULL) { DEMONP("send_check_ok -> current writer", @@ -14468,7 +14713,8 @@ ERL_NIF_TERM send_check_fail(ErlNifEnv* env, ERL_NIF_TERM reason; req.env = NULL; - cnt_inc(&descP->writeFails, 1); + // cnt_inc(&descP->writeFails, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_write_fails, &descP->writeFails, 1); SSDBG( descP, ("SOCKET", "send_check_fail -> error: %d\r\n", saveErrno) ); @@ -14533,7 +14779,8 @@ ERL_NIF_TERM send_check_retry(ErlNifEnv* env, } } - cnt_inc(&descP->writeWaits, 1); + // cnt_inc(&descP->writeWaits, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_write_waits, &descP->writeWaits, 1); sres = esock_select_write(env, descP->sock, descP, NULL, sockRef, sendRef); @@ -14795,6 +15042,8 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, res = esock_make_error(env, atom_closed); + SOCK_CNT_INC(env, descP, sockRef, atom_read_fails, &descP->readFails, 1); + /* * When a stream socket peer has performed an orderly shutdown, * the return value will be 0 (the traditional "end-of-file" return). @@ -14941,7 +15190,8 @@ ERL_NIF_TERM recv_check_full_maybe_done(ErlNifEnv* env, { char* xres; - cnt_inc(&descP->readByteCnt, read); + // cnt_inc(&descP->readByteCnt, read); + SOCK_CNT_INC(env, descP, sockRef, atom_read_byte, &descP->readByteCnt, read); if (descP->rNum > 0) { @@ -14950,7 +15200,8 @@ ERL_NIF_TERM recv_check_full_maybe_done(ErlNifEnv* env, descP->rNumCnt = 0; - cnt_inc(&descP->readPkgCnt, 1); + // cnt_inc(&descP->readPkgCnt, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_read_pkg, &descP->readPkgCnt, 1); recv_update_current_reader(env, descP, sockRef); @@ -14998,8 +15249,10 @@ ERL_NIF_TERM recv_check_full_done(ErlNifEnv* env, { ERL_NIF_TERM data; - cnt_inc(&descP->readPkgCnt, 1); - cnt_inc(&descP->readByteCnt, read); + // cnt_inc(&descP->readPkgCnt, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_read_pkg, &descP->readPkgCnt, 1); + // cnt_inc(&descP->readByteCnt, read); + SOCK_CNT_INC(env, descP, sockRef, atom_read_byte, &descP->readByteCnt, read); recv_update_current_reader(env, descP, sockRef); @@ -15036,6 +15289,10 @@ ERL_NIF_TERM recv_check_fail(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_fail -> closed\r\n") ); + // This is a bit overkill (to count here), but just in case... + // cnt_inc(&descP->readFails, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_read_fails, &descP->readFails, 1); + res = recv_check_fail_closed(env, descP, sockRef, recvRef); } else if ((saveErrno == ERRNO_BLOCK) || @@ -15050,6 +15307,9 @@ ERL_NIF_TERM recv_check_fail(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_fail -> errno: %d\r\n", saveErrno) ); + // cnt_inc(&descP->readFails, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_read_fails, &descP->readFails, 1); + res = recv_check_fail_gen(env, descP, saveErrno, sockRef); } @@ -15213,8 +15473,10 @@ ERL_NIF_TERM recv_check_partial_done(ErlNifEnv* env, ERL_NIF_TERM data; descP->rNumCnt = 0; - cnt_inc(&descP->readPkgCnt, 1); - cnt_inc(&descP->readByteCnt, read); + // cnt_inc(&descP->readPkgCnt, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_read_pkg, &descP->readPkgCnt, 1); + // cnt_inc(&descP->readByteCnt, read); + SOCK_CNT_INC(env, descP, sockRef, atom_read_byte, &descP->readByteCnt, read); recv_update_current_reader(env, descP, sockRef); @@ -15257,7 +15519,8 @@ ERL_NIF_TERM recv_check_partial_part(ErlNifEnv* env, data = MKBIN(env, bufP); data = MKSBIN(env, data, 0, read); - cnt_inc(&descP->readByteCnt, read); + // cnt_inc(&descP->readByteCnt, read); + SOCK_CNT_INC(env, descP, sockRef, atom_read_byte, &descP->readByteCnt, read); /* SELECT for more data */ @@ -15344,6 +15607,12 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, data = MKSBIN(env, data, 0, read); } + // cnt_inc(&descP->readPkgCnt, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_read_pkg, &descP->readPkgCnt, 1); + // cnt_inc(&descP->readByteCnt, read); + SOCK_CNT_INC(env, descP, sockRef, atom_read_byte, + &descP->readByteCnt, read); + recv_update_current_reader(env, descP, sockRef); res = esock_make_ok2(env, MKT2(env, eSockAddr, data)); @@ -15401,6 +15670,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, * *We* do never actually try to read 0 bytes from a stream socket! */ + SOCK_CNT_INC(env, descP, sockRef, atom_read_fails, &descP->readFails, 1); FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); @@ -15471,6 +15741,21 @@ ERL_NIF_TERM recvmsg_check_msg(ErlNifEnv* env, "recvmsg_check_result -> " "(msghdr) encode failed: %s\r\n", xres) ); + /* So this is a bit strange. We did "successfully" read 'read' bytes, + * but then we fail to process the message header. So what counters + * shall we increment? + * Only failure? + * Or only success (pkg and byte), since the read was "ok" (or was it?) + * Or all of them? + * + * For now, we increment all three... + */ + + SOCK_CNT_INC(env, descP, sockRef, atom_read_fails, &descP->readFails, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_read_pkg, &descP->readPkgCnt, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_read_byte, + &descP->readByteCnt, read); + recv_update_current_reader(env, descP, sockRef); FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); @@ -15482,6 +15767,10 @@ ERL_NIF_TERM recvmsg_check_msg(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recvmsg_check_result -> (msghdr) encode ok\r\n") ); + SOCK_CNT_INC(env, descP, sockRef, atom_read_pkg, &descP->readPkgCnt, 1); + SOCK_CNT_INC(env, descP, sockRef, atom_read_byte, + &descP->readByteCnt, read); + recv_update_current_reader(env, descP, sockRef); res = esock_make_ok2(env, eMsgHdr); @@ -17196,6 +17485,7 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->readByteCnt = 0; descP->readTries = 0; descP->readWaits = 0; + descP->readFails = 0; sprintf(buf, "esock[acc,%d]", sock); descP->accMtx = MCREATE(buf); @@ -17797,6 +18087,25 @@ size_t my_strnlen(const char *s, size_t maxlen) #endif +/* Send an counter wrap message to the controlling process: + * A message in the form: + * + * {'$socket', Socket, counter_wrap, Counter :: atom()} + * + * This message will only be sent if the iow (Inform On Wrap) is TRUE. + */ +static +char* esock_send_wrap_msg(ErlNifEnv* env, + ESockDescriptor* descP, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM cnt) +{ + ERL_NIF_TERM msg = mk_wrap_msg(env, sockRef, cnt); + + return esock_send_msg(env, &descP->ctrlPid, msg, NULL); +} + + /* Send an close message to the specified process: * A message in the form: * @@ -17894,6 +18203,22 @@ ERL_NIF_TERM mk_abort_msg(ErlNifEnv* env, } +/* *** mk_wrap_msg *** + * + * Construct a counter wrap (socket) message. It has the form: + * + * {'$socket', Socket, counter_wrap, Counter} + * + */ +static +ERL_NIF_TERM mk_wrap_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM cnt) +{ + return mk_socket_msg(env, sockRef, atom_counter_wrap, cnt); +} + + /* *** mk_close_msg *** * * Construct a close (socket) message. It has the form: @@ -19191,10 +19516,13 @@ ErlNifFunc socket_funcs[] = { // Some utility and support functions {"nif_info", 0, nif_info, 0}, + {"nif_info", 1, nif_info, 0}, {"nif_supports", 1, nif_supports, 0}, {"nif_command", 1, nif_command, 0}, // The proper "socket" interface + // nif_open/1 is (supposed to be) used when we already have a file descriptor + // {"nif_open", 1, nif_open, 0}, {"nif_open", 4, nif_open, 0}, {"nif_bind", 2, nif_bind, 0}, {"nif_connect", 2, nif_connect, 0}, diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 410582f4a1..3fab6b98a8 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -171,6 +171,15 @@ sc_rs_recvmsg_send_shutdown_receive_tcpL/1, %% *** Traffic *** + traffic_send_and_recv_counters_tcp4/1, + traffic_send_and_recv_counters_tcpL/1, + traffic_sendmsg_and_recvmsg_counters_tcp4/1, + traffic_sendmsg_and_recvmsg_counters_tcpL/1, + traffic_sendto_and_recvfrom_counters_udp4/1, + traffic_sendto_and_recvfrom_counters_udpL/1, + traffic_sendmsg_and_recvmsg_counters_udp4/1, + traffic_sendmsg_and_recvmsg_counters_udpL/1, + traffic_send_and_recv_chunks_tcp4/1, traffic_send_and_recv_chunks_tcp6/1, traffic_send_and_recv_chunks_tcpL/1, @@ -560,10 +569,10 @@ suite() -> {timetrap,{minutes,1}}]. all() -> - Groups = [{api, "ESOCK_TEST_API", include}, - {socket_closure, "ESOCK_TEST_SOCK_CLOSE", include}, - {traffic, "ESOCK_TEST_TRAFFIC", include}, - {ttest, "ESOCK_TEST_TTEST", exclude}], + Groups = [{api, "ESOCK_TEST_API", include}, + {socket_close, "ESOCK_TEST_SOCK_CLOSE", include}, + {traffic, "ESOCK_TEST_TRAFFIC", include}, + {ttest, "ESOCK_TEST_TTEST", exclude}], [use_group(Group, Env, Default) || {Group, Env, Default} <- Groups]. use_group(Group, Env, Default) -> @@ -593,12 +602,13 @@ groups() -> {api_options_otp, [], api_options_otp_cases()}, {api_options_ip, [], api_options_ip_cases()}, {api_op_with_timeout, [], api_op_with_timeout_cases()}, - {socket_closure, [], socket_closure_cases()}, + {socket_close, [], socket_close_cases()}, {sc_ctrl_proc_exit, [], sc_cp_exit_cases()}, {sc_local_close, [], sc_lc_cases()}, {sc_remote_close, [], sc_rc_cases()}, {sc_remote_shutdown, [], sc_rs_cases()}, {traffic, [], traffic_cases()}, + {traffic_counters, [], traffic_counters_cases()}, {traffic_chunks, [], traffic_chunks_cases()}, {traffic_pp_send_recv, [], traffic_pp_send_recv_cases()}, {traffic_pp_sendto_recvfrom, [], traffic_pp_sendto_recvfrom_cases()}, @@ -757,7 +767,7 @@ api_op_with_timeout_cases() -> %% These cases tests what happens when the socket is closed/shutdown, %% locally or remotely. -socket_closure_cases() -> +socket_close_cases() -> [ {group, sc_ctrl_proc_exit}, {group, sc_local_close}, @@ -828,12 +838,25 @@ sc_rs_cases() -> traffic_cases() -> [ + {group, traffic_counters}, {group, traffic_chunks}, {group, traffic_pp_send_recv}, {group, traffic_pp_sendto_recvfrom}, {group, traffic_pp_sendmsg_recvmsg} ]. +traffic_counters_cases() -> + [ + traffic_send_and_recv_counters_tcp4, + traffic_send_and_recv_counters_tcpL, + traffic_sendmsg_and_recvmsg_counters_tcp4, + traffic_sendmsg_and_recvmsg_counters_tcpL, + traffic_sendto_and_recvfrom_counters_udp4, + traffic_sendto_and_recvfrom_counters_udpL, + traffic_sendmsg_and_recvmsg_counters_udp4, + traffic_sendmsg_and_recvmsg_counters_udpL + ]. + traffic_chunks_cases() -> [ traffic_send_and_recv_chunks_tcp4, @@ -13895,6 +13918,1893 @@ sc_rs_recvmsg_send_shutdown_receive_tcpL(_Config) when is_list(_Config) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to (simply) test that the counters +%% for both read and write. +%% So that its easy to extend, we use fun's for read and write. +%% We use TCP on IPv4. + +traffic_send_and_recv_counters_tcp4(suite) -> + []; +traffic_send_and_recv_counters_tcp4(doc) -> + []; +traffic_send_and_recv_counters_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(15)), + tc_try(traffic_send_and_recv_counters_tcp4, + fun() -> + InitState = #{domain => inet, + proto => tcp, + recv => fun(S) -> socket:recv(S) end, + send => fun(S, D) -> socket:send(S, D) end}, + ok = traffic_send_and_recv_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to (simply) test that the counters +%% for both read and write. +%% So that its easy to extend, we use fun's for read and write. +%% We use default (TCP) on local. + +traffic_send_and_recv_counters_tcpL(suite) -> + []; +traffic_send_and_recv_counters_tcpL(doc) -> + []; +traffic_send_and_recv_counters_tcpL(_Config) when is_list(_Config) -> + ?TT(?SECS(15)), + tc_try(traffic_send_and_recv_counters_tcpL, + fun() -> + InitState = #{domain => local, + proto => default, + recv => fun(S) -> socket:recv(S) end, + send => fun(S, D) -> socket:send(S, D) end}, + ok = traffic_send_and_recv_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to (simply) test that the counters +%% for both read and write. +%% So that its easy to extend, we use fun's for read and write. +%% We use TCP on IPv4. + +traffic_sendmsg_and_recvmsg_counters_tcp4(suite) -> + []; +traffic_sendmsg_and_recvmsg_counters_tcp4(doc) -> + []; +traffic_sendmsg_and_recvmsg_counters_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(15)), + tc_try(traffic_sendmsg_and_recvmsg_counters_tcp4, + fun() -> + InitState = #{domain => inet, + proto => tcp, + recv => fun(S) -> + case socket:recvmsg(S) of + {ok, #{addr := _Source, + iov := [Data]}} -> + {ok, Data}; + {error, _} = ERROR -> + ERROR + end + end, + send => fun(S, Data) -> + MsgHdr = #{iov => [Data]}, + socket:sendmsg(S, MsgHdr) + end}, + ok = traffic_send_and_recv_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to (simply) test that the counters +%% for both read and write. +%% So that its easy to extend, we use fun's for read and write. +%% We use default (TCP) on local. + +traffic_sendmsg_and_recvmsg_counters_tcpL(suite) -> + []; +traffic_sendmsg_and_recvmsg_counters_tcpL(doc) -> + []; +traffic_sendmsg_and_recvmsg_counters_tcpL(_Config) when is_list(_Config) -> + ?TT(?SECS(15)), + tc_try(traffic_sendmsg_and_recvmsg_counters_tcpL, + fun() -> + InitState = #{domain => local, + proto => default, + recv => fun(S) -> + case socket:recvmsg(S) of + {ok, #{addr := _Source, + iov := [Data]}} -> + {ok, Data}; + {error, _} = ERROR -> + ERROR + end + end, + send => fun(S, Data) -> + MsgHdr = #{iov => [Data]}, + socket:sendmsg(S, MsgHdr) + end}, + ok = traffic_send_and_recv_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +traffic_send_and_recv_tcp(InitState) -> + ServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LSA = which_local_socket_addr(Domain), + {ok, State#{local_sa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain, proto := Proto} = State) -> + case socket:open(Domain, stream, Proto) of + {ok, Sock} -> + {ok, State#{lsock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{domain := local, + lsock := LSock, + local_sa := LSA} = _State) -> + case socket:bind(LSock, LSA) of + {ok, _Port} -> + ok; % We do not care about the port for local + {error, _} = ERROR -> + ERROR + end; + (#{lsock := LSock, + local_sa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{domain := local, + tester := Tester, + local_sa := #{path := Path}}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Path), + ok; + (#{tester := Tester, + lport := Port}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Port), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "accept", + cmd => fun(#{lsock := LSock} = State) -> + case socket:accept(LSock) of + {ok, Sock} -> + {ok, State#{csock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "initial counter validation (=zero)", + cmd => fun(#{csock := Sock} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("Validate initial counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation(Counters) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept), + ok + end}, + + #{desc => "await continue (recv_and_validate 1)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv_and_validate) + end}, + #{desc => "recv (1)", + cmd => fun(#{csock := Sock, + recv := Recv} = State) -> + case Recv(Sock) of + {ok, Data} -> + ?SEV_IPRINT("recv ~p bytes", [size(Data)]), + {ok, State#{read_pkg => 1, + read_byte => size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (recv 1)", + cmd => fun(#{csock := Sock, + read_pkg := Pkg, + read_byte := Byte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, Pkg}, + {read_byte, Byte}, + {read_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (recv_and_validate 1)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_and_validate), + ok + end}, + + #{desc => "await continue (send_and_validate 1)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_and_validate) + end}, + #{desc => "send (1)", + cmd => fun(#{csock := Sock, + send := Send} = State) -> + Data = ?DATA, + case Send(Sock, Data) of + ok -> + ?SEV_IPRINT("sent ~p bytes", [size(Data)]), + {ok, State#{write_pkg => 1, + write_byte => size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (send 1)", + cmd => fun(#{csock := Sock, + read_pkg := RPkg, + read_byte := RByte, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, RPkg}, + {read_byte, RByte}, + {write_pkg, SPkg}, + {write_byte, SByte}, + {read_tries, any}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (send_and_validate 1)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_and_validate), + ok + end}, + + #{desc => "await continue (recv_and_validate 2)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv_and_validate) + end}, + #{desc => "recv (2)", + cmd => fun(#{csock := Sock, + recv := Recv, + read_pkg := Pkg, + read_byte := Byte} = State) -> + case Recv(Sock) of + {ok, Data} -> + ?SEV_IPRINT("recv ~p bytes", [size(Data)]), + {ok, State#{read_pkg => Pkg + 1, + read_byte => Byte + size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (recv 2)", + cmd => fun(#{csock := Sock, + read_pkg := RPkg, + read_byte := RByte, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, RPkg}, + {read_byte, RByte}, + {write_pkg, SPkg}, + {write_byte, SByte}, + {read_tries, any}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (recv_and_validate 2)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_and_validate), + ok + end}, + + #{desc => "await continue (send_and_validate 2)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_and_validate) + end}, + #{desc => "send (2)", + cmd => fun(#{csock := Sock, + send := Send, + write_pkg := Pkg, + write_byte := Byte} = State) -> + Data = ?DATA, + case Send(Sock, Data) of + ok -> + ?SEV_IPRINT("sent ~p bytes", [size(Data)]), + {ok, State#{write_pkg => Pkg + 1, + write_byte => Byte + size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (send 2)", + cmd => fun(#{csock := Sock, + read_pkg := RPkg, + read_byte := RByte, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, RPkg}, + {read_byte, RByte}, + {write_pkg, SPkg}, + {write_byte, SByte}, + {read_tries, any}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (send_and_validate 2)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_and_validate), + ok + end}, + + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close connection socket (just in case)", + cmd => fun(#{csock := Sock} = State) -> + (catch socket:close(Sock)), + {ok, maps:remove(csock, State)} + end}, + #{desc => "close listen socket", + cmd => fun(#{domain := local, + lsock := Sock, + local_sa := #{path := Path}} = State) -> + ok = socket:close(Sock), + State1 = + unlink_path(Path, + fun() -> + maps:remove(local_sa, State) + end, + fun() -> State end), + {ok, maps:remove(lsock, State1)}; + (#{lsock := Sock} = State) -> + (catch socket:close(Sock)), + {ok, maps:remove(lsock, State)} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + ClientSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start (from tester)", + cmd => fun(#{domain := local} = State) -> + {Tester, Path} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, server_path => Path}}; + (State) -> + {Tester, Port} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, server_port => Port}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which server (local) address", + cmd => fun(#{domain := local = Domain, + server_path := Path} = State) -> + LSA = which_local_socket_addr(Domain), + SSA = #{family => Domain, path => Path}, + {ok, State#{local_sa => LSA, server_sa => SSA}}; + (#{domain := Domain, server_port := Port} = State) -> + LSA = which_local_socket_addr(Domain), + SSA = LSA#{port => Port}, + {ok, State#{local_sa => LSA, server_sa => SSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain, + proto := Proto} = State) -> + case socket:open(Domain, stream, Proto) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{sock := Sock, local_sa := LSA} = _State) -> + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% The actual test + #{desc => "await continue (connect)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, connect), + ok + end}, + #{desc => "connect to server", + cmd => fun(#{sock := Sock, server_sa := SSA}) -> + socket:connect(Sock, SSA) + end}, + #{desc => "announce ready (connect)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect), + ok + end}, + + #{desc => "await continue (send_and_validate 1)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_and_validate) + end}, + #{desc => "send (1)", + cmd => fun(#{sock := Sock, + send := Send} = State) -> + Data = ?DATA, + case Send(Sock, Data) of + ok -> + ?SEV_IPRINT("sent ~p bytes", [size(Data)]), + {ok, State#{write_pkg => 1, + write_byte => size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (send 1)", + cmd => fun(#{sock := Sock, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{write_pkg, SPkg}, + {write_byte, SByte}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (send_and_validate 1)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_and_validate), + ok + end}, + + #{desc => "await continue (recv_and_validate 1)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv_and_validate) + end}, + #{desc => "recv (1)", + cmd => fun(#{sock := Sock, + recv := Recv} = State) -> + case Recv(Sock) of + {ok, Data} -> + ?SEV_IPRINT("recv ~p bytes", [size(Data)]), + {ok, State#{read_pkg => 1, + read_byte => size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (recv 1)", + cmd => fun(#{sock := Sock, + read_pkg := RPkg, + read_byte := RByte, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, RPkg}, + {read_byte, RByte}, + {write_pkg, SPkg}, + {write_byte, SByte}, + {read_tries, any}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (recv_and_validate 1)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_and_validate), + ok + end}, + + #{desc => "await continue (send_and_validate 2)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_and_validate) + end}, + #{desc => "send (2)", + cmd => fun(#{sock := Sock, + send := Send, + write_pkg := SPkg, + write_byte := SByte} = State) -> + Data = ?DATA, + case Send(Sock, Data) of + ok -> + ?SEV_IPRINT("sent ~p bytes", [size(Data)]), + {ok, State#{write_pkg => SPkg + 1, + write_byte => SByte + size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (send 2)", + cmd => fun(#{sock := Sock, + read_pkg := RPkg, + read_byte := RByte, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, RPkg}, + {read_byte, RByte}, + {write_pkg, SPkg}, + {write_byte, SByte}, + {read_tries, any}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (send_and_validate 2)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_and_validate), + ok + end}, + + #{desc => "await continue (recv_and_validate 2)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv_and_validate) + end}, + #{desc => "recv (2)", + cmd => fun(#{sock := Sock, + recv := Recv, + read_pkg := RPkg, + read_byte := RByte} = State) -> + case Recv(Sock) of + {ok, Data} -> + ?SEV_IPRINT("recv ~p bytes", [size(Data)]), + {ok, State#{read_pkg => RPkg + 1, + read_byte => RByte + size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (recv 2)", + cmd => fun(#{sock := Sock, + read_pkg := RPkg, + read_byte := RByte, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, RPkg}, + {read_byte, RByte}, + {write_pkg, SPkg}, + {write_byte, SByte}, + {read_tries, any}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (recv_and_validate 2)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_and_validate), + ok + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close connection socket", + cmd => fun(#{domain := local, + sock := Sock, + local_sa := #{path := Path}} = State) -> + ok = socket:close(Sock), + State1 = + unlink_path(Path, + fun() -> + maps:remove(local_sa, State) + end, + fun() -> State end), + {ok, maps:remove(sock, State1)}; + (#{sock := Sock} = State) -> + socket:close(Sock), + {ok, maps:remove(sock, State)} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{domain := local, + server := Pid} = State) -> + {ok, Path} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{path => Path}}; + (#{server := Pid} = State) -> + {ok, Port} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{port => Port}} + end}, + + %% Start the client + #{desc => "order client start", + cmd => fun(#{domain := local, + client := Pid, + path := Path} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Path), + ok; + (#{client := Pid, + port := Port} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Port), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) + end}, + + %% *** The actual test *** + + #{desc => "order server to continue (with accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, accept), + ok + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "order client to continue (with connect)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, connect), + ok + end}, + #{desc => "await client ready (connect)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, connect) + end}, + #{desc => "await server ready (accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, accept) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "order server to continue (recv_and_validate 1)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, recv_and_validate), + ok + end}, + #{desc => "order client to continue (send_and_validate 1)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, send_and_validate), + ok + end}, + #{desc => "await client ready (send_and_validate 1)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, send_and_validate) + end}, + #{desc => "await server ready (recv_and_validate 1)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, recv_and_validate) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "order client to continue (recv_and_validate 1)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, recv_and_validate), + ok + end}, + #{desc => "order server to continue (send_and_validate 1)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, send_and_validate), + ok + end}, + #{desc => "await server ready (send_and_validate 1)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, send_and_validate) + end}, + #{desc => "await client ready (recv_and_validate 1)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, recv_and_validate) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "order server to continue (recv_and_validate 2)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, recv_and_validate), + ok + end}, + #{desc => "order client to continue (send_and_validate 2)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, send_and_validate), + ok + end}, + #{desc => "await client ready (send_and_validate 2)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, send_and_validate) + end}, + #{desc => "await server ready (recv_and_validate 2)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, recv_and_validate) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "order client to continue (recv_and_validate 2)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, recv_and_validate), + ok + end}, + #{desc => "order server to continue (send_and_validate 2)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, send_and_validate), + ok + end}, + #{desc => "await server ready (send_and_validate 2)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, send_and_validate) + end}, + #{desc => "await client ready (recv_and_validate 2)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, recv_and_validate) + end}, + + %% *** Termination *** + #{desc => "order client to terminate", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Client), + ok + end}, + #{desc => "await client termination", + cmd => fun(#{client := Client} = State) -> + ?SEV_AWAIT_TERMINATION(Client), + State1 = maps:remove(client, State), + {ok, State1} + end}, + #{desc => "order server to terminate", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Server), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Server} = State) -> + ?SEV_AWAIT_TERMINATION(Server), + State1 = maps:remove(server, State), + {ok, State1} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + ServerInitState = InitState#{host => local_host()}, + Server = ?SEV_START("server", ServerSeq, ServerInitState), + + i("start client evaluator(s)"), + ClientInitState = InitState#{host => local_host()}, + Client = ?SEV_START("client", ClientSeq, ClientInitState), + + i("start 'tester' evaluator"), + TesterInitState = #{server => Server#ev.pid, + client => Client#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator"), + ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). + + + +traffic_sar_counters_validation(Counters) -> + traffic_sar_counters_validation(Counters, []). + +traffic_sar_counters_validation(Counters, []) -> + (catch lists:foreach( + fun({_Cnt, 0}) -> ok; + ({Cnt, Val}) -> throw({error, {invalid_counter, Cnt, Val}}) + end, + Counters)); +traffic_sar_counters_validation(Counters, [{Cnt, Val}|ValidateCounters]) -> + case lists:keysearch(Cnt, 1, Counters) of + {value, {Cnt, Val}} -> + Counters2 = lists:keydelete(Cnt, 1, Counters), + traffic_sar_counters_validation(Counters2, ValidateCounters); + {value, {Cnt, _Val}} when (Val =:= any) -> + Counters2 = lists:keydelete(Cnt, 1, Counters), + traffic_sar_counters_validation(Counters2, ValidateCounters); + {value, {Cnt, InvVal}} -> + {error, {invalid_counter, Cnt, InvVal, Val}}; + false -> + {error, {unknown_counter, Cnt, Counters}} + end. + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to (simply) test that the counters +%% for both read and write. +%% So that its easy to extend, we use fun's for read and write. +%% We use UDP on IPv4. + +traffic_sendto_and_recvfrom_counters_udp4(suite) -> + []; +traffic_sendto_and_recvfrom_counters_udp4(doc) -> + []; +traffic_sendto_and_recvfrom_counters_udp4(_Config) when is_list(_Config) -> + ?TT(?SECS(15)), + tc_try(traffic_sendto_and_recvfrom_counters_udp4, + fun() -> + InitState = #{domain => inet, + proto => udp, + recv => fun(S) -> + socket:recvfrom(S) + end, + send => fun(S, Data, Dest) -> + socket:sendto(S, Data, Dest) + end}, + ok = traffic_send_and_recv_udp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to (simply) test that the counters +%% for both read and write. +%% So that its easy to extend, we use fun's for read and write. +%% We use default (UDP) on local. + +traffic_sendto_and_recvfrom_counters_udpL(suite) -> + []; +traffic_sendto_and_recvfrom_counters_udpL(doc) -> + []; +traffic_sendto_and_recvfrom_counters_udpL(_Config) when is_list(_Config) -> + ?TT(?SECS(15)), + tc_try(traffic_sendto_and_recvfrom_counters_udp4, + fun() -> + InitState = #{domain => local, + proto => default, + recv => fun(S) -> + socket:recvfrom(S) + end, + send => fun(S, Data, Dest) -> + socket:sendto(S, Data, Dest) + end}, + ok = traffic_send_and_recv_udp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to (simply) test that the counters +%% for both read and write. +%% So that its easy to extend, we use fun's for read and write. +%% We use UDP on IPv4. + +traffic_sendmsg_and_recvmsg_counters_udp4(suite) -> + []; +traffic_sendmsg_and_recvmsg_counters_udp4(doc) -> + []; +traffic_sendmsg_and_recvmsg_counters_udp4(_Config) when is_list(_Config) -> + ?TT(?SECS(15)), + tc_try(traffic_sendmsg_and_recvmsg_counters_udp4, + fun() -> + InitState = #{domain => inet, + proto => udp, + recv => fun(S) -> + case socket:recvmsg(S) of + {ok, #{addr := Source, + iov := [Data]}} -> + {ok, {Source, Data}}; + {error, _} = ERROR -> + ERROR + end + end, + send => fun(S, Data, Dest) -> + MsgHdr = #{addr => Dest, + iov => [Data]}, + socket:sendmsg(S, MsgHdr) + end}, + ok = traffic_send_and_recv_udp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% This test case is intended to (simply) test that the counters +%% for both read and write. +%% So that its easy to extend, we use fun's for read and write. +%% We use default (UDP) on local. + +traffic_sendmsg_and_recvmsg_counters_udpL(suite) -> + []; +traffic_sendmsg_and_recvmsg_counters_udpL(doc) -> + []; +traffic_sendmsg_and_recvmsg_counters_udpL(_Config) when is_list(_Config) -> + ?TT(?SECS(15)), + tc_try(traffic_sendmsg_and_recvmsg_counters_udpL, + fun() -> + InitState = #{domain => local, + proto => default, + recv => fun(S) -> + case socket:recvmsg(S) of + {ok, #{addr := Source, + iov := [Data]}} -> + {ok, {Source, Data}}; + {error, _} = ERROR -> + ERROR + end + end, + send => fun(S, Data, Dest) -> + MsgHdr = #{addr => Dest, + iov => [Data]}, + socket:sendmsg(S, MsgHdr) + end}, + ok = traffic_send_and_recv_udp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +traffic_send_and_recv_udp(InitState) -> + ServerSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LSA = which_local_socket_addr(Domain), + {ok, State#{local_sa => LSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain, proto := Proto} = State) -> + case socket:open(Domain, dgram, Proto) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{domain := local, + sock := Sock, + local_sa := LSA} = _State) -> + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; % We do not care about the port for local + {error, _} = ERROR -> + ERROR + end; + (#{sock := LSock, + local_sa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "initial counter validation (=zero)", + cmd => fun(#{sock := Sock} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("Validate initial counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation(Counters) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{domain := local, + tester := Tester, + local_sa := #{path := Path}}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Path), + ok; + (#{tester := Tester, + lport := Port}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Port), + ok + end}, + + %% The actual test + #{desc => "await continue (recv_and_validate 1)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv_and_validate) + end}, + #{desc => "recv (1)", + cmd => fun(#{sock := Sock, + recv := Recv} = State) -> + case Recv(Sock) of + {ok, {ClientSA, Data}} -> + ?SEV_IPRINT("recv ~p bytes", [size(Data)]), + {ok, State#{client_sa => ClientSA, + read_pkg => 1, + read_byte => size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (recv 1)", + cmd => fun(#{sock := Sock, + read_pkg := Pkg, + read_byte := Byte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, Pkg}, + {read_byte, Byte}, + {read_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (recv_and_validate 1)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_and_validate), + ok + end}, + + #{desc => "await continue (send_and_validate 1)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_and_validate) + end}, + #{desc => "send (1)", + cmd => fun(#{sock := Sock, + send := Send, + client_sa := ClientSA} = State) -> + Data = ?DATA, + case Send(Sock, Data, ClientSA) of + ok -> + ?SEV_IPRINT("sent ~p bytes", [size(Data)]), + {ok, State#{write_pkg => 1, + write_byte => size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (send 1)", + cmd => fun(#{sock := Sock, + read_pkg := RPkg, + read_byte := RByte, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, RPkg}, + {read_byte, RByte}, + {write_pkg, SPkg}, + {write_byte, SByte}, + {read_tries, any}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (send_and_validate 1)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_and_validate), + ok + end}, + + #{desc => "await continue (recv_and_validate 2)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv_and_validate) + end}, + #{desc => "recv (2)", + cmd => fun(#{sock := Sock, + recv := Recv, + read_pkg := Pkg, + read_byte := Byte} = State) -> + case Recv(Sock) of + {ok, {Source, Data}} -> + ?SEV_IPRINT("recv ~p bytes", [size(Data)]), + {ok, State#{client_sa => Source, + read_pkg => Pkg + 1, + read_byte => Byte + size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (recv 2)", + cmd => fun(#{sock := Sock, + read_pkg := RPkg, + read_byte := RByte, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, RPkg}, + {read_byte, RByte}, + {write_pkg, SPkg}, + {write_byte, SByte}, + {read_tries, any}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (recv_and_validate 2)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_and_validate), + ok + end}, + + #{desc => "await continue (send_and_validate 2)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_and_validate) + end}, + #{desc => "send (2)", + cmd => fun(#{sock := Sock, + client_sa := ClientSA, + send := Send, + write_pkg := Pkg, + write_byte := Byte} = State) -> + Data = ?DATA, + case Send(Sock, Data, ClientSA) of + ok -> + ?SEV_IPRINT("sent ~p bytes", [size(Data)]), + {ok, State#{write_pkg => Pkg + 1, + write_byte => Byte + size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (send 2)", + cmd => fun(#{sock := Sock, + read_pkg := RPkg, + read_byte := RByte, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, RPkg}, + {read_byte, RByte}, + {write_pkg, SPkg}, + {write_byte, SByte}, + {read_tries, any}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (send_and_validate 2)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_and_validate), + ok + end}, + + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close socket (just in case)", + cmd => fun(#{domain := local, + sock := Sock, + local_sa := #{path := Path}} = State) -> + ok = socket:close(Sock), + State1 = + unlink_path(Path, + fun() -> + maps:remove(local_sa, State) + end, + fun() -> State end), + {ok, maps:remove(lsock, State1)}; + (#{sock := Sock} = State) -> + (catch socket:close(Sock)), + {ok, maps:remove(sock, State)} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + ClientSeq = + [ + %% *** Wait for start order part *** + #{desc => "await start (from tester)", + cmd => fun(#{domain := local} = State) -> + {Tester, Path} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, server_path => Path}}; + (State) -> + {Tester, Port} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, server_port => Port}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester} = _State) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which server (local) address", + cmd => fun(#{domain := local = Domain, + server_path := Path} = State) -> + LSA = which_local_socket_addr(Domain), + SSA = #{family => Domain, path => Path}, + {ok, State#{local_sa => LSA, server_sa => SSA}}; + (#{domain := Domain, server_port := Port} = State) -> + LSA = which_local_socket_addr(Domain), + SSA = LSA#{port => Port}, + {ok, State#{local_sa => LSA, server_sa => SSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain, + proto := Proto} = State) -> + case socket:open(Domain, dgram, Proto) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{sock := Sock, local_sa := LSA} = _State) -> + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "initial counter validation (=zero)", + cmd => fun(#{sock := Sock} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("Validate initial counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation(Counters) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% The actual test + #{desc => "await continue (send_and_validate 1)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_and_validate) + end}, + #{desc => "send (1)", + cmd => fun(#{sock := Sock, + send := Send, + server_sa := ServerSA} = State) -> + Data = ?DATA, + case Send(Sock, Data, ServerSA) of + ok -> + ?SEV_IPRINT("sent ~p bytes", [size(Data)]), + {ok, State#{write_pkg => 1, + write_byte => size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (send 1)", + cmd => fun(#{sock := Sock, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{write_pkg, SPkg}, + {write_byte, SByte}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (send_and_validate 1)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_and_validate), + ok + end}, + + #{desc => "await continue (recv_and_validate 1)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv_and_validate) + end}, + #{desc => "recv (1)", + cmd => fun(#{sock := Sock, + recv := Recv, + server_sa := ServerSA} = State) -> + case Recv(Sock) of + {ok, {ServerSA, Data}} -> + ?SEV_IPRINT("recv ~p bytes", [size(Data)]), + {ok, State#{read_pkg => 1, + read_byte => size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (recv 1)", + cmd => fun(#{sock := Sock, + read_pkg := RPkg, + read_byte := RByte, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, RPkg}, + {read_byte, RByte}, + {write_pkg, SPkg}, + {write_byte, SByte}, + {read_tries, any}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (recv_and_validate 1)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_and_validate), + ok + end}, + + #{desc => "await continue (send_and_validate 2)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, send_and_validate) + end}, + #{desc => "send (2)", + cmd => fun(#{sock := Sock, + send := Send, + server_sa := ServerSA, + write_pkg := SPkg, + write_byte := SByte} = State) -> + Data = ?DATA, + case Send(Sock, Data, ServerSA) of + ok -> + ?SEV_IPRINT("sent ~p bytes", [size(Data)]), + {ok, State#{write_pkg => SPkg + 1, + write_byte => SByte + size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (send 2)", + cmd => fun(#{sock := Sock, + read_pkg := RPkg, + read_byte := RByte, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, RPkg}, + {read_byte, RByte}, + {write_pkg, SPkg}, + {write_byte, SByte}, + {read_tries, any}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (send_and_validate 2)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, send_and_validate), + ok + end}, + + #{desc => "await continue (recv_and_validate 2)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, recv_and_validate) + end}, + #{desc => "recv (2)", + cmd => fun(#{sock := Sock, + server_sa := ServerSA, + recv := Recv, + read_pkg := RPkg, + read_byte := RByte} = State) -> + case Recv(Sock) of + {ok, {ServerSA, Data}} -> + ?SEV_IPRINT("recv ~p bytes", [size(Data)]), + {ok, State#{read_pkg => RPkg + 1, + read_byte => RByte + size(Data)}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "validate (recv 2)", + cmd => fun(#{sock := Sock, + read_pkg := RPkg, + read_byte := RByte, + write_pkg := SPkg, + write_byte := SByte} = _State) -> + try socket:info(Sock) of + #{counters := Counters} -> + ?SEV_IPRINT("validate counters: " + "~n ~p", [Counters]), + traffic_sar_counters_validation( + Counters, + [{read_pkg, RPkg}, + {read_byte, RByte}, + {write_pkg, SPkg}, + {write_byte, SByte}, + {read_tries, any}, + {write_tries, any}]) + catch + C:E:S -> + ?SEV_EPRINT("Failed get socket info: " + "~n Class: ~p" + "~n Error: ~p" + "~n Stack: ~p", [C, E, S]), + {error, {socket_info_failed, {C, E, S}}} + end + end}, + #{desc => "announce ready (recv_and_validate 2)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, recv_and_validate), + ok + end}, + + %% Termination + #{desc => "await terminate (from tester)", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close connection socket", + cmd => fun(#{domain := local, + sock := Sock, + local_sa := #{path := Path}} = State) -> + ok = socket:close(Sock), + State1 = + unlink_path(Path, + fun() -> + maps:remove(local_sa, State) + end, + fun() -> State end), + {ok, maps:remove(sock, State1)}; + (#{sock := Sock} = State) -> + socket:close(Sock), + {ok, maps:remove(sock, State)} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{domain := local, + server := Pid} = State) -> + {ok, Path} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{path => Path}}; + (#{server := Pid} = State) -> + {ok, Port} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{port => Port}} + end}, + + %% Start the client + #{desc => "order client start", + cmd => fun(#{domain := local, + client := Pid, + path := Path} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Path), + ok; + (#{client := Pid, + port := Port} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Port), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) + end}, + + %% *** The actual test *** + + #{desc => "order server to continue (recv_and_validate 1)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, recv_and_validate), + ok + end}, + #{desc => "order client to continue (send_and_validate 1)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, send_and_validate), + ok + end}, + #{desc => "await client ready (send_and_validate 1)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, send_and_validate) + end}, + #{desc => "await server ready (recv_and_validate 1)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, recv_and_validate) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "order client to continue (recv_and_validate 1)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, recv_and_validate), + ok + end}, + #{desc => "order server to continue (send_and_validate 1)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, send_and_validate), + ok + end}, + #{desc => "await server ready (send_and_validate 1)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, send_and_validate) + end}, + #{desc => "await client ready (recv_and_validate 1)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, recv_and_validate) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "order server to continue (recv_and_validate 2)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, recv_and_validate), + ok + end}, + #{desc => "order client to continue (send_and_validate 2)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, send_and_validate), + ok + end}, + #{desc => "await client ready (send_and_validate 2)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, send_and_validate) + end}, + #{desc => "await server ready (recv_and_validate 2)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, recv_and_validate) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "order client to continue (recv_and_validate 2)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, recv_and_validate), + ok + end}, + #{desc => "order server to continue (send_and_validate 2)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, send_and_validate), + ok + end}, + #{desc => "await server ready (send_and_validate 2)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, send_and_validate) + end}, + #{desc => "await client ready (recv_and_validate 2)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, recv_and_validate) + end}, + + %% *** Termination *** + #{desc => "order client to terminate", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Client), + ok + end}, + #{desc => "await client termination", + cmd => fun(#{client := Client} = State) -> + ?SEV_AWAIT_TERMINATION(Client), + State1 = maps:remove(client, State), + {ok, State1} + end}, + #{desc => "order server to terminate", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Server), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Server} = State) -> + ?SEV_AWAIT_TERMINATION(Server), + State1 = maps:remove(server, State), + {ok, State1} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + ServerInitState = InitState#{host => local_host()}, + Server = ?SEV_START("server", ServerSeq, ServerInitState), + + i("start client evaluator(s)"), + ClientInitState = InitState#{host => local_host()}, + Client = ?SEV_START("client", ClientSeq, ClientInitState), + + i("start 'tester' evaluator"), + TesterInitState = #{server => Server#ev.pid, + client => Client#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + i("await evaluator"), + ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). + + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% This test case is intended to test that the send and recv functions %% behave as expected when sending and/or reading chunks. %% First send data in one "big" chunk, and read it in "small" chunks. diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex c4680c2baa..25eb0b2f4a 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index e9a76dc1e9..be94e3a867 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -31,7 +31,7 @@ debug/1, %% command/1, - info/0, + info/0, info/1, supports/0, supports/1, supports/2, supports/3 ]). @@ -67,6 +67,10 @@ select_ref/0, select_info/0, + socket_counters/0, + socket_counter/0, + socket_info/0, + %% command/0, domain/0, @@ -153,6 +157,15 @@ }. %% -type command() :: debug_command(). +-type socket_counters() :: [{socket_counter(), non_neg_integer()}]. +-type socket_counter() :: read_byte | read_fails | read_pkg | read_tries | + read_waits | write_byte | write_fails | write_pkg | + write_tries | write_waits. +-type socket_info() :: #{counters := socket_counters(), + num_readers := non_neg_integer(), + num_writers := non_neg_integer(), + num_acceptors := non_neg_integer()}. + -type uint8() :: 0..16#FF. -type uint16() :: 0..16#FFFF. -type uint20() :: 0..16#FFFFF. @@ -879,7 +892,7 @@ on_load(Extra) -> --spec info() -> list(). +-spec info() -> map(). info() -> nif_info(). @@ -904,6 +917,24 @@ command(#{command := debug, %% =========================================================================== %% +%% info - Get miscellaneous information about a socket. +%% +%% Generates a list of various info about the socket, such as counter values. +%% +%% Do *not* call this function often. +%% +%% =========================================================================== + +-spec info(Socket) -> socket_info() when + Socket :: socket(). + +info(#socket{ref = SockRef}) -> + nif_info(SockRef). + + + +%% =========================================================================== +%% %% supports - get information about what the platform "supports". %% %% Generates a list of various info about what the plaform can support. @@ -3879,6 +3910,9 @@ error(Reason) -> nif_info() -> erlang:nif_error(undef). +nif_info(_SRef) -> + erlang:nif_error(undef). + nif_command(_Command) -> erlang:nif_error(undef). |