diff options
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 3 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 314 | ||||
-rw-r--r-- | erts/emulator/test/Makefile | 1 | ||||
-rw-r--r-- | erts/emulator/test/socket_SUITE.erl | 201 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_evaluator.erl | 10 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_lib.erl | 10 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_logger.erl | 99 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 69392 -> 70272 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 34 |
9 files changed, 446 insertions, 226 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index ec17e45f25..0f973855ae 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -103,6 +103,7 @@ typedef unsigned int BOOLEAN_T; /* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ * "Global" atoms */ +extern ERL_NIF_TERM esock_atom_abort; extern ERL_NIF_TERM esock_atom_accept; extern ERL_NIF_TERM esock_atom_acceptconn; extern ERL_NIF_TERM esock_atom_acceptfilter; @@ -126,6 +127,7 @@ extern ERL_NIF_TERM esock_atom_block_source; extern ERL_NIF_TERM esock_atom_broadcast; extern ERL_NIF_TERM esock_atom_busy_poll; extern ERL_NIF_TERM esock_atom_checksum; +extern ERL_NIF_TERM esock_atom_close; extern ERL_NIF_TERM esock_atom_connect; extern ERL_NIF_TERM esock_atom_congestion; extern ERL_NIF_TERM esock_atom_context; @@ -269,6 +271,7 @@ extern ERL_NIF_TERM esock_atom_sndbufforce; extern ERL_NIF_TERM esock_atom_sndlowat; extern ERL_NIF_TERM esock_atom_sndtimeo; extern ERL_NIF_TERM esock_atom_socket; +extern ERL_NIF_TERM esock_atom_socket_tag; extern ERL_NIF_TERM esock_atom_spec_dst; extern ERL_NIF_TERM esock_atom_status; extern ERL_NIF_TERM esock_atom_stream; diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 1e0533535c..80903c487f 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -760,24 +760,24 @@ typedef struct { typedef struct { /* +++ The actual socket +++ */ - SOCKET sock; - HANDLE event; + SOCKET sock; + HANDLE event; /* +++ Stuff "about" the socket +++ */ - int domain; - int type; - int protocol; + int domain; + int type; + int protocol; - unsigned int state; - SocketAddress remote; - unsigned int addrLen; + unsigned int state; + SocketAddress remote; + unsigned int addrLen; - ErlNifEnv* env; + ErlNifEnv* env; /* +++ Controller (owner) process +++ */ - ErlNifPid ctrlPid; - // ErlNifMonitor ctrlMon; - ESockMonitor ctrlMon; + ErlNifPid ctrlPid; + // ErlNifMonitor ctrlMon; + ESockMonitor ctrlMon; /* +++ Write stuff +++ */ ErlNifMutex* writeMtx; @@ -996,11 +996,13 @@ static ERL_NIF_TERM naccept(ErlNifEnv* env, ERL_NIF_TERM ref); static ERL_NIF_TERM nsend(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ErlNifBinary* dataP, int flags); static ERL_NIF_TERM nsendto(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ErlNifBinary* dataP, int flags, @@ -1008,21 +1010,25 @@ static ERL_NIF_TERM nsendto(ErlNifEnv* env, unsigned int toAddrLen); static ERL_NIF_TERM nsendmsg(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ERL_NIF_TERM eMsgHdr, int flags); static ERL_NIF_TERM nrecv(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sendRef, ERL_NIF_TERM recvRef, int len, int flags); static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, uint16_t bufSz, int flags); static ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, uint16_t bufLen, uint16_t ctrlLen, @@ -1986,6 +1992,7 @@ static ERL_NIF_TERM send_check_result(ErlNifEnv* env, ssize_t written, ssize_t dataSize, int saveErrno, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef); static BOOLEAN_T recv_check_reader(ErlNifEnv* env, SocketDescriptor* descP, @@ -1998,6 +2005,7 @@ static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, SocketDescriptor* descP); static void recv_error_current_reader(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM reason); static ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -2005,6 +2013,7 @@ static ERL_NIF_TERM recv_check_result(ErlNifEnv* env, int toRead, int saveErrno, ErlNifBinary* bufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -2013,6 +2022,7 @@ static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, ErlNifBinary* bufP, SocketAddress* fromAddrP, unsigned int fromAddrLen, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -2021,6 +2031,7 @@ static ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, struct msghdr* msgHdrP, ErlNifBinary* dataBufP, ErlNifBinary* ctrlBufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, @@ -2310,22 +2321,22 @@ static void socket_down_reader(ErlNifEnv* env, SocketDescriptor* descP, const ErlNifPid* pid); -/* -static char* send_msg_error_closed(ErlNifEnv* env, +static char* esock_send_close_msg(ErlNifEnv* env, + ERL_NIF_TERM closeRef, + ErlNifPid* pid); +static char* esock_send_abort_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM recvRef, + ERL_NIF_TERM reason, + ErlNifPid* pid); +static char* esock_send_socket_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM tag, + ERL_NIF_TERM info, ErlNifPid* pid); -*/ -/* -static char* send_msg_error(ErlNifEnv* env, - ERL_NIF_TERM reason, +static char* esock_send_msg(ErlNifEnv* env, + ERL_NIF_TERM msg, ErlNifPid* pid); -*/ -static char* send_msg_nif_abort(ErlNifEnv* env, - ERL_NIF_TERM ref, - ERL_NIF_TERM reason, - ErlNifPid* pid); -static char* send_msg(ErlNifEnv* env, - ERL_NIF_TERM msg, - ErlNifPid* pid); static BOOLEAN_T extract_debug(ErlNifEnv* env, ERL_NIF_TERM map); @@ -2394,7 +2405,7 @@ static char str_max_rxt[] = "max_rxt"; static char str_min[] = "min"; static char str_mode[] = "mode"; static char str_multiaddr[] = "multiaddr"; -static char str_nif_abort[] = "nif_abort"; +// static char str_nif_abort[] = "nif_abort"; static char str_null[] = "null"; static char str_num_dlocal[] = "num_domain_local"; static char str_num_dinet[] = "num_domain_inet"; @@ -2436,6 +2447,7 @@ static char str_exsend[] = "exsend"; // failed send /* *** "Global" Atoms *** */ +ERL_NIF_TERM esock_atom_abort; ERL_NIF_TERM esock_atom_accept; ERL_NIF_TERM esock_atom_acceptconn; ERL_NIF_TERM esock_atom_acceptfilter; @@ -2459,6 +2471,7 @@ ERL_NIF_TERM esock_atom_block_source; ERL_NIF_TERM esock_atom_broadcast; ERL_NIF_TERM esock_atom_busy_poll; ERL_NIF_TERM esock_atom_checksum; +ERL_NIF_TERM esock_atom_close; ERL_NIF_TERM esock_atom_connect; ERL_NIF_TERM esock_atom_congestion; ERL_NIF_TERM esock_atom_context; @@ -2598,6 +2611,7 @@ ERL_NIF_TERM esock_atom_seqpacket; ERL_NIF_TERM esock_atom_setfib; ERL_NIF_TERM esock_atom_set_peer_primary_addr; ERL_NIF_TERM esock_atom_socket; +ERL_NIF_TERM esock_atom_socket_tag; ERL_NIF_TERM esock_atom_sndbuf; ERL_NIF_TERM esock_atom_sndbufforce; ERL_NIF_TERM esock_atom_sndlowat; @@ -2665,7 +2679,7 @@ static ERL_NIF_TERM atom_max_rxt; static ERL_NIF_TERM atom_min; static ERL_NIF_TERM atom_mode; static ERL_NIF_TERM atom_multiaddr; -static ERL_NIF_TERM atom_nif_abort; +// static ERL_NIF_TERM atom_nif_abort; static ERL_NIF_TERM atom_null; static ERL_NIF_TERM atom_num_dinet; static ERL_NIF_TERM atom_num_dinet6; @@ -4929,7 +4943,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size accRef = enif_make_resource(env, accDescP); - enif_release_resource(accDescP); // We should really store a reference ... + enif_release_resource(accDescP); accDescP->ctrlPid = caller; if (MONP("naccept_listening -> ctrl", @@ -5143,7 +5157,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM sendRef; + ERL_NIF_TERM sockRef, sendRef; ErlNifBinary sndData; unsigned int eflags; int flags; @@ -5154,13 +5168,17 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, /* Extract arguments and perform preliminary validation */ if ((argc != 4) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_BIN(env, argv[2], &sndData) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } + sockRef = argv[0]; // We need this in case we send in case we send abort sendRef = argv[1]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5170,7 +5188,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, "\r\n SendRef: %T" "\r\n Size of data: %d" "\r\n eFlags: %d" - "\r\n", descP->sock, argv[0], sendRef, sndData.size, eflags) ); + "\r\n", descP->sock, sockRef, sendRef, sndData.size, eflags) ); if (!IS_CONNECTED(descP)) return esock_make_error(env, atom_enotconn); @@ -5193,7 +5211,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, * time we do. */ - res = nsend(env, descP, sendRef, &sndData, flags); + res = nsend(env, descP, sockRef, sendRef, &sndData, flags); MUNLOCK(descP->writeMtx); @@ -5211,6 +5229,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, static ERL_NIF_TERM nsend(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ErlNifBinary* sndDataP, int flags) @@ -5238,7 +5257,8 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, save_errno = -1; // The value does not actually matter in this case return send_check_result(env, descP, - written, sndDataP->size, save_errno, sendRef); + written, sndDataP->size, save_errno, + sockRef, sendRef); } @@ -5264,7 +5284,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM sendRef; + ERL_NIF_TERM sockRef, sendRef; ErlNifBinary sndData; unsigned int eflags; int flags; @@ -5284,9 +5304,14 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, !GET_UINT(env, argv[4], &eflags)) { return enif_make_badarg(env); } + sockRef = argv[0]; // We need this in case we send in case we send abort sendRef = argv[1]; eSockAddr = argv[3]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5298,7 +5323,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, "\r\n eSockAddr: %T" "\r\n eflags: %d" "\r\n", - descP->sock, argv[0], sendRef, sndData.size, eSockAddr, eflags) ); + descP->sock, sockRef, sendRef, sndData.size, eSockAddr, eflags) ); /* THIS TEST IS NOT CORRECT!!! */ if (!IS_OPEN(descP)) { @@ -5320,7 +5345,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, MLOCK(descP->writeMtx); - res = nsendto(env, descP, sendRef, &sndData, flags, + res = nsendto(env, descP, sockRef, sendRef, &sndData, flags, &remoteAddr, remoteAddrLen); MUNLOCK(descP->writeMtx); @@ -5336,6 +5361,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, static ERL_NIF_TERM nsendto(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ErlNifBinary* dataP, int flags, @@ -5372,7 +5398,8 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env, else save_errno = -1; // The value does not actually matter in this case - return send_check_result(env, descP, written, dataP->size, save_errno, sendRef); + return send_check_result(env, descP, written, dataP->size, save_errno, + sockRef, sendRef); } @@ -5395,7 +5422,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { - ERL_NIF_TERM res, sendRef, eMsgHdr; + ERL_NIF_TERM res, sockRef, sendRef, eMsgHdr; SocketDescriptor* descP; unsigned int eflags; int flags; @@ -5405,14 +5432,18 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, /* Extract arguments and perform preliminary validation */ if ((argc != 4) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !IS_MAP(env, argv[2]) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } + sockRef = argv[0]; // We need this in case we send in case we send abort sendRef = argv[1]; eMsgHdr = argv[2]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5433,7 +5464,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, MLOCK(descP->writeMtx); - res = nsendmsg(env, descP, sendRef, eMsgHdr, flags); + res = nsendmsg(env, descP, sockRef, sendRef, eMsgHdr, flags); MUNLOCK(descP->writeMtx); @@ -5449,6 +5480,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, static ERL_NIF_TERM nsendmsg(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ERL_NIF_TERM eMsgHdr, int flags) @@ -5589,7 +5621,8 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env, else save_errno = -1; // OK or not complete: this value should not matter in this case - res = send_check_result(env, descP, written, dataSize, save_errno, sendRef); + res = send_check_result(env, descP, written, dataSize, save_errno, + sockRef, sendRef); FREE(iovBins); FREE(iov); @@ -5691,20 +5724,24 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM recvRef; + ERL_NIF_TERM sockRef, recvRef; int len; unsigned int eflags; int flags; ERL_NIF_TERM res; if ((argc != 4) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_INT(env, argv[2], &len) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } + sockRef = argv[0]; // We need this in case we send in case we send abort recvRef = argv[1]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5729,7 +5766,7 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, * time we do. */ - res = nrecv(env, descP, recvRef, len, flags); + res = nrecv(env, descP, sockRef, recvRef, len, flags); MUNLOCK(descP->readMtx); @@ -5746,6 +5783,7 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, static ERL_NIF_TERM nrecv(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, int len, int flags) @@ -5794,6 +5832,7 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, read, len, save_errno, &buf, + sockRef, recvRef); } @@ -5829,7 +5868,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM recvRef; + ERL_NIF_TERM sockRef, recvRef; unsigned int bufSz; unsigned int eflags; int flags; @@ -5840,12 +5879,16 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, /* Extract arguments and perform preliminary validation */ if ((argc != 4) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_UINT(env, argv[2], &bufSz) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } - recvRef = argv[1]; + sockRef = argv[0]; // We need this in case we send in case we send abort + recvRef = argv[1]; + + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5883,7 +5926,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, * </KOLLA> */ - res = nrecvfrom(env, descP, recvRef, bufSz, flags); + res = nrecvfrom(env, descP, sockRef, recvRef, bufSz, flags); MUNLOCK(descP->readMtx); @@ -5900,6 +5943,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, uint16_t len, int flags) @@ -5951,6 +5995,7 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, save_errno, &buf, &fromAddr, addrLen, + sockRef, recvRef); } @@ -5990,7 +6035,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM recvRef; + ERL_NIF_TERM sockRef, recvRef; unsigned int bufSz; unsigned int ctrlSz; unsigned int eflags; @@ -6002,14 +6047,18 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, /* Extract arguments and perform preliminary validation */ if ((argc != 5) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_UINT(env, argv[2], &bufSz) || !GET_UINT(env, argv[3], &ctrlSz) || !GET_UINT(env, argv[4], &eflags)) { return enif_make_badarg(env); } - recvRef = argv[1]; + sockRef = argv[0]; // We need this in case we send in case we send abort + recvRef = argv[1]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -6047,7 +6096,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, * </KOLLA> */ - res = nrecvmsg(env, descP, recvRef, bufSz, ctrlSz, flags); + res = nrecvmsg(env, descP, sockRef, recvRef, bufSz, ctrlSz, flags); MUNLOCK(descP->readMtx); @@ -6064,6 +6113,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, static ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, uint16_t bufLen, uint16_t ctrlLen, @@ -6144,6 +6194,7 @@ ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, &msgHdr, data, // Needed for iov encode &ctrl, // Needed for ctrl header encode + sockRef, recvRef); } @@ -13254,6 +13305,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, ssize_t written, ssize_t dataSize, int saveErrno, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef) { SSDBG( descP, @@ -13330,7 +13382,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, while (writer_pop(env, descP, &pid, &mon, &ref)) { SSDBG( descP, ("SOCKET", "send_check_result -> abort %T\r\n", pid) ); - send_msg_nif_abort(env, ref, res, &pid); + esock_send_abort_msg(env, sockRef, ref, res, &pid); DEMONP("send_check_result -> pop'ed writer", env, descP, &mon); } @@ -13527,6 +13579,7 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, static void recv_error_current_reader(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM reason) { if (descP->currentReaderP != NULL) { @@ -13541,7 +13594,7 @@ void recv_error_current_reader(ErlNifEnv* env, while (reader_pop(env, descP, &pid, &mon, &ref)) { SSDBG( descP, ("SOCKET", "recv_error_current_reader -> abort %T\r\n", pid) ); - send_msg_nif_abort(env, ref, reason, &pid); + esock_send_abort_msg(env, sockRef, ref, reason, &pid); DEMONP("recv_error_current_reader -> pop'ed reader", env, descP, &mon); } @@ -13561,6 +13614,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, int toRead, int saveErrno, ErlNifBinary* bufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { char* xres; @@ -13595,7 +13649,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * We must also notify any waiting readers! */ - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); FREE_BIN(bufP); @@ -13713,7 +13767,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); SELECT(env, descP->sock, @@ -13751,7 +13805,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] errno: %d\r\n", toRead, saveErrno) ); - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); FREE_BIN(bufP); @@ -13835,6 +13889,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, ErlNifBinary* bufP, SocketAddress* fromAddrP, unsigned int fromAddrLen, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { char* xres; @@ -13876,7 +13931,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); SELECT(env, descP->sock, @@ -13909,7 +13964,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, ("SOCKET", "recvfrom_check_result -> errno: %d\r\n", saveErrno) ); - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); FREE_BIN(bufP); @@ -13962,6 +14017,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, struct msghdr* msgHdrP, ErlNifBinary* dataBufP, ErlNifBinary* ctrlBufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { @@ -14026,7 +14082,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); SELECT(env, descP->sock, @@ -14060,7 +14116,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, ("SOCKET", "recvmsg_check_result -> errno: %d\r\n", saveErrno) ); - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); @@ -16350,37 +16406,89 @@ char* send_msg_error(ErlNifEnv* env, */ -/* Send an (nif-) abort message to the specified process: +/* Send an close message to the specified process: * A message in the form: * - * {nif_abort, Ref, Reason} + * {'$socket', SockRef, close, CloseRef} * - * This message is for processes that are waiting in the + * This message is for processes that is waiting in the + * erlang API (close-) function for the socket to be "closed" + * (actually that the 'stop' callback function has been called). + */ +static +char* esock_send_close_msg(ErlNifEnv* env, + ERL_NIF_TERM closeRef, + ErlNifPid* pid) +{ + return esock_send_socket_msg(env, + esock_atom_undefined, + esock_atom_close, closeRef, pid); +} + + +/* Send an abort message to the specified process: + * A message in the form: + * + * {'$socket', SockRef, abort, {RecvRef, Reason}} + * + * This message is for processes that is waiting in the * erlang API functions for a select message. */ static -char* send_msg_nif_abort(ErlNifEnv* env, - ERL_NIF_TERM ref, - ERL_NIF_TERM reason, - ErlNifPid* pid) +char* esock_send_abort_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM recvRef, + ERL_NIF_TERM reason, + ErlNifPid* pid) { - ERL_NIF_TERM msg = MKT3(env, atom_nif_abort, ref, reason); + ERL_NIF_TERM info = MKT2(env, recvRef, reason); - return send_msg(env, msg, pid); + /* + esock_dbg_printf("SEND MSG", + "try send abort message to %T:\r\n", + "\r\n sockRef: %T" + "\r\n recvRef: %T" + "\r\n reason: %T" + "\r\n", *pid, sockRef, recvRef, reason); + */ + + return esock_send_socket_msg(env, sockRef, esock_atom_abort, info, pid); +} + + +/* *** esock_send_socket_msg *** + * + * This function sends a general purpose socket message to an erlang + * process. A general 'socket' message has the form: + * + * {'$socket', SockRef, Tag, Info} + * + */ + +static +char* esock_send_socket_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM tag, + ERL_NIF_TERM info, + ErlNifPid* pid) +{ + ERL_NIF_TERM msg = MKT4(env, esock_atom_socket_tag, sockRef, tag, info); + + return esock_send_msg(env, msg, pid); } /* Send a message to the specified process. */ static -char* send_msg(ErlNifEnv* env, - ERL_NIF_TERM msg, - ErlNifPid* pid) +char* esock_send_msg(ErlNifEnv* env, + ERL_NIF_TERM msg, + ErlNifPid* pid) { - if (!enif_send(env, pid, NULL, msg)) - return str_exsend; - else - return NULL; + if (!enif_send(env, pid, NULL, msg)) + return str_exsend; + else + return NULL; } @@ -17022,10 +17130,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) SSDBG( descP, ("SOCKET", "socket_stop -> " "send abort message to current writer %T\r\n", descP->currentWriter.pid) ); - if (send_msg_nif_abort(env, - descP->currentWriter.ref, - atom_closed, - &descP->currentWriter.pid) != NULL) { + if (esock_send_abort_msg(env, + esock_atom_undefined, + descP->currentWriter.ref, + atom_closed, + &descP->currentWriter.pid) != NULL) { /* Shall we really do this? * This happens if the controlling process has been killed! */ @@ -17061,10 +17170,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) SSDBG( descP, ("SOCKET", "socket_stop -> " "send abort message to current reader %T\r\n", descP->currentReader.pid) ); - if (send_msg_nif_abort(env, - descP->currentReader.ref, - atom_closed, - &descP->currentReader.pid) != NULL) { + if (esock_send_abort_msg(env, + esock_atom_undefined, + descP->currentReader.ref, + atom_closed, + &descP->currentReader.pid) != NULL) { /* Shall we really do this? * This happens if the controlling process has been killed! */ @@ -17099,10 +17209,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) SSDBG( descP, ("SOCKET", "socket_stop -> " "send abort message to current acceptor %T\r\n", descP->currentWriter.pid) ); - if (send_msg_nif_abort(env, - descP->currentAcceptor.ref, - atom_closed, - &descP->currentAcceptor.pid) != NULL) { + if (esock_send_abort_msg(env, + esock_atom_undefined, + descP->currentAcceptor.ref, + atom_closed, + &descP->currentAcceptor.pid) != NULL) { /* Shall we really do this? * This happens if the controlling process has been killed! */ @@ -17145,13 +17256,12 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * Also, we should really *always* use a tag unique to this * (nif-) module. Some like (in this case): * - * {'$socket', close, CloseRef} + * {'$socket', undefined, close, CloseRef} * * </KOLLA> */ - - send_msg(env, - MKT2(env, atom_close, descP->closeRef), &descP->closerPid); + + esock_send_close_msg(env, descP->closeRef, &descP->closerPid); DEMONP("socket_stop -> closer", env, descP, &descP->closerMon); @@ -17221,13 +17331,14 @@ void inform_waiting_procs(ErlNifEnv* env, */ SSDBG( descP, - ("SOCKET", "inform_waiting_procs -> abort %T (%T)\r\n", + ("SOCKET", "inform_waiting_procs -> abort request %T (from %T)\r\n", currentP->data.ref, currentP->data.pid) ); - ESOCK_ASSERT( (NULL == send_msg_nif_abort(env, - currentP->data.ref, - reason, - ¤tP->data.pid)) ); + ESOCK_ASSERT( (NULL == esock_send_abort_msg(env, + esock_atom_undefined, + currentP->data.ref, + reason, + ¤tP->data.pid)) ); DEMONP("inform_waiting_procs -> current 'request'", env, descP, ¤tP->data.mon); @@ -17783,7 +17894,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_min = MKA(env, str_min); atom_mode = MKA(env, str_mode); atom_multiaddr = MKA(env, str_multiaddr); - atom_nif_abort = MKA(env, str_nif_abort); + // atom_nif_abort = MKA(env, str_nif_abort); atom_null = MKA(env, str_null); atom_num_dinet = MKA(env, str_num_dinet); atom_num_dinet6 = MKA(env, str_num_dinet6); @@ -17813,6 +17924,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_want = MKA(env, str_want); /* Global atom(s) */ + esock_atom_abort = MKA(env, "abort"); esock_atom_accept = MKA(env, "accept"); esock_atom_acceptconn = MKA(env, "acceptconn"); esock_atom_acceptfilter = MKA(env, "acceptfilter"); @@ -17836,6 +17948,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_broadcast = MKA(env, "broadcast"); esock_atom_busy_poll = MKA(env, "busy_poll"); esock_atom_checksum = MKA(env, "checksum"); + esock_atom_close = MKA(env, "close"); esock_atom_connect = MKA(env, "connect"); esock_atom_congestion = MKA(env, "congestion"); esock_atom_context = MKA(env, "context"); @@ -17979,6 +18092,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_sndlowat = MKA(env, "sndlowat"); esock_atom_sndtimeo = MKA(env, "sndtimeo"); esock_atom_socket = MKA(env, "socket"); + esock_atom_socket_tag = MKA(env, "$socket"); esock_atom_spec_dst = MKA(env, "spec_dst"); esock_atom_status = MKA(env, "status"); esock_atom_stream = MKA(env, "stream"); diff --git a/erts/emulator/test/Makefile b/erts/emulator/test/Makefile index 09bfe6f104..300270a10e 100644 --- a/erts/emulator/test/Makefile +++ b/erts/emulator/test/Makefile @@ -33,6 +33,7 @@ SOCKET_MODULES = \ socket_server \ socket_client \ socket_test_lib \ + socket_test_logger \ socket_test_evaluator \ socket_test_ttest_lib \ socket_test_ttest_tcp_gen \ diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 71e32f8e95..57f562f24d 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -149,6 +149,7 @@ -define(TT(T), ct:timetrap(T)). -define(LIB, socket_test_lib). +-define(LOGGER, socket_test_logger). -define(TPP_SMALL, lists:seq(1, 8)). -define(TPP_MEDIUM, lists:flatten(lists:duplicate(1024, ?TPP_SMALL))). @@ -335,15 +336,19 @@ traffic_cases() -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% init_per_suite(Config) -> + ?LOGGER:start(), Config. end_per_suite(_) -> + ?LOGGER:stop(), ok. init_per_testcase(_TC, Config) -> + ?LOGGER:start(), Config. end_per_testcase(_TC, Config) -> + ?LOGGER:stop(), Config. @@ -1798,6 +1803,10 @@ api_to_connect_tcp(InitState) -> [{tester, Tester}]) of {ok, ok = _Result} -> {ok, maps:remove(connect_limit, State)}; + {ok, {error, {connect_limit_reached,R,L}}} -> + {skip, + ?LIB:f("Connect limit reached ~w: ~w", + [L, R])}; {ok, Result} -> Result; {error, _} = ERROR -> @@ -1911,7 +1920,7 @@ api_to_connect_tcp(InitState) -> client := Client} = _State) -> case ?SEV_AWAIT_READY(Client, client, connect, [{server, Server}]) of - {ok, _} -> + ok -> ok; {error, _} = ERROR -> ERROR @@ -1968,25 +1977,25 @@ api_to_connect_tcp(InitState) -> api_toc_tcp_client_start(Node) -> Self = self(), - GL = group_leader(), - Fun = fun() -> api_toc_tcp_client(Self, GL) end, + Fun = fun() -> api_toc_tcp_client(Self) end, erlang:spawn(Node, Fun). -api_toc_tcp_client(Parent, GL) -> - api_toc_tcp_client_init(Parent, GL), +api_toc_tcp_client(Parent) -> + api_toc_tcp_client_init(Parent), ServerSA = api_toc_tcp_client_await_start(Parent), Domain = maps:get(family, ServerSA), api_toc_tcp_client_announce_ready(Parent, init), {To, ConLimit} = api_toc_tcp_client_await_continue(Parent, connect), Result = api_to_connect_tcp_await_timeout(To, ServerSA, Domain, ConLimit), + ?SEV_IPRINT("result: ~p", [Result]), api_toc_tcp_client_announce_ready(Parent, connect, Result), Reason = api_toc_tcp_client_await_terminate(Parent), exit(Reason). -api_toc_tcp_client_init(Parent, GL) -> +api_toc_tcp_client_init(Parent) -> + put(sname, "rclient"), %% i("api_toc_tcp_client_init -> entry"), _MRef = erlang:monitor(process, Parent), - group_leader(self(), GL), ok. api_toc_tcp_client_await_start(Parent) -> @@ -3670,6 +3679,7 @@ sc_lc_receive_response_tcp(InitState) -> end}, #{desc => "close the connection socket", cmd => fun(#{csock := Sock} = State) -> + %% ok = socket:setopt(Sock, otp, debug, true), case socket:close(Sock) of ok -> {ok, maps:remove(csock, State)}; @@ -3709,7 +3719,8 @@ sc_lc_receive_response_tcp(InitState) -> ?SEV_FINISH_NORMAL ], - %% The point of this is to perform the recv for which we are testing the reponse + %% The point of this is to perform the recv for which + %% we are testing the reponse. HandlerSeq = [ %% *** Wait for start order part *** @@ -3757,7 +3768,8 @@ sc_lc_receive_response_tcp(InitState) -> ?SEV_EPRINT("Unexpected data received"), {error, unexpected_success}; {error, closed} -> - ?SEV_IPRINT("received expected 'closed' result"), + ?SEV_IPRINT("received expected 'closed' " + "result"), State1 = maps:remove(sock, State), {ok, State1}; {error, Reason} = ERROR -> @@ -5805,13 +5817,12 @@ sc_rc_receive_response_tcp(InitState) -> sc_rc_tcp_client_start(Node) -> Self = self(), - GL = group_leader(), - Fun = fun() -> sc_rc_tcp_client(Self, GL) end, + Fun = fun() -> sc_rc_tcp_client(Self) end, erlang:spawn(Node, Fun). -sc_rc_tcp_client(Parent, GL) -> - sc_rc_tcp_client_init(Parent, GL), +sc_rc_tcp_client(Parent) -> + sc_rc_tcp_client_init(Parent), ServerSA = sc_rc_tcp_client_await_start(Parent), Domain = maps:get(family, ServerSA), Sock = sc_rc_tcp_client_create(Domain), @@ -5824,12 +5835,13 @@ sc_rc_tcp_client(Parent, GL) -> sc_rc_tcp_client_close(Sock), sc_rc_tcp_client_announce_ready(Parent, close), Reason = sc_rc_tcp_client_await_terminate(Parent), + ?SEV_IPRINT("terminate"), exit(Reason). -sc_rc_tcp_client_init(Parent, GL) -> - i("sc_rc_tcp_client_init -> entry"), +sc_rc_tcp_client_init(Parent) -> + put(sname, "rclient"), + ?SEV_IPRINT("init"), _MRef = erlang:monitor(process, Parent), - group_leader(self(), GL), ok. sc_rc_tcp_client_await_start(Parent) -> @@ -6632,13 +6644,12 @@ sc_rs_send_shutdown_receive_tcp(InitState) -> sc_rs_tcp_client_start(Node, Send) -> Self = self(), - GL = group_leader(), - Fun = fun() -> sc_rs_tcp_client(Self, Send, GL) end, + Fun = fun() -> sc_rs_tcp_client(Self, Send) end, erlang:spawn(Node, Fun). -sc_rs_tcp_client(Parent, Send, GL) -> - sc_rs_tcp_client_init(Parent, GL), +sc_rs_tcp_client(Parent, Send) -> + sc_rs_tcp_client_init(Parent), ServerSA = sc_rs_tcp_client_await_start(Parent), Domain = maps:get(family, ServerSA), Sock = sc_rs_tcp_client_create(Domain), @@ -6657,12 +6668,13 @@ sc_rs_tcp_client(Parent, Send, GL) -> sc_rs_tcp_client_close(Sock), sc_rs_tcp_client_announce_ready(Parent, close), Reason = sc_rs_tcp_client_await_terminate(Parent), + ?SEV_IPRINT("terminate"), exit(Reason). -sc_rs_tcp_client_init(Parent, GL) -> - i("sc_rs_tcp_client_init -> entry"), +sc_rs_tcp_client_init(Parent) -> + put(sname, "rclient"), + ?SEV_IPRINT("init"), _MRef = erlang:monitor(process, Parent), - group_leader(self(), GL), ok. sc_rs_tcp_client_await_start(Parent) -> @@ -7781,12 +7793,11 @@ traffic_send_and_recv_chunks_tcp(InitState) -> traffic_snr_tcp_client_start(Node) -> Self = self(), - GL = group_leader(), - Fun = fun() -> traffic_snr_tcp_client(Self, GL) end, + Fun = fun() -> traffic_snr_tcp_client(Self) end, erlang:spawn(Node, Fun). -traffic_snr_tcp_client(Parent, GL) -> - {Sock, ServerSA} = traffic_snr_tcp_client_init(Parent, GL), +traffic_snr_tcp_client(Parent) -> + {Sock, ServerSA} = traffic_snr_tcp_client_init(Parent), traffic_snr_tcp_client_announce_ready(Parent, init), traffic_snr_tcp_client_await_continue(Parent, connect), traffic_snr_tcp_client_connect(Sock, ServerSA), @@ -7815,10 +7826,10 @@ traffic_snr_tcp_client_send_loop(Parent, Sock) -> exit({await_continue, Reason}) end. -traffic_snr_tcp_client_init(Parent, GL) -> - i("traffic_snr_tcp_client_init -> entry"), +traffic_snr_tcp_client_init(Parent) -> + put(sname, "rclient"), + ?SEV_IPRINT("init"), _MRef = erlang:monitor(process, Parent), - group_leader(self(), GL), ServerSA = traffic_snr_tcp_client_await_start(Parent), Domain = maps:get(family, ServerSA), Sock = traffic_snr_tcp_client_create(Domain), @@ -8145,8 +8156,9 @@ traffic_ping_pong_medium_sendto_and_recvfrom_udp6(_Config) when is_list(_Config) Num = ?TPP_MEDIUM_NUM, tc_try(traffic_ping_pong_medium_sendto_and_recvfrom_udp6, fun() -> + not_yet_implemented(), ?TT(?SECS(45)), - InitState = #{domain => inet, + InitState = #{domain => inet6, msg => Msg, num => Num}, ok = traffic_ping_pong_sendto_and_recvfrom_udp(InitState) @@ -8200,7 +8212,7 @@ traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) fun() -> not_yet_implemented(), ?TT(?SECS(20)), - InitState = #{domain => inet, + InitState = #{domain => inet6, msg => Msg, num => Num}, ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) @@ -8470,6 +8482,9 @@ traffic_ping_pong_send_and_receive_tcp(#{msg := Msg} = InitState) -> true -> ok end, + + + ok = socket:setopt(Sock, otp, rcvbuf, 8*1024) end, traffic_ping_pong_send_and_receive_tcp2(InitState#{buf_init => Fun}). @@ -9108,12 +9123,11 @@ tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, N, Sent, Received, Start) -> tpp_tcp_client_create(Node) -> Self = self(), - GL = group_leader(), - Fun = fun() -> tpp_tcp_client(Self, GL) end, + Fun = fun() -> tpp_tcp_client(Self) end, erlang:spawn(Node, Fun). -tpp_tcp_client(Parent, GL) -> - tpp_tcp_client_init(Parent, GL), +tpp_tcp_client(Parent) -> + tpp_tcp_client_init(Parent), {ServerSA, BufInit, Send, Recv} = tpp_tcp_client_await_start(Parent), Domain = maps:get(family, ServerSA), Sock = tpp_tcp_client_sock_open(Domain, BufInit), @@ -9130,11 +9144,10 @@ tpp_tcp_client(Parent, GL) -> ?SEV_IPRINT("terminating"), exit(Reason). -tpp_tcp_client_init(Parent, GL) -> +tpp_tcp_client_init(Parent) -> put(sname, "rclient"), ?SEV_IPRINT("init"), _MRef = erlang:monitor(process, Parent), - group_leader(self(), GL), ok. tpp_tcp_client_await_start(Parent) -> @@ -9361,20 +9374,23 @@ traffic_ping_pong_sendmsg_and_recvmsg_udp(InitState) -> traffic_ping_pong_send_and_receive_udp(#{msg := Msg} = InitState) -> Fun = fun(Sock) -> {ok, RcvSz} = socket:getopt(Sock, socket, rcvbuf), - if (RcvSz < size(Msg)) -> + if (RcvSz =< (8+size(Msg))) -> + i("adjust socket rcvbuf buffer size"), ok = socket:setopt(Sock, socket, rcvbuf, 1024+size(Msg)); true -> ok end, {ok, SndSz} = socket:getopt(Sock, socket, sndbuf), - if (SndSz < size(Msg)) -> + if (SndSz =< (8+size(Msg))) -> + i("adjust socket sndbuf buffer size"), ok = socket:setopt(Sock, socket, sndbuf, 1024+size(Msg)); true -> ok end, {ok, OtpRcvBuf} = socket:getopt(Sock, otp, rcvbuf), if - (OtpRcvBuf < size(Msg)) -> + (OtpRcvBuf =< (8+size(Msg))) -> + i("adjust otp rcvbuf buffer size"), ok = socket:setopt(Sock, otp, rcvbuf, 1024+size(Msg)); true -> ok @@ -9836,6 +9852,7 @@ traffic_ping_pong_send_and_receive_udp2(InitState) -> ?SEV_FINISH_NORMAL ], + i("start server evaluator"), ServerInitState = #{domain => maps:get(domain, InitState), recv => maps:get(recv, InitState), @@ -9883,7 +9900,8 @@ tpp_udp_server_handler_init(Parent) -> ok. tpp_udp_server_handler_msg_exchange(Sock, Send, Recv) -> - tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, 0, 0, 0, undefined). + tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, + 0, 0, 0, undefined). tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, N, Sent, Received, Start) -> @@ -9919,7 +9937,8 @@ tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, %% exit({'ping-send', Reason, N}) %% end; {error, closed} -> - ?SEV_IPRINT("closed - we are done: ~w, ~w, ~w", [N, Sent, Received]), + ?SEV_IPRINT("closed - we are done: ~w, ~w, ~w", + [N, Sent, Received]), Stop = ?LIB:timestamp(), {N, Sent, Received, Start, Stop}; {error, RReason} -> @@ -9932,36 +9951,45 @@ tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, tpp_udp_client_handler_create(Node) -> Self = self(), - GL = group_leader(), - Fun = fun() -> tpp_udp_client_handler(Self, GL) end, + Fun = fun() -> put(sname, "chandler"), tpp_udp_client_handler(Self) end, erlang:spawn(Node, Fun). -tpp_udp_client_handler(Parent, GL) -> - tpp_udp_client_handler_init(Parent, GL), +tpp_udp_client_handler(Parent) -> + tpp_udp_client_handler_init(Parent), + ?SEV_IPRINT("await start command"), {ServerSA, BufInit, Send, Recv} = tpp_udp_handler_await_start(Parent), + ?SEV_IPRINT("start command with" + "~n ServerSA: ~p", [ServerSA]), Domain = maps:get(family, ServerSA), Sock = tpp_udp_sock_open(Domain, BufInit), tpp_udp_sock_bind(Sock, Domain), + ?SEV_IPRINT("announce ready", []), tpp_udp_handler_announce_ready(Parent, init), {InitMsg, Num} = tpp_udp_handler_await_continue(Parent, send), + ?SEV_IPRINT("received continue with" + "~n Num: ~p", [Num]), Result = tpp_udp_client_handler_msg_exchange(Sock, ServerSA, Send, Recv, InitMsg, Num), + ?SEV_IPRINT("ready"), tpp_udp_handler_announce_ready(Parent, send, Result), + ?SEV_IPRINT("await terminate"), Reason = tpp_udp_handler_await_terminate(Parent), + ?SEV_IPRINT("terminate with ~p", [Reason]), tpp_udp_sock_close(Sock), ?SEV_IPRINT("terminating"), exit(Reason). -tpp_udp_client_handler_init(Parent, GL) -> +tpp_udp_client_handler_init(Parent) -> put(sname, "chandler"), ?SEV_IPRINT("init"), _MRef = erlang:monitor(process, Parent), - group_leader(self(), GL), ok. -tpp_udp_client_handler_msg_exchange(Sock, ServerSA, Send, Recv, InitMsg, Num) -> +tpp_udp_client_handler_msg_exchange(Sock, ServerSA, + Send, Recv, InitMsg, Num) -> Start = ?LIB:timestamp(), - tpp_udp_client_handler_msg_exchange_loop(Sock, ServerSA, Send, Recv, InitMsg, + tpp_udp_client_handler_msg_exchange_loop(Sock, ServerSA, + Send, Recv, InitMsg, Num, 0, 0, 0, Start). tpp_udp_client_handler_msg_exchange_loop(_Sock, _Dest, _Send, _Recv, _Msg, @@ -9969,16 +9997,14 @@ tpp_udp_client_handler_msg_exchange_loop(_Sock, _Dest, _Send, _Recv, _Msg, Start) -> Stop = ?LIB:timestamp(), {Sent, Received, Start, Stop}; -tpp_udp_client_handler_msg_exchange_loop(Sock, Dest, Send, Recv, Data, +tpp_udp_client_handler_msg_exchange_loop(Sock, Dest, Send, Recv, Data, Num, N, Sent, Received, Start) -> - %% d("tpp_udp_client_handler_msg_exchange_loop(~w,~w) try send", [Num,N]), case tpp_udp_send_req(Sock, Send, Data, Dest) of {ok, SendSz} -> - %% d("tpp_tcp_client_msg_exchange_loop(~w,~w) sent - " - %% "now try recv", [Num,N]), case tpp_udp_recv_rep(Sock, Recv) of {ok, NewData, RecvSz, Dest} -> - tpp_udp_client_handler_msg_exchange_loop(Sock, Dest, Send, Recv, + tpp_udp_client_handler_msg_exchange_loop(Sock, Dest, + Send, Recv, NewData, Num, N+1, Sent+SendSz, Received+RecvSz, @@ -10008,55 +10034,17 @@ tpp_udp_recv(Sock, Recv, Tag) -> %% "~n Source: ~p" %% "~n Tag: ~p" %% "~n Sz: ~p" - %% "~n size(Data): ~p", [Source, Tag, Sz, size(Data)]), + %% "~n size(Data): ~p", + %% [Source, Tag, Sz, size(Data)]), {ok, Data, size(Msg), Source}; - {ok, {Source, <<Tag:32/integer, Sz:32/integer, Data/binary>> = Msg}} -> - %% ?SEV_IPRINT("tpp_udp_recv -> got part: " - %% "~n Source: ~p" - %% "~n Tag: ~p" - %% "~n Sz: ~p" - %% "~n size(Data): ~p", [Source, Tag, Sz, size(Data)]), - Remains = Sz - size(Data), - tpp_tcp_recv(Sock, Source, Recv, Tag, Remains, size(Msg), [Data]); + {ok, {_Source, <<Tag:32/integer, Sz:32/integer, Data/binary>>}} -> + {error, {invalid_msg, Sz, size(Data)}}; {ok, {_, <<Tag:32/integer, _/binary>>}} -> {error, {invalid_msg_tag, Tag}}; {error, _} = ERROR -> ERROR end. -%% We match against Source since we only communicate with one peer -tpp_tcp_recv(Sock, Source, Recv, Tag, Remaining, AccSz, Acc) -> - %% ?SEV_IPRINT("tpp_tcp_recv -> entry with" - %% "~n Tag: ~p" - %% "~n Remaining: ~p" - %% "~n AccSz: ~p" - %% "~n RcvBuf: ~p" - %% "~n SndBuf: ~p", - %% [Tag, Remaining, AccSz, - %% socket:getopt(Sock, socket, rcvbuf), - %% socket:getopt(Sock, socket, sndbuf)]), - case Recv(Sock, Remaining) of - {ok, {Source, Data}} when (Remaining =:= size(Data)) -> - %% ?SEV_IPRINT("tpp_udp_recv -> got rest: " - %% "~n Source: ~p" - %% "~n size(Data): ~p", [Source, size(Data)]), - %% We got the rest - TotSz = AccSz + size(Data), - {ok, - erlang:iolist_to_binary(lists:reverse([Data | Acc])), - TotSz, Source}; - {ok, {Source, Data}} when (Remaining > size(Data)) -> - %% ?SEV_IPRINT("tpp_udp_recv -> got part of rest: " - %% "~n Source: ~p" - %% "~n size(Data): ~p", [Source, size(Data)]), - tpp_tcp_recv(Sock, Source, Recv, Tag, - Remaining - size(Data), AccSz + size(Data), - [Data | Acc]); - {error, _} = ERROR -> - ERROR - end. - - tpp_udp_send_req(Sock, Send, Data, Dest) -> tpp_udp_send(Sock, Send, ?TPP_REQUEST, Data, Dest). @@ -10069,15 +10057,6 @@ tpp_udp_send(Sock, Send, Tag, Data, Dest) -> tpp_udp_send_msg(Sock, Send, Msg, Dest, 0). tpp_udp_send_msg(Sock, Send, Msg, Dest, AccSz) when is_binary(Msg) -> - %% d("tpp_udp_send_msg -> entry with" - %% "~n size(Msg): ~p" - %% "~n Dest: ~p" - %% "~n AccSz: ~p" - %% "~n RcvBuf: ~p" - %% "~n SndBuf: ~p", - %% [size(Msg), Dest, AccSz, - %% socket:getopt(Sock, socket, rcvbuf), - %% socket:getopt(Sock, socket, sndbuf)]), case Send(Sock, Msg, Dest) of ok -> {ok, AccSz+size(Msg)}; @@ -10161,6 +10140,14 @@ start_node(Host, NodeName) -> UniqueNodeName = f("~w_~w", [NodeName, erlang:system_time(millisecond)]), case do_start_node(Host, UniqueNodeName) of {ok, _} = OK -> + global:sync(), + %% i("Node ~p started: " + %% "~n Nodes: ~p" + %% "~n Logger: ~p" + %% "~n Global Names: ~p", + %% [NodeName, nodes(), + %% global:whereis_name(socket_test_logger), + %% global:registered_names()]), OK; {error, Reason, _} -> {error, Reason} diff --git a/erts/emulator/test/socket_test_evaluator.erl b/erts/emulator/test/socket_test_evaluator.erl index e8755a9512..48fb6a027e 100644 --- a/erts/emulator/test/socket_test_evaluator.erl +++ b/erts/emulator/test/socket_test_evaluator.erl @@ -76,6 +76,7 @@ %% ============================================================================ -define(LIB, socket_test_lib). +-define(LOGGER, socket_test_logger). -define(EXTRA_NOTHING, '$nothing'). -define(ANNOUNCEMENT_START, '$start'). @@ -122,6 +123,8 @@ loop(ID, [#{desc := Desc, loop(ID + 1, Cmds, State); {ok, NewState} -> loop(ID + 1, Cmds, NewState); + {skip, Reason} -> + exit({skip, Reason}); {error, Reason} -> eprint("command ~w failed: " "~n Reason: ~p", [ID, Reason]), @@ -160,6 +163,8 @@ await_finish(Evs, Fails) -> iprint("unknown process ~p died (normal)", [Pid]), await_finish(Evs, Fails) end; + {'DOWN', _MRef, process, Pid, {skip, Reason}} -> + ?LIB:skip(Reason); {'DOWN', _MRef, process, Pid, Reason} -> case lists:keysearch(Pid, #ev.pid, Evs) of {value, #ev{name = Name}} -> @@ -486,6 +491,5 @@ print(Prefix, F, A) -> SName -> f("[~s][~p]", [SName, self()]) end, - FStr = f("[~s]~s ~s" ++ F, [?LIB:formated_timestamp(), IDStr, Prefix | A]), - io:format(user, FStr ++ "~n", []), - io:format(FStr, []). + ?LOGGER:format("[~s]~s ~s" ++ F, + [?LIB:formated_timestamp(), IDStr, Prefix | A]). diff --git a/erts/emulator/test/socket_test_lib.erl b/erts/emulator/test/socket_test_lib.erl index c36cc4fbfa..f55f338ef9 100644 --- a/erts/emulator/test/socket_test_lib.erl +++ b/erts/emulator/test/socket_test_lib.erl @@ -27,13 +27,15 @@ formated_timestamp/0, format_timestamp/1, + %% String and format + f/2, + %% Skipping not_yet_implemented/0, skip/1 ]). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% timestamp() -> @@ -62,6 +64,12 @@ format_timestamp({_N1, _N2, _N3} = TS) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +f(F, A) -> + lists:flatten(io_lib:format(F, A)). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + not_yet_implemented() -> skip("not yet implemented"). diff --git a/erts/emulator/test/socket_test_logger.erl b/erts/emulator/test/socket_test_logger.erl new file mode 100644 index 0000000000..5996bbe855 --- /dev/null +++ b/erts/emulator/test/socket_test_logger.erl @@ -0,0 +1,99 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(socket_test_logger). + +-export([ + start/0, + stop/0, + format/2 + ]). + + +-define(LIB, socket_test_lib). +-define(LOGGER, ?MODULE). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +start() -> + case global:whereis_name(?LOGGER) of + Pid when is_pid(Pid) -> + ok; + undefined -> + Self = self(), + Pid = spawn_link(fun() -> init(Self) end), + yes = global:register_name(?LOGGER, Pid), + ok + end. + + +stop() -> + case global:whereis_name(?LOGGER) of + undefined -> + ok; + Pid when is_pid(Pid) -> + global:unregister_name(?LOGGER), + Pid ! {?LOGGER, '$logger', stop}, + ok + end. + + +format(F, []) -> + do_format(F); +format(F, A) -> + do_format(?LIB:f(F, A)). + +do_format(Msg) -> + case global:whereis_name(?LOGGER) of + undefined -> + ok; + Pid when is_pid(Pid) -> + Pid ! {?MODULE, '$logger', {msg, Msg}}, + ok + end. + +init(Parent) -> + put(sname, "logger"), + print("[~s][logger] starting~n", [?LIB:formated_timestamp()]), + loop(#{parent => Parent}). + +loop(#{parent := Parent} = State) -> + receive + {'EXIT', Parent, _} -> + print("[~s][logger] parent exit~n", [?LIB:formated_timestamp()]), + exit(normal); + + {?MODULE, '$logger', stop} -> + print("[~s][logger] stopping~n", [?LIB:formated_timestamp()]), + exit(normal); + + {?MODULE, '$logger', {msg, Msg}} -> + print(Msg), + loop(State) + end. + + +print(F, A) -> + print(?LIB:f(F, A)). + +print(Str) -> + io:format(user, Str ++ "~n", []), + io:format(Str, []). diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex 25046e6aad..ddd50fdefa 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index a40692881b..2e295a91ae 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -1331,7 +1331,7 @@ do_accept(LSockRef, Timeout) -> {select, LSockRef, AccRef, ready_input} -> do_accept(LSockRef, next_timeout(TS, Timeout)); - {nif_abort, AccRef, Reason} -> + {'$socket', _, abort, {AccRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1408,7 +1408,7 @@ do_send(SockRef, Data, EFlags, Timeout) -> do_send(SockRef, Data, EFlags, next_timeout(TS, Timeout)); - {nif_abort, SendRef, Reason} -> + {'$socket', _, abort, {SendRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1421,7 +1421,7 @@ do_send(SockRef, Data, EFlags, Timeout) -> do_send(SockRef, Data, EFlags, next_timeout(TS, Timeout)); - {nif_abort, SendRef, Reason} -> + {'$socket', _, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1513,7 +1513,7 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> do_sendto(SockRef, Data, Dest, EFlags, next_timeout(TS, Timeout)); - {nif_abort, SendRef, Reason} -> + {'$socket', _, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1525,7 +1525,11 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> receive {select, SockRef, SendRef, ready_output} -> do_sendto(SockRef, Data, Dest, EFlags, - next_timeout(TS, Timeout)) + next_timeout(TS, Timeout)); + + {'$socket', _, abort, {SendRef, Reason}} -> + {error, Reason} + after Timeout -> cancel(SockRef, sendto, SendRef), {error, timeout} @@ -1773,10 +1777,9 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) Bin, next_timeout(TS, Timeout)); - {nif_abort, RecvRef, Reason} -> + {'$socket', _, abort, {RecvRef, Reason}} -> {error, Reason} - after NewTimeout -> cancel(SockRef, recv, RecvRef), {error, {timeout, Acc}} @@ -1794,10 +1797,9 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) <<Acc/binary, Bin/binary>>, next_timeout(TS, Timeout)); - {nif_abort, RecvRef, Reason} -> + {'$socket', _, abort, {RecvRef, Reason}} -> {error, Reason} - after NewTimeout -> cancel(SockRef, recv, RecvRef), {error, {timeout, Acc}} @@ -1805,6 +1807,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) %% We return with the accumulated binary (if its non-empty) {error, eagain} when (Length =:= 0) andalso (size(Acc) > 0) -> + %% CAN WE REALLY DO THIS? THE NIF HAS SELECTED!! OR? {ok, Acc}; {error, eagain} -> @@ -1819,7 +1822,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) Acc, next_timeout(TS, Timeout)); - {nif_abort, RecvRef, Reason} -> + {'$socket', _, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1850,7 +1853,8 @@ do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) -> %% any waiting reader. cancel(SockRef, recv, RecvRef), {ok, Acc}; -do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) -> +do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) + when (size(Acc) > 0) -> {error, {timeout, Acc}}; do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) -> {error, timeout}. @@ -1957,8 +1961,7 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> do_recvfrom(SockRef, BufSz, EFlags, next_timeout(TS, Timeout)); - {nif_abort, RecvRef, Reason} -> - %% p("received nif-abort: ~p", [Reason]), + {'$socket', _, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -2062,7 +2065,7 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, next_timeout(TS, Timeout)); - {nif_abort, RecvRef, Reason} -> + {'$socket', _, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -2107,7 +2110,8 @@ do_close(SockRef) -> {ok, CloseRef} -> %% We must wait receive - {close, CloseRef} -> + {'$socket', _, close, CloseRef} -> +%% {close, CloseRef} -> %% <KOLLA> %% %% WHAT HAPPENS IF THIS PROCESS IS KILLED |