From 342d35f457c15a9cea426e8ca83bfd52b0ec2f2e Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Wed, 12 Dec 2018 18:37:55 +0100 Subject: [socket-nif] Message interface between socket.erl and nif updated Previously the "message interface" between the functions in socket.erl and the nif-code (socket_nif.c) was "ad hoc". This has now been changed so that we have a unified message {'$socket', SockRef | undefined, Tag, Info} This also has the added advantage of preparing the code for when we start using the new select-fucntions (with which its possible to specify your own message). This will be used in order to get around our eterm "leak" (we will use a simple counter, maintained in the nif, instead of the [Recv|Send|Acc]Ref we generate in the erlang code today. OTP-14831 --- erts/emulator/nifs/common/socket_int.h | 3 + erts/emulator/nifs/common/socket_nif.c | 314 ++++++++++++++++++++++----------- 2 files changed, 217 insertions(+), 100 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index ec17e45f25..0f973855ae 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -103,6 +103,7 @@ typedef unsigned int BOOLEAN_T; /* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ * "Global" atoms */ +extern ERL_NIF_TERM esock_atom_abort; extern ERL_NIF_TERM esock_atom_accept; extern ERL_NIF_TERM esock_atom_acceptconn; extern ERL_NIF_TERM esock_atom_acceptfilter; @@ -126,6 +127,7 @@ extern ERL_NIF_TERM esock_atom_block_source; extern ERL_NIF_TERM esock_atom_broadcast; extern ERL_NIF_TERM esock_atom_busy_poll; extern ERL_NIF_TERM esock_atom_checksum; +extern ERL_NIF_TERM esock_atom_close; extern ERL_NIF_TERM esock_atom_connect; extern ERL_NIF_TERM esock_atom_congestion; extern ERL_NIF_TERM esock_atom_context; @@ -269,6 +271,7 @@ extern ERL_NIF_TERM esock_atom_sndbufforce; extern ERL_NIF_TERM esock_atom_sndlowat; extern ERL_NIF_TERM esock_atom_sndtimeo; extern ERL_NIF_TERM esock_atom_socket; +extern ERL_NIF_TERM esock_atom_socket_tag; extern ERL_NIF_TERM esock_atom_spec_dst; extern ERL_NIF_TERM esock_atom_status; extern ERL_NIF_TERM esock_atom_stream; diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 1e0533535c..80903c487f 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -760,24 +760,24 @@ typedef struct { typedef struct { /* +++ The actual socket +++ */ - SOCKET sock; - HANDLE event; + SOCKET sock; + HANDLE event; /* +++ Stuff "about" the socket +++ */ - int domain; - int type; - int protocol; + int domain; + int type; + int protocol; - unsigned int state; - SocketAddress remote; - unsigned int addrLen; + unsigned int state; + SocketAddress remote; + unsigned int addrLen; - ErlNifEnv* env; + ErlNifEnv* env; /* +++ Controller (owner) process +++ */ - ErlNifPid ctrlPid; - // ErlNifMonitor ctrlMon; - ESockMonitor ctrlMon; + ErlNifPid ctrlPid; + // ErlNifMonitor ctrlMon; + ESockMonitor ctrlMon; /* +++ Write stuff +++ */ ErlNifMutex* writeMtx; @@ -996,11 +996,13 @@ static ERL_NIF_TERM naccept(ErlNifEnv* env, ERL_NIF_TERM ref); static ERL_NIF_TERM nsend(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ErlNifBinary* dataP, int flags); static ERL_NIF_TERM nsendto(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ErlNifBinary* dataP, int flags, @@ -1008,21 +1010,25 @@ static ERL_NIF_TERM nsendto(ErlNifEnv* env, unsigned int toAddrLen); static ERL_NIF_TERM nsendmsg(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ERL_NIF_TERM eMsgHdr, int flags); static ERL_NIF_TERM nrecv(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sendRef, ERL_NIF_TERM recvRef, int len, int flags); static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, uint16_t bufSz, int flags); static ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, uint16_t bufLen, uint16_t ctrlLen, @@ -1986,6 +1992,7 @@ static ERL_NIF_TERM send_check_result(ErlNifEnv* env, ssize_t written, ssize_t dataSize, int saveErrno, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef); static BOOLEAN_T recv_check_reader(ErlNifEnv* env, SocketDescriptor* descP, @@ -1998,6 +2005,7 @@ static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, SocketDescriptor* descP); static void recv_error_current_reader(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM reason); static ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -2005,6 +2013,7 @@ static ERL_NIF_TERM recv_check_result(ErlNifEnv* env, int toRead, int saveErrno, ErlNifBinary* bufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -2013,6 +2022,7 @@ static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, ErlNifBinary* bufP, SocketAddress* fromAddrP, unsigned int fromAddrLen, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -2021,6 +2031,7 @@ static ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, struct msghdr* msgHdrP, ErlNifBinary* dataBufP, ErlNifBinary* ctrlBufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, @@ -2310,22 +2321,22 @@ static void socket_down_reader(ErlNifEnv* env, SocketDescriptor* descP, const ErlNifPid* pid); -/* -static char* send_msg_error_closed(ErlNifEnv* env, +static char* esock_send_close_msg(ErlNifEnv* env, + ERL_NIF_TERM closeRef, + ErlNifPid* pid); +static char* esock_send_abort_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM recvRef, + ERL_NIF_TERM reason, + ErlNifPid* pid); +static char* esock_send_socket_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM tag, + ERL_NIF_TERM info, ErlNifPid* pid); -*/ -/* -static char* send_msg_error(ErlNifEnv* env, - ERL_NIF_TERM reason, +static char* esock_send_msg(ErlNifEnv* env, + ERL_NIF_TERM msg, ErlNifPid* pid); -*/ -static char* send_msg_nif_abort(ErlNifEnv* env, - ERL_NIF_TERM ref, - ERL_NIF_TERM reason, - ErlNifPid* pid); -static char* send_msg(ErlNifEnv* env, - ERL_NIF_TERM msg, - ErlNifPid* pid); static BOOLEAN_T extract_debug(ErlNifEnv* env, ERL_NIF_TERM map); @@ -2394,7 +2405,7 @@ static char str_max_rxt[] = "max_rxt"; static char str_min[] = "min"; static char str_mode[] = "mode"; static char str_multiaddr[] = "multiaddr"; -static char str_nif_abort[] = "nif_abort"; +// static char str_nif_abort[] = "nif_abort"; static char str_null[] = "null"; static char str_num_dlocal[] = "num_domain_local"; static char str_num_dinet[] = "num_domain_inet"; @@ -2436,6 +2447,7 @@ static char str_exsend[] = "exsend"; // failed send /* *** "Global" Atoms *** */ +ERL_NIF_TERM esock_atom_abort; ERL_NIF_TERM esock_atom_accept; ERL_NIF_TERM esock_atom_acceptconn; ERL_NIF_TERM esock_atom_acceptfilter; @@ -2459,6 +2471,7 @@ ERL_NIF_TERM esock_atom_block_source; ERL_NIF_TERM esock_atom_broadcast; ERL_NIF_TERM esock_atom_busy_poll; ERL_NIF_TERM esock_atom_checksum; +ERL_NIF_TERM esock_atom_close; ERL_NIF_TERM esock_atom_connect; ERL_NIF_TERM esock_atom_congestion; ERL_NIF_TERM esock_atom_context; @@ -2598,6 +2611,7 @@ ERL_NIF_TERM esock_atom_seqpacket; ERL_NIF_TERM esock_atom_setfib; ERL_NIF_TERM esock_atom_set_peer_primary_addr; ERL_NIF_TERM esock_atom_socket; +ERL_NIF_TERM esock_atom_socket_tag; ERL_NIF_TERM esock_atom_sndbuf; ERL_NIF_TERM esock_atom_sndbufforce; ERL_NIF_TERM esock_atom_sndlowat; @@ -2665,7 +2679,7 @@ static ERL_NIF_TERM atom_max_rxt; static ERL_NIF_TERM atom_min; static ERL_NIF_TERM atom_mode; static ERL_NIF_TERM atom_multiaddr; -static ERL_NIF_TERM atom_nif_abort; +// static ERL_NIF_TERM atom_nif_abort; static ERL_NIF_TERM atom_null; static ERL_NIF_TERM atom_num_dinet; static ERL_NIF_TERM atom_num_dinet6; @@ -4929,7 +4943,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size accRef = enif_make_resource(env, accDescP); - enif_release_resource(accDescP); // We should really store a reference ... + enif_release_resource(accDescP); accDescP->ctrlPid = caller; if (MONP("naccept_listening -> ctrl", @@ -5143,7 +5157,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM sendRef; + ERL_NIF_TERM sockRef, sendRef; ErlNifBinary sndData; unsigned int eflags; int flags; @@ -5154,13 +5168,17 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, /* Extract arguments and perform preliminary validation */ if ((argc != 4) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_BIN(env, argv[2], &sndData) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } + sockRef = argv[0]; // We need this in case we send in case we send abort sendRef = argv[1]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5170,7 +5188,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, "\r\n SendRef: %T" "\r\n Size of data: %d" "\r\n eFlags: %d" - "\r\n", descP->sock, argv[0], sendRef, sndData.size, eflags) ); + "\r\n", descP->sock, sockRef, sendRef, sndData.size, eflags) ); if (!IS_CONNECTED(descP)) return esock_make_error(env, atom_enotconn); @@ -5193,7 +5211,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, * time we do. */ - res = nsend(env, descP, sendRef, &sndData, flags); + res = nsend(env, descP, sockRef, sendRef, &sndData, flags); MUNLOCK(descP->writeMtx); @@ -5211,6 +5229,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, static ERL_NIF_TERM nsend(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ErlNifBinary* sndDataP, int flags) @@ -5238,7 +5257,8 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, save_errno = -1; // The value does not actually matter in this case return send_check_result(env, descP, - written, sndDataP->size, save_errno, sendRef); + written, sndDataP->size, save_errno, + sockRef, sendRef); } @@ -5264,7 +5284,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM sendRef; + ERL_NIF_TERM sockRef, sendRef; ErlNifBinary sndData; unsigned int eflags; int flags; @@ -5284,9 +5304,14 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, !GET_UINT(env, argv[4], &eflags)) { return enif_make_badarg(env); } + sockRef = argv[0]; // We need this in case we send in case we send abort sendRef = argv[1]; eSockAddr = argv[3]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5298,7 +5323,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, "\r\n eSockAddr: %T" "\r\n eflags: %d" "\r\n", - descP->sock, argv[0], sendRef, sndData.size, eSockAddr, eflags) ); + descP->sock, sockRef, sendRef, sndData.size, eSockAddr, eflags) ); /* THIS TEST IS NOT CORRECT!!! */ if (!IS_OPEN(descP)) { @@ -5320,7 +5345,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, MLOCK(descP->writeMtx); - res = nsendto(env, descP, sendRef, &sndData, flags, + res = nsendto(env, descP, sockRef, sendRef, &sndData, flags, &remoteAddr, remoteAddrLen); MUNLOCK(descP->writeMtx); @@ -5336,6 +5361,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, static ERL_NIF_TERM nsendto(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ErlNifBinary* dataP, int flags, @@ -5372,7 +5398,8 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env, else save_errno = -1; // The value does not actually matter in this case - return send_check_result(env, descP, written, dataP->size, save_errno, sendRef); + return send_check_result(env, descP, written, dataP->size, save_errno, + sockRef, sendRef); } @@ -5395,7 +5422,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { - ERL_NIF_TERM res, sendRef, eMsgHdr; + ERL_NIF_TERM res, sockRef, sendRef, eMsgHdr; SocketDescriptor* descP; unsigned int eflags; int flags; @@ -5405,14 +5432,18 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, /* Extract arguments and perform preliminary validation */ if ((argc != 4) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !IS_MAP(env, argv[2]) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } + sockRef = argv[0]; // We need this in case we send in case we send abort sendRef = argv[1]; eMsgHdr = argv[2]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5433,7 +5464,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, MLOCK(descP->writeMtx); - res = nsendmsg(env, descP, sendRef, eMsgHdr, flags); + res = nsendmsg(env, descP, sockRef, sendRef, eMsgHdr, flags); MUNLOCK(descP->writeMtx); @@ -5449,6 +5480,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, static ERL_NIF_TERM nsendmsg(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ERL_NIF_TERM eMsgHdr, int flags) @@ -5589,7 +5621,8 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env, else save_errno = -1; // OK or not complete: this value should not matter in this case - res = send_check_result(env, descP, written, dataSize, save_errno, sendRef); + res = send_check_result(env, descP, written, dataSize, save_errno, + sockRef, sendRef); FREE(iovBins); FREE(iov); @@ -5691,20 +5724,24 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM recvRef; + ERL_NIF_TERM sockRef, recvRef; int len; unsigned int eflags; int flags; ERL_NIF_TERM res; if ((argc != 4) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_INT(env, argv[2], &len) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } + sockRef = argv[0]; // We need this in case we send in case we send abort recvRef = argv[1]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5729,7 +5766,7 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, * time we do. */ - res = nrecv(env, descP, recvRef, len, flags); + res = nrecv(env, descP, sockRef, recvRef, len, flags); MUNLOCK(descP->readMtx); @@ -5746,6 +5783,7 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, static ERL_NIF_TERM nrecv(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, int len, int flags) @@ -5794,6 +5832,7 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, read, len, save_errno, &buf, + sockRef, recvRef); } @@ -5829,7 +5868,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM recvRef; + ERL_NIF_TERM sockRef, recvRef; unsigned int bufSz; unsigned int eflags; int flags; @@ -5840,12 +5879,16 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, /* Extract arguments and perform preliminary validation */ if ((argc != 4) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_UINT(env, argv[2], &bufSz) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } - recvRef = argv[1]; + sockRef = argv[0]; // We need this in case we send in case we send abort + recvRef = argv[1]; + + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5883,7 +5926,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, * */ - res = nrecvfrom(env, descP, recvRef, bufSz, flags); + res = nrecvfrom(env, descP, sockRef, recvRef, bufSz, flags); MUNLOCK(descP->readMtx); @@ -5900,6 +5943,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, uint16_t len, int flags) @@ -5951,6 +5995,7 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, save_errno, &buf, &fromAddr, addrLen, + sockRef, recvRef); } @@ -5990,7 +6035,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM recvRef; + ERL_NIF_TERM sockRef, recvRef; unsigned int bufSz; unsigned int ctrlSz; unsigned int eflags; @@ -6002,14 +6047,18 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, /* Extract arguments and perform preliminary validation */ if ((argc != 5) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_UINT(env, argv[2], &bufSz) || !GET_UINT(env, argv[3], &ctrlSz) || !GET_UINT(env, argv[4], &eflags)) { return enif_make_badarg(env); } - recvRef = argv[1]; + sockRef = argv[0]; // We need this in case we send in case we send abort + recvRef = argv[1]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -6047,7 +6096,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, * */ - res = nrecvmsg(env, descP, recvRef, bufSz, ctrlSz, flags); + res = nrecvmsg(env, descP, sockRef, recvRef, bufSz, ctrlSz, flags); MUNLOCK(descP->readMtx); @@ -6064,6 +6113,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, static ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, uint16_t bufLen, uint16_t ctrlLen, @@ -6144,6 +6194,7 @@ ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, &msgHdr, data, // Needed for iov encode &ctrl, // Needed for ctrl header encode + sockRef, recvRef); } @@ -13254,6 +13305,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, ssize_t written, ssize_t dataSize, int saveErrno, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef) { SSDBG( descP, @@ -13330,7 +13382,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, while (writer_pop(env, descP, &pid, &mon, &ref)) { SSDBG( descP, ("SOCKET", "send_check_result -> abort %T\r\n", pid) ); - send_msg_nif_abort(env, ref, res, &pid); + esock_send_abort_msg(env, sockRef, ref, res, &pid); DEMONP("send_check_result -> pop'ed writer", env, descP, &mon); } @@ -13527,6 +13579,7 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, static void recv_error_current_reader(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM reason) { if (descP->currentReaderP != NULL) { @@ -13541,7 +13594,7 @@ void recv_error_current_reader(ErlNifEnv* env, while (reader_pop(env, descP, &pid, &mon, &ref)) { SSDBG( descP, ("SOCKET", "recv_error_current_reader -> abort %T\r\n", pid) ); - send_msg_nif_abort(env, ref, reason, &pid); + esock_send_abort_msg(env, sockRef, ref, reason, &pid); DEMONP("recv_error_current_reader -> pop'ed reader", env, descP, &mon); } @@ -13561,6 +13614,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, int toRead, int saveErrno, ErlNifBinary* bufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { char* xres; @@ -13595,7 +13649,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * We must also notify any waiting readers! */ - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); FREE_BIN(bufP); @@ -13713,7 +13767,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); SELECT(env, descP->sock, @@ -13751,7 +13805,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] errno: %d\r\n", toRead, saveErrno) ); - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); FREE_BIN(bufP); @@ -13835,6 +13889,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, ErlNifBinary* bufP, SocketAddress* fromAddrP, unsigned int fromAddrLen, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { char* xres; @@ -13876,7 +13931,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); SELECT(env, descP->sock, @@ -13909,7 +13964,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, ("SOCKET", "recvfrom_check_result -> errno: %d\r\n", saveErrno) ); - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); FREE_BIN(bufP); @@ -13962,6 +14017,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, struct msghdr* msgHdrP, ErlNifBinary* dataBufP, ErlNifBinary* ctrlBufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { @@ -14026,7 +14082,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); SELECT(env, descP->sock, @@ -14060,7 +14116,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, ("SOCKET", "recvmsg_check_result -> errno: %d\r\n", saveErrno) ); - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); @@ -16350,37 +16406,89 @@ char* send_msg_error(ErlNifEnv* env, */ -/* Send an (nif-) abort message to the specified process: +/* Send an close message to the specified process: * A message in the form: * - * {nif_abort, Ref, Reason} + * {'$socket', SockRef, close, CloseRef} * - * This message is for processes that are waiting in the + * This message is for processes that is waiting in the + * erlang API (close-) function for the socket to be "closed" + * (actually that the 'stop' callback function has been called). + */ +static +char* esock_send_close_msg(ErlNifEnv* env, + ERL_NIF_TERM closeRef, + ErlNifPid* pid) +{ + return esock_send_socket_msg(env, + esock_atom_undefined, + esock_atom_close, closeRef, pid); +} + + +/* Send an abort message to the specified process: + * A message in the form: + * + * {'$socket', SockRef, abort, {RecvRef, Reason}} + * + * This message is for processes that is waiting in the * erlang API functions for a select message. */ static -char* send_msg_nif_abort(ErlNifEnv* env, - ERL_NIF_TERM ref, - ERL_NIF_TERM reason, - ErlNifPid* pid) +char* esock_send_abort_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM recvRef, + ERL_NIF_TERM reason, + ErlNifPid* pid) { - ERL_NIF_TERM msg = MKT3(env, atom_nif_abort, ref, reason); + ERL_NIF_TERM info = MKT2(env, recvRef, reason); - return send_msg(env, msg, pid); + /* + esock_dbg_printf("SEND MSG", + "try send abort message to %T:\r\n", + "\r\n sockRef: %T" + "\r\n recvRef: %T" + "\r\n reason: %T" + "\r\n", *pid, sockRef, recvRef, reason); + */ + + return esock_send_socket_msg(env, sockRef, esock_atom_abort, info, pid); +} + + +/* *** esock_send_socket_msg *** + * + * This function sends a general purpose socket message to an erlang + * process. A general 'socket' message has the form: + * + * {'$socket', SockRef, Tag, Info} + * + */ + +static +char* esock_send_socket_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM tag, + ERL_NIF_TERM info, + ErlNifPid* pid) +{ + ERL_NIF_TERM msg = MKT4(env, esock_atom_socket_tag, sockRef, tag, info); + + return esock_send_msg(env, msg, pid); } /* Send a message to the specified process. */ static -char* send_msg(ErlNifEnv* env, - ERL_NIF_TERM msg, - ErlNifPid* pid) +char* esock_send_msg(ErlNifEnv* env, + ERL_NIF_TERM msg, + ErlNifPid* pid) { - if (!enif_send(env, pid, NULL, msg)) - return str_exsend; - else - return NULL; + if (!enif_send(env, pid, NULL, msg)) + return str_exsend; + else + return NULL; } @@ -17022,10 +17130,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) SSDBG( descP, ("SOCKET", "socket_stop -> " "send abort message to current writer %T\r\n", descP->currentWriter.pid) ); - if (send_msg_nif_abort(env, - descP->currentWriter.ref, - atom_closed, - &descP->currentWriter.pid) != NULL) { + if (esock_send_abort_msg(env, + esock_atom_undefined, + descP->currentWriter.ref, + atom_closed, + &descP->currentWriter.pid) != NULL) { /* Shall we really do this? * This happens if the controlling process has been killed! */ @@ -17061,10 +17170,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) SSDBG( descP, ("SOCKET", "socket_stop -> " "send abort message to current reader %T\r\n", descP->currentReader.pid) ); - if (send_msg_nif_abort(env, - descP->currentReader.ref, - atom_closed, - &descP->currentReader.pid) != NULL) { + if (esock_send_abort_msg(env, + esock_atom_undefined, + descP->currentReader.ref, + atom_closed, + &descP->currentReader.pid) != NULL) { /* Shall we really do this? * This happens if the controlling process has been killed! */ @@ -17099,10 +17209,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) SSDBG( descP, ("SOCKET", "socket_stop -> " "send abort message to current acceptor %T\r\n", descP->currentWriter.pid) ); - if (send_msg_nif_abort(env, - descP->currentAcceptor.ref, - atom_closed, - &descP->currentAcceptor.pid) != NULL) { + if (esock_send_abort_msg(env, + esock_atom_undefined, + descP->currentAcceptor.ref, + atom_closed, + &descP->currentAcceptor.pid) != NULL) { /* Shall we really do this? * This happens if the controlling process has been killed! */ @@ -17145,13 +17256,12 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * Also, we should really *always* use a tag unique to this * (nif-) module. Some like (in this case): * - * {'$socket', close, CloseRef} + * {'$socket', undefined, close, CloseRef} * * */ - - send_msg(env, - MKT2(env, atom_close, descP->closeRef), &descP->closerPid); + + esock_send_close_msg(env, descP->closeRef, &descP->closerPid); DEMONP("socket_stop -> closer", env, descP, &descP->closerMon); @@ -17221,13 +17331,14 @@ void inform_waiting_procs(ErlNifEnv* env, */ SSDBG( descP, - ("SOCKET", "inform_waiting_procs -> abort %T (%T)\r\n", + ("SOCKET", "inform_waiting_procs -> abort request %T (from %T)\r\n", currentP->data.ref, currentP->data.pid) ); - ESOCK_ASSERT( (NULL == send_msg_nif_abort(env, - currentP->data.ref, - reason, - ¤tP->data.pid)) ); + ESOCK_ASSERT( (NULL == esock_send_abort_msg(env, + esock_atom_undefined, + currentP->data.ref, + reason, + ¤tP->data.pid)) ); DEMONP("inform_waiting_procs -> current 'request'", env, descP, ¤tP->data.mon); @@ -17783,7 +17894,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_min = MKA(env, str_min); atom_mode = MKA(env, str_mode); atom_multiaddr = MKA(env, str_multiaddr); - atom_nif_abort = MKA(env, str_nif_abort); + // atom_nif_abort = MKA(env, str_nif_abort); atom_null = MKA(env, str_null); atom_num_dinet = MKA(env, str_num_dinet); atom_num_dinet6 = MKA(env, str_num_dinet6); @@ -17813,6 +17924,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_want = MKA(env, str_want); /* Global atom(s) */ + esock_atom_abort = MKA(env, "abort"); esock_atom_accept = MKA(env, "accept"); esock_atom_acceptconn = MKA(env, "acceptconn"); esock_atom_acceptfilter = MKA(env, "acceptfilter"); @@ -17836,6 +17948,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_broadcast = MKA(env, "broadcast"); esock_atom_busy_poll = MKA(env, "busy_poll"); esock_atom_checksum = MKA(env, "checksum"); + esock_atom_close = MKA(env, "close"); esock_atom_connect = MKA(env, "connect"); esock_atom_congestion = MKA(env, "congestion"); esock_atom_context = MKA(env, "context"); @@ -17979,6 +18092,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_sndlowat = MKA(env, "sndlowat"); esock_atom_sndtimeo = MKA(env, "sndtimeo"); esock_atom_socket = MKA(env, "socket"); + esock_atom_socket_tag = MKA(env, "$socket"); esock_atom_spec_dst = MKA(env, "spec_dst"); esock_atom_status = MKA(env, "status"); esock_atom_stream = MKA(env, "stream"); -- cgit v1.2.3 From 73eae9c9c3e458d6d6cffb0cdbfb6ce80a340be1 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Wed, 12 Dec 2018 18:39:34 +0100 Subject: [socket-nif|test] Correct the (api) connect timeout test case Assumed the wrong success value ({ok, _} instead of ok) for the api-to-connect-tcp test case. OTP-14831 --- erts/emulator/test/socket_SUITE.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'erts/emulator') diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 71e32f8e95..779cc80ad6 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -1911,7 +1911,7 @@ api_to_connect_tcp(InitState) -> client := Client} = _State) -> case ?SEV_AWAIT_READY(Client, client, connect, [{server, Server}]) of - {ok, _} -> + ok -> ok; {error, _} = ERROR -> ERROR @@ -3670,6 +3670,7 @@ sc_lc_receive_response_tcp(InitState) -> end}, #{desc => "close the connection socket", cmd => fun(#{csock := Sock} = State) -> + %% ok = socket:setopt(Sock, otp, debug, true), case socket:close(Sock) of ok -> {ok, maps:remove(csock, State)}; -- cgit v1.2.3 From aed92219aacf962a9d6c71ea448809fe2c9acae6 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 13 Dec 2018 16:38:52 +0100 Subject: [socket-nif|test] Add a "global" logger Added a global logger that make it possible to log from the slave nodes (with "ease"). Also "fixed" the test case that failed on "older" linux (Ubuntu 14.04). For now we let it skip instead (we should really check the OS version). Also corrected a couple of (ping-pong) cases for which the buffer adjustments did not work. OTP-14831 --- erts/emulator/test/Makefile | 1 + erts/emulator/test/socket_SUITE.erl | 198 +++++++++++++-------------- erts/emulator/test/socket_test_evaluator.erl | 10 +- erts/emulator/test/socket_test_lib.erl | 10 +- erts/emulator/test/socket_test_logger.erl | 99 ++++++++++++++ 5 files changed, 208 insertions(+), 110 deletions(-) create mode 100644 erts/emulator/test/socket_test_logger.erl (limited to 'erts/emulator') diff --git a/erts/emulator/test/Makefile b/erts/emulator/test/Makefile index 09bfe6f104..300270a10e 100644 --- a/erts/emulator/test/Makefile +++ b/erts/emulator/test/Makefile @@ -33,6 +33,7 @@ SOCKET_MODULES = \ socket_server \ socket_client \ socket_test_lib \ + socket_test_logger \ socket_test_evaluator \ socket_test_ttest_lib \ socket_test_ttest_tcp_gen \ diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 779cc80ad6..57f562f24d 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -149,6 +149,7 @@ -define(TT(T), ct:timetrap(T)). -define(LIB, socket_test_lib). +-define(LOGGER, socket_test_logger). -define(TPP_SMALL, lists:seq(1, 8)). -define(TPP_MEDIUM, lists:flatten(lists:duplicate(1024, ?TPP_SMALL))). @@ -335,15 +336,19 @@ traffic_cases() -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% init_per_suite(Config) -> + ?LOGGER:start(), Config. end_per_suite(_) -> + ?LOGGER:stop(), ok. init_per_testcase(_TC, Config) -> + ?LOGGER:start(), Config. end_per_testcase(_TC, Config) -> + ?LOGGER:stop(), Config. @@ -1798,6 +1803,10 @@ api_to_connect_tcp(InitState) -> [{tester, Tester}]) of {ok, ok = _Result} -> {ok, maps:remove(connect_limit, State)}; + {ok, {error, {connect_limit_reached,R,L}}} -> + {skip, + ?LIB:f("Connect limit reached ~w: ~w", + [L, R])}; {ok, Result} -> Result; {error, _} = ERROR -> @@ -1968,25 +1977,25 @@ api_to_connect_tcp(InitState) -> api_toc_tcp_client_start(Node) -> Self = self(), - GL = group_leader(), - Fun = fun() -> api_toc_tcp_client(Self, GL) end, + Fun = fun() -> api_toc_tcp_client(Self) end, erlang:spawn(Node, Fun). -api_toc_tcp_client(Parent, GL) -> - api_toc_tcp_client_init(Parent, GL), +api_toc_tcp_client(Parent) -> + api_toc_tcp_client_init(Parent), ServerSA = api_toc_tcp_client_await_start(Parent), Domain = maps:get(family, ServerSA), api_toc_tcp_client_announce_ready(Parent, init), {To, ConLimit} = api_toc_tcp_client_await_continue(Parent, connect), Result = api_to_connect_tcp_await_timeout(To, ServerSA, Domain, ConLimit), + ?SEV_IPRINT("result: ~p", [Result]), api_toc_tcp_client_announce_ready(Parent, connect, Result), Reason = api_toc_tcp_client_await_terminate(Parent), exit(Reason). -api_toc_tcp_client_init(Parent, GL) -> +api_toc_tcp_client_init(Parent) -> + put(sname, "rclient"), %% i("api_toc_tcp_client_init -> entry"), _MRef = erlang:monitor(process, Parent), - group_leader(self(), GL), ok. api_toc_tcp_client_await_start(Parent) -> @@ -3710,7 +3719,8 @@ sc_lc_receive_response_tcp(InitState) -> ?SEV_FINISH_NORMAL ], - %% The point of this is to perform the recv for which we are testing the reponse + %% The point of this is to perform the recv for which + %% we are testing the reponse. HandlerSeq = [ %% *** Wait for start order part *** @@ -3758,7 +3768,8 @@ sc_lc_receive_response_tcp(InitState) -> ?SEV_EPRINT("Unexpected data received"), {error, unexpected_success}; {error, closed} -> - ?SEV_IPRINT("received expected 'closed' result"), + ?SEV_IPRINT("received expected 'closed' " + "result"), State1 = maps:remove(sock, State), {ok, State1}; {error, Reason} = ERROR -> @@ -5806,13 +5817,12 @@ sc_rc_receive_response_tcp(InitState) -> sc_rc_tcp_client_start(Node) -> Self = self(), - GL = group_leader(), - Fun = fun() -> sc_rc_tcp_client(Self, GL) end, + Fun = fun() -> sc_rc_tcp_client(Self) end, erlang:spawn(Node, Fun). -sc_rc_tcp_client(Parent, GL) -> - sc_rc_tcp_client_init(Parent, GL), +sc_rc_tcp_client(Parent) -> + sc_rc_tcp_client_init(Parent), ServerSA = sc_rc_tcp_client_await_start(Parent), Domain = maps:get(family, ServerSA), Sock = sc_rc_tcp_client_create(Domain), @@ -5825,12 +5835,13 @@ sc_rc_tcp_client(Parent, GL) -> sc_rc_tcp_client_close(Sock), sc_rc_tcp_client_announce_ready(Parent, close), Reason = sc_rc_tcp_client_await_terminate(Parent), + ?SEV_IPRINT("terminate"), exit(Reason). -sc_rc_tcp_client_init(Parent, GL) -> - i("sc_rc_tcp_client_init -> entry"), +sc_rc_tcp_client_init(Parent) -> + put(sname, "rclient"), + ?SEV_IPRINT("init"), _MRef = erlang:monitor(process, Parent), - group_leader(self(), GL), ok. sc_rc_tcp_client_await_start(Parent) -> @@ -6633,13 +6644,12 @@ sc_rs_send_shutdown_receive_tcp(InitState) -> sc_rs_tcp_client_start(Node, Send) -> Self = self(), - GL = group_leader(), - Fun = fun() -> sc_rs_tcp_client(Self, Send, GL) end, + Fun = fun() -> sc_rs_tcp_client(Self, Send) end, erlang:spawn(Node, Fun). -sc_rs_tcp_client(Parent, Send, GL) -> - sc_rs_tcp_client_init(Parent, GL), +sc_rs_tcp_client(Parent, Send) -> + sc_rs_tcp_client_init(Parent), ServerSA = sc_rs_tcp_client_await_start(Parent), Domain = maps:get(family, ServerSA), Sock = sc_rs_tcp_client_create(Domain), @@ -6658,12 +6668,13 @@ sc_rs_tcp_client(Parent, Send, GL) -> sc_rs_tcp_client_close(Sock), sc_rs_tcp_client_announce_ready(Parent, close), Reason = sc_rs_tcp_client_await_terminate(Parent), + ?SEV_IPRINT("terminate"), exit(Reason). -sc_rs_tcp_client_init(Parent, GL) -> - i("sc_rs_tcp_client_init -> entry"), +sc_rs_tcp_client_init(Parent) -> + put(sname, "rclient"), + ?SEV_IPRINT("init"), _MRef = erlang:monitor(process, Parent), - group_leader(self(), GL), ok. sc_rs_tcp_client_await_start(Parent) -> @@ -7782,12 +7793,11 @@ traffic_send_and_recv_chunks_tcp(InitState) -> traffic_snr_tcp_client_start(Node) -> Self = self(), - GL = group_leader(), - Fun = fun() -> traffic_snr_tcp_client(Self, GL) end, + Fun = fun() -> traffic_snr_tcp_client(Self) end, erlang:spawn(Node, Fun). -traffic_snr_tcp_client(Parent, GL) -> - {Sock, ServerSA} = traffic_snr_tcp_client_init(Parent, GL), +traffic_snr_tcp_client(Parent) -> + {Sock, ServerSA} = traffic_snr_tcp_client_init(Parent), traffic_snr_tcp_client_announce_ready(Parent, init), traffic_snr_tcp_client_await_continue(Parent, connect), traffic_snr_tcp_client_connect(Sock, ServerSA), @@ -7816,10 +7826,10 @@ traffic_snr_tcp_client_send_loop(Parent, Sock) -> exit({await_continue, Reason}) end. -traffic_snr_tcp_client_init(Parent, GL) -> - i("traffic_snr_tcp_client_init -> entry"), +traffic_snr_tcp_client_init(Parent) -> + put(sname, "rclient"), + ?SEV_IPRINT("init"), _MRef = erlang:monitor(process, Parent), - group_leader(self(), GL), ServerSA = traffic_snr_tcp_client_await_start(Parent), Domain = maps:get(family, ServerSA), Sock = traffic_snr_tcp_client_create(Domain), @@ -8146,8 +8156,9 @@ traffic_ping_pong_medium_sendto_and_recvfrom_udp6(_Config) when is_list(_Config) Num = ?TPP_MEDIUM_NUM, tc_try(traffic_ping_pong_medium_sendto_and_recvfrom_udp6, fun() -> + not_yet_implemented(), ?TT(?SECS(45)), - InitState = #{domain => inet, + InitState = #{domain => inet6, msg => Msg, num => Num}, ok = traffic_ping_pong_sendto_and_recvfrom_udp(InitState) @@ -8201,7 +8212,7 @@ traffic_ping_pong_small_sendmsg_and_recvmsg_tcp6(_Config) when is_list(_Config) fun() -> not_yet_implemented(), ?TT(?SECS(20)), - InitState = #{domain => inet, + InitState = #{domain => inet6, msg => Msg, num => Num}, ok = traffic_ping_pong_sendmsg_and_recvmsg_tcp(InitState) @@ -8471,6 +8482,9 @@ traffic_ping_pong_send_and_receive_tcp(#{msg := Msg} = InitState) -> true -> ok end, + + + ok = socket:setopt(Sock, otp, rcvbuf, 8*1024) end, traffic_ping_pong_send_and_receive_tcp2(InitState#{buf_init => Fun}). @@ -9109,12 +9123,11 @@ tpp_tcp_handler_msg_exchange_loop(Sock, Send, Recv, N, Sent, Received, Start) -> tpp_tcp_client_create(Node) -> Self = self(), - GL = group_leader(), - Fun = fun() -> tpp_tcp_client(Self, GL) end, + Fun = fun() -> tpp_tcp_client(Self) end, erlang:spawn(Node, Fun). -tpp_tcp_client(Parent, GL) -> - tpp_tcp_client_init(Parent, GL), +tpp_tcp_client(Parent) -> + tpp_tcp_client_init(Parent), {ServerSA, BufInit, Send, Recv} = tpp_tcp_client_await_start(Parent), Domain = maps:get(family, ServerSA), Sock = tpp_tcp_client_sock_open(Domain, BufInit), @@ -9131,11 +9144,10 @@ tpp_tcp_client(Parent, GL) -> ?SEV_IPRINT("terminating"), exit(Reason). -tpp_tcp_client_init(Parent, GL) -> +tpp_tcp_client_init(Parent) -> put(sname, "rclient"), ?SEV_IPRINT("init"), _MRef = erlang:monitor(process, Parent), - group_leader(self(), GL), ok. tpp_tcp_client_await_start(Parent) -> @@ -9362,20 +9374,23 @@ traffic_ping_pong_sendmsg_and_recvmsg_udp(InitState) -> traffic_ping_pong_send_and_receive_udp(#{msg := Msg} = InitState) -> Fun = fun(Sock) -> {ok, RcvSz} = socket:getopt(Sock, socket, rcvbuf), - if (RcvSz < size(Msg)) -> + if (RcvSz =< (8+size(Msg))) -> + i("adjust socket rcvbuf buffer size"), ok = socket:setopt(Sock, socket, rcvbuf, 1024+size(Msg)); true -> ok end, {ok, SndSz} = socket:getopt(Sock, socket, sndbuf), - if (SndSz < size(Msg)) -> + if (SndSz =< (8+size(Msg))) -> + i("adjust socket sndbuf buffer size"), ok = socket:setopt(Sock, socket, sndbuf, 1024+size(Msg)); true -> ok end, {ok, OtpRcvBuf} = socket:getopt(Sock, otp, rcvbuf), if - (OtpRcvBuf < size(Msg)) -> + (OtpRcvBuf =< (8+size(Msg))) -> + i("adjust otp rcvbuf buffer size"), ok = socket:setopt(Sock, otp, rcvbuf, 1024+size(Msg)); true -> ok @@ -9837,6 +9852,7 @@ traffic_ping_pong_send_and_receive_udp2(InitState) -> ?SEV_FINISH_NORMAL ], + i("start server evaluator"), ServerInitState = #{domain => maps:get(domain, InitState), recv => maps:get(recv, InitState), @@ -9884,7 +9900,8 @@ tpp_udp_server_handler_init(Parent) -> ok. tpp_udp_server_handler_msg_exchange(Sock, Send, Recv) -> - tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, 0, 0, 0, undefined). + tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, + 0, 0, 0, undefined). tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, N, Sent, Received, Start) -> @@ -9920,7 +9937,8 @@ tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, %% exit({'ping-send', Reason, N}) %% end; {error, closed} -> - ?SEV_IPRINT("closed - we are done: ~w, ~w, ~w", [N, Sent, Received]), + ?SEV_IPRINT("closed - we are done: ~w, ~w, ~w", + [N, Sent, Received]), Stop = ?LIB:timestamp(), {N, Sent, Received, Start, Stop}; {error, RReason} -> @@ -9933,36 +9951,45 @@ tpp_udp_server_handler_msg_exchange_loop(Sock, Send, Recv, tpp_udp_client_handler_create(Node) -> Self = self(), - GL = group_leader(), - Fun = fun() -> tpp_udp_client_handler(Self, GL) end, + Fun = fun() -> put(sname, "chandler"), tpp_udp_client_handler(Self) end, erlang:spawn(Node, Fun). -tpp_udp_client_handler(Parent, GL) -> - tpp_udp_client_handler_init(Parent, GL), +tpp_udp_client_handler(Parent) -> + tpp_udp_client_handler_init(Parent), + ?SEV_IPRINT("await start command"), {ServerSA, BufInit, Send, Recv} = tpp_udp_handler_await_start(Parent), + ?SEV_IPRINT("start command with" + "~n ServerSA: ~p", [ServerSA]), Domain = maps:get(family, ServerSA), Sock = tpp_udp_sock_open(Domain, BufInit), tpp_udp_sock_bind(Sock, Domain), + ?SEV_IPRINT("announce ready", []), tpp_udp_handler_announce_ready(Parent, init), {InitMsg, Num} = tpp_udp_handler_await_continue(Parent, send), + ?SEV_IPRINT("received continue with" + "~n Num: ~p", [Num]), Result = tpp_udp_client_handler_msg_exchange(Sock, ServerSA, Send, Recv, InitMsg, Num), + ?SEV_IPRINT("ready"), tpp_udp_handler_announce_ready(Parent, send, Result), + ?SEV_IPRINT("await terminate"), Reason = tpp_udp_handler_await_terminate(Parent), + ?SEV_IPRINT("terminate with ~p", [Reason]), tpp_udp_sock_close(Sock), ?SEV_IPRINT("terminating"), exit(Reason). -tpp_udp_client_handler_init(Parent, GL) -> +tpp_udp_client_handler_init(Parent) -> put(sname, "chandler"), ?SEV_IPRINT("init"), _MRef = erlang:monitor(process, Parent), - group_leader(self(), GL), ok. -tpp_udp_client_handler_msg_exchange(Sock, ServerSA, Send, Recv, InitMsg, Num) -> +tpp_udp_client_handler_msg_exchange(Sock, ServerSA, + Send, Recv, InitMsg, Num) -> Start = ?LIB:timestamp(), - tpp_udp_client_handler_msg_exchange_loop(Sock, ServerSA, Send, Recv, InitMsg, + tpp_udp_client_handler_msg_exchange_loop(Sock, ServerSA, + Send, Recv, InitMsg, Num, 0, 0, 0, Start). tpp_udp_client_handler_msg_exchange_loop(_Sock, _Dest, _Send, _Recv, _Msg, @@ -9970,16 +9997,14 @@ tpp_udp_client_handler_msg_exchange_loop(_Sock, _Dest, _Send, _Recv, _Msg, Start) -> Stop = ?LIB:timestamp(), {Sent, Received, Start, Stop}; -tpp_udp_client_handler_msg_exchange_loop(Sock, Dest, Send, Recv, Data, +tpp_udp_client_handler_msg_exchange_loop(Sock, Dest, Send, Recv, Data, Num, N, Sent, Received, Start) -> - %% d("tpp_udp_client_handler_msg_exchange_loop(~w,~w) try send", [Num,N]), case tpp_udp_send_req(Sock, Send, Data, Dest) of {ok, SendSz} -> - %% d("tpp_tcp_client_msg_exchange_loop(~w,~w) sent - " - %% "now try recv", [Num,N]), case tpp_udp_recv_rep(Sock, Recv) of {ok, NewData, RecvSz, Dest} -> - tpp_udp_client_handler_msg_exchange_loop(Sock, Dest, Send, Recv, + tpp_udp_client_handler_msg_exchange_loop(Sock, Dest, + Send, Recv, NewData, Num, N+1, Sent+SendSz, Received+RecvSz, @@ -10009,55 +10034,17 @@ tpp_udp_recv(Sock, Recv, Tag) -> %% "~n Source: ~p" %% "~n Tag: ~p" %% "~n Sz: ~p" - %% "~n size(Data): ~p", [Source, Tag, Sz, size(Data)]), + %% "~n size(Data): ~p", + %% [Source, Tag, Sz, size(Data)]), {ok, Data, size(Msg), Source}; - {ok, {Source, <> = Msg}} -> - %% ?SEV_IPRINT("tpp_udp_recv -> got part: " - %% "~n Source: ~p" - %% "~n Tag: ~p" - %% "~n Sz: ~p" - %% "~n size(Data): ~p", [Source, Tag, Sz, size(Data)]), - Remains = Sz - size(Data), - tpp_tcp_recv(Sock, Source, Recv, Tag, Remains, size(Msg), [Data]); + {ok, {_Source, <>}} -> + {error, {invalid_msg, Sz, size(Data)}}; {ok, {_, <>}} -> {error, {invalid_msg_tag, Tag}}; {error, _} = ERROR -> ERROR end. -%% We match against Source since we only communicate with one peer -tpp_tcp_recv(Sock, Source, Recv, Tag, Remaining, AccSz, Acc) -> - %% ?SEV_IPRINT("tpp_tcp_recv -> entry with" - %% "~n Tag: ~p" - %% "~n Remaining: ~p" - %% "~n AccSz: ~p" - %% "~n RcvBuf: ~p" - %% "~n SndBuf: ~p", - %% [Tag, Remaining, AccSz, - %% socket:getopt(Sock, socket, rcvbuf), - %% socket:getopt(Sock, socket, sndbuf)]), - case Recv(Sock, Remaining) of - {ok, {Source, Data}} when (Remaining =:= size(Data)) -> - %% ?SEV_IPRINT("tpp_udp_recv -> got rest: " - %% "~n Source: ~p" - %% "~n size(Data): ~p", [Source, size(Data)]), - %% We got the rest - TotSz = AccSz + size(Data), - {ok, - erlang:iolist_to_binary(lists:reverse([Data | Acc])), - TotSz, Source}; - {ok, {Source, Data}} when (Remaining > size(Data)) -> - %% ?SEV_IPRINT("tpp_udp_recv -> got part of rest: " - %% "~n Source: ~p" - %% "~n size(Data): ~p", [Source, size(Data)]), - tpp_tcp_recv(Sock, Source, Recv, Tag, - Remaining - size(Data), AccSz + size(Data), - [Data | Acc]); - {error, _} = ERROR -> - ERROR - end. - - tpp_udp_send_req(Sock, Send, Data, Dest) -> tpp_udp_send(Sock, Send, ?TPP_REQUEST, Data, Dest). @@ -10070,15 +10057,6 @@ tpp_udp_send(Sock, Send, Tag, Data, Dest) -> tpp_udp_send_msg(Sock, Send, Msg, Dest, 0). tpp_udp_send_msg(Sock, Send, Msg, Dest, AccSz) when is_binary(Msg) -> - %% d("tpp_udp_send_msg -> entry with" - %% "~n size(Msg): ~p" - %% "~n Dest: ~p" - %% "~n AccSz: ~p" - %% "~n RcvBuf: ~p" - %% "~n SndBuf: ~p", - %% [size(Msg), Dest, AccSz, - %% socket:getopt(Sock, socket, rcvbuf), - %% socket:getopt(Sock, socket, sndbuf)]), case Send(Sock, Msg, Dest) of ok -> {ok, AccSz+size(Msg)}; @@ -10162,6 +10140,14 @@ start_node(Host, NodeName) -> UniqueNodeName = f("~w_~w", [NodeName, erlang:system_time(millisecond)]), case do_start_node(Host, UniqueNodeName) of {ok, _} = OK -> + global:sync(), + %% i("Node ~p started: " + %% "~n Nodes: ~p" + %% "~n Logger: ~p" + %% "~n Global Names: ~p", + %% [NodeName, nodes(), + %% global:whereis_name(socket_test_logger), + %% global:registered_names()]), OK; {error, Reason, _} -> {error, Reason} diff --git a/erts/emulator/test/socket_test_evaluator.erl b/erts/emulator/test/socket_test_evaluator.erl index e8755a9512..48fb6a027e 100644 --- a/erts/emulator/test/socket_test_evaluator.erl +++ b/erts/emulator/test/socket_test_evaluator.erl @@ -76,6 +76,7 @@ %% ============================================================================ -define(LIB, socket_test_lib). +-define(LOGGER, socket_test_logger). -define(EXTRA_NOTHING, '$nothing'). -define(ANNOUNCEMENT_START, '$start'). @@ -122,6 +123,8 @@ loop(ID, [#{desc := Desc, loop(ID + 1, Cmds, State); {ok, NewState} -> loop(ID + 1, Cmds, NewState); + {skip, Reason} -> + exit({skip, Reason}); {error, Reason} -> eprint("command ~w failed: " "~n Reason: ~p", [ID, Reason]), @@ -160,6 +163,8 @@ await_finish(Evs, Fails) -> iprint("unknown process ~p died (normal)", [Pid]), await_finish(Evs, Fails) end; + {'DOWN', _MRef, process, Pid, {skip, Reason}} -> + ?LIB:skip(Reason); {'DOWN', _MRef, process, Pid, Reason} -> case lists:keysearch(Pid, #ev.pid, Evs) of {value, #ev{name = Name}} -> @@ -486,6 +491,5 @@ print(Prefix, F, A) -> SName -> f("[~s][~p]", [SName, self()]) end, - FStr = f("[~s]~s ~s" ++ F, [?LIB:formated_timestamp(), IDStr, Prefix | A]), - io:format(user, FStr ++ "~n", []), - io:format(FStr, []). + ?LOGGER:format("[~s]~s ~s" ++ F, + [?LIB:formated_timestamp(), IDStr, Prefix | A]). diff --git a/erts/emulator/test/socket_test_lib.erl b/erts/emulator/test/socket_test_lib.erl index c36cc4fbfa..f55f338ef9 100644 --- a/erts/emulator/test/socket_test_lib.erl +++ b/erts/emulator/test/socket_test_lib.erl @@ -27,13 +27,15 @@ formated_timestamp/0, format_timestamp/1, + %% String and format + f/2, + %% Skipping not_yet_implemented/0, skip/1 ]). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% timestamp() -> @@ -60,6 +62,12 @@ format_timestamp({_N1, _N2, _N3} = TS) -> lists:flatten(FormatTS). +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +f(F, A) -> + lists:flatten(io_lib:format(F, A)). + + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% not_yet_implemented() -> diff --git a/erts/emulator/test/socket_test_logger.erl b/erts/emulator/test/socket_test_logger.erl new file mode 100644 index 0000000000..5996bbe855 --- /dev/null +++ b/erts/emulator/test/socket_test_logger.erl @@ -0,0 +1,99 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2018-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% + +-module(socket_test_logger). + +-export([ + start/0, + stop/0, + format/2 + ]). + + +-define(LIB, socket_test_lib). +-define(LOGGER, ?MODULE). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +start() -> + case global:whereis_name(?LOGGER) of + Pid when is_pid(Pid) -> + ok; + undefined -> + Self = self(), + Pid = spawn_link(fun() -> init(Self) end), + yes = global:register_name(?LOGGER, Pid), + ok + end. + + +stop() -> + case global:whereis_name(?LOGGER) of + undefined -> + ok; + Pid when is_pid(Pid) -> + global:unregister_name(?LOGGER), + Pid ! {?LOGGER, '$logger', stop}, + ok + end. + + +format(F, []) -> + do_format(F); +format(F, A) -> + do_format(?LIB:f(F, A)). + +do_format(Msg) -> + case global:whereis_name(?LOGGER) of + undefined -> + ok; + Pid when is_pid(Pid) -> + Pid ! {?MODULE, '$logger', {msg, Msg}}, + ok + end. + +init(Parent) -> + put(sname, "logger"), + print("[~s][logger] starting~n", [?LIB:formated_timestamp()]), + loop(#{parent => Parent}). + +loop(#{parent := Parent} = State) -> + receive + {'EXIT', Parent, _} -> + print("[~s][logger] parent exit~n", [?LIB:formated_timestamp()]), + exit(normal); + + {?MODULE, '$logger', stop} -> + print("[~s][logger] stopping~n", [?LIB:formated_timestamp()]), + exit(normal); + + {?MODULE, '$logger', {msg, Msg}} -> + print(Msg), + loop(State) + end. + + +print(F, A) -> + print(?LIB:f(F, A)). + +print(Str) -> + io:format(user, Str ++ "~n", []), + io:format(Str, []). -- cgit v1.2.3