diff options
author | Micael Karlberg <[email protected]> | 2018-12-12 18:37:55 +0100 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-12-14 11:46:28 +0100 |
commit | 342d35f457c15a9cea426e8ca83bfd52b0ec2f2e (patch) | |
tree | e4e98ea22598735af6972f5721195b80f0afe273 | |
parent | a33acd53524160d65929d06f06e387ce8419b1c0 (diff) | |
download | otp-342d35f457c15a9cea426e8ca83bfd52b0ec2f2e.tar.gz otp-342d35f457c15a9cea426e8ca83bfd52b0ec2f2e.tar.bz2 otp-342d35f457c15a9cea426e8ca83bfd52b0ec2f2e.zip |
[socket-nif] Message interface between socket.erl and nif updated
Previously the "message interface" between the functions in
socket.erl and the nif-code (socket_nif.c) was "ad hoc".
This has now been changed so that we have a unified message
{'$socket', SockRef | undefined, Tag, Info}
This also has the added advantage of preparing the code for
when we start using the new select-fucntions (with which
its possible to specify your own message). This will be used
in order to get around our eterm "leak" (we will use a simple
counter, maintained in the nif, instead of the [Recv|Send|Acc]Ref
we generate in the erlang code today.
OTP-14831
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 3 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 314 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 69392 -> 70272 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 34 |
4 files changed, 236 insertions, 115 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index ec17e45f25..0f973855ae 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -103,6 +103,7 @@ typedef unsigned int BOOLEAN_T; /* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ * "Global" atoms */ +extern ERL_NIF_TERM esock_atom_abort; extern ERL_NIF_TERM esock_atom_accept; extern ERL_NIF_TERM esock_atom_acceptconn; extern ERL_NIF_TERM esock_atom_acceptfilter; @@ -126,6 +127,7 @@ extern ERL_NIF_TERM esock_atom_block_source; extern ERL_NIF_TERM esock_atom_broadcast; extern ERL_NIF_TERM esock_atom_busy_poll; extern ERL_NIF_TERM esock_atom_checksum; +extern ERL_NIF_TERM esock_atom_close; extern ERL_NIF_TERM esock_atom_connect; extern ERL_NIF_TERM esock_atom_congestion; extern ERL_NIF_TERM esock_atom_context; @@ -269,6 +271,7 @@ extern ERL_NIF_TERM esock_atom_sndbufforce; extern ERL_NIF_TERM esock_atom_sndlowat; extern ERL_NIF_TERM esock_atom_sndtimeo; extern ERL_NIF_TERM esock_atom_socket; +extern ERL_NIF_TERM esock_atom_socket_tag; extern ERL_NIF_TERM esock_atom_spec_dst; extern ERL_NIF_TERM esock_atom_status; extern ERL_NIF_TERM esock_atom_stream; diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 1e0533535c..80903c487f 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -760,24 +760,24 @@ typedef struct { typedef struct { /* +++ The actual socket +++ */ - SOCKET sock; - HANDLE event; + SOCKET sock; + HANDLE event; /* +++ Stuff "about" the socket +++ */ - int domain; - int type; - int protocol; + int domain; + int type; + int protocol; - unsigned int state; - SocketAddress remote; - unsigned int addrLen; + unsigned int state; + SocketAddress remote; + unsigned int addrLen; - ErlNifEnv* env; + ErlNifEnv* env; /* +++ Controller (owner) process +++ */ - ErlNifPid ctrlPid; - // ErlNifMonitor ctrlMon; - ESockMonitor ctrlMon; + ErlNifPid ctrlPid; + // ErlNifMonitor ctrlMon; + ESockMonitor ctrlMon; /* +++ Write stuff +++ */ ErlNifMutex* writeMtx; @@ -996,11 +996,13 @@ static ERL_NIF_TERM naccept(ErlNifEnv* env, ERL_NIF_TERM ref); static ERL_NIF_TERM nsend(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ErlNifBinary* dataP, int flags); static ERL_NIF_TERM nsendto(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ErlNifBinary* dataP, int flags, @@ -1008,21 +1010,25 @@ static ERL_NIF_TERM nsendto(ErlNifEnv* env, unsigned int toAddrLen); static ERL_NIF_TERM nsendmsg(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ERL_NIF_TERM eMsgHdr, int flags); static ERL_NIF_TERM nrecv(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sendRef, ERL_NIF_TERM recvRef, int len, int flags); static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, uint16_t bufSz, int flags); static ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, uint16_t bufLen, uint16_t ctrlLen, @@ -1986,6 +1992,7 @@ static ERL_NIF_TERM send_check_result(ErlNifEnv* env, ssize_t written, ssize_t dataSize, int saveErrno, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef); static BOOLEAN_T recv_check_reader(ErlNifEnv* env, SocketDescriptor* descP, @@ -1998,6 +2005,7 @@ static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, SocketDescriptor* descP); static void recv_error_current_reader(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM reason); static ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -2005,6 +2013,7 @@ static ERL_NIF_TERM recv_check_result(ErlNifEnv* env, int toRead, int saveErrno, ErlNifBinary* bufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -2013,6 +2022,7 @@ static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, ErlNifBinary* bufP, SocketAddress* fromAddrP, unsigned int fromAddrLen, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -2021,6 +2031,7 @@ static ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, struct msghdr* msgHdrP, ErlNifBinary* dataBufP, ErlNifBinary* ctrlBufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef); static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, @@ -2310,22 +2321,22 @@ static void socket_down_reader(ErlNifEnv* env, SocketDescriptor* descP, const ErlNifPid* pid); -/* -static char* send_msg_error_closed(ErlNifEnv* env, +static char* esock_send_close_msg(ErlNifEnv* env, + ERL_NIF_TERM closeRef, + ErlNifPid* pid); +static char* esock_send_abort_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM recvRef, + ERL_NIF_TERM reason, + ErlNifPid* pid); +static char* esock_send_socket_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM tag, + ERL_NIF_TERM info, ErlNifPid* pid); -*/ -/* -static char* send_msg_error(ErlNifEnv* env, - ERL_NIF_TERM reason, +static char* esock_send_msg(ErlNifEnv* env, + ERL_NIF_TERM msg, ErlNifPid* pid); -*/ -static char* send_msg_nif_abort(ErlNifEnv* env, - ERL_NIF_TERM ref, - ERL_NIF_TERM reason, - ErlNifPid* pid); -static char* send_msg(ErlNifEnv* env, - ERL_NIF_TERM msg, - ErlNifPid* pid); static BOOLEAN_T extract_debug(ErlNifEnv* env, ERL_NIF_TERM map); @@ -2394,7 +2405,7 @@ static char str_max_rxt[] = "max_rxt"; static char str_min[] = "min"; static char str_mode[] = "mode"; static char str_multiaddr[] = "multiaddr"; -static char str_nif_abort[] = "nif_abort"; +// static char str_nif_abort[] = "nif_abort"; static char str_null[] = "null"; static char str_num_dlocal[] = "num_domain_local"; static char str_num_dinet[] = "num_domain_inet"; @@ -2436,6 +2447,7 @@ static char str_exsend[] = "exsend"; // failed send /* *** "Global" Atoms *** */ +ERL_NIF_TERM esock_atom_abort; ERL_NIF_TERM esock_atom_accept; ERL_NIF_TERM esock_atom_acceptconn; ERL_NIF_TERM esock_atom_acceptfilter; @@ -2459,6 +2471,7 @@ ERL_NIF_TERM esock_atom_block_source; ERL_NIF_TERM esock_atom_broadcast; ERL_NIF_TERM esock_atom_busy_poll; ERL_NIF_TERM esock_atom_checksum; +ERL_NIF_TERM esock_atom_close; ERL_NIF_TERM esock_atom_connect; ERL_NIF_TERM esock_atom_congestion; ERL_NIF_TERM esock_atom_context; @@ -2598,6 +2611,7 @@ ERL_NIF_TERM esock_atom_seqpacket; ERL_NIF_TERM esock_atom_setfib; ERL_NIF_TERM esock_atom_set_peer_primary_addr; ERL_NIF_TERM esock_atom_socket; +ERL_NIF_TERM esock_atom_socket_tag; ERL_NIF_TERM esock_atom_sndbuf; ERL_NIF_TERM esock_atom_sndbufforce; ERL_NIF_TERM esock_atom_sndlowat; @@ -2665,7 +2679,7 @@ static ERL_NIF_TERM atom_max_rxt; static ERL_NIF_TERM atom_min; static ERL_NIF_TERM atom_mode; static ERL_NIF_TERM atom_multiaddr; -static ERL_NIF_TERM atom_nif_abort; +// static ERL_NIF_TERM atom_nif_abort; static ERL_NIF_TERM atom_null; static ERL_NIF_TERM atom_num_dinet; static ERL_NIF_TERM atom_num_dinet6; @@ -4929,7 +4943,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size accRef = enif_make_resource(env, accDescP); - enif_release_resource(accDescP); // We should really store a reference ... + enif_release_resource(accDescP); accDescP->ctrlPid = caller; if (MONP("naccept_listening -> ctrl", @@ -5143,7 +5157,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM sendRef; + ERL_NIF_TERM sockRef, sendRef; ErlNifBinary sndData; unsigned int eflags; int flags; @@ -5154,13 +5168,17 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, /* Extract arguments and perform preliminary validation */ if ((argc != 4) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_BIN(env, argv[2], &sndData) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } + sockRef = argv[0]; // We need this in case we send in case we send abort sendRef = argv[1]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5170,7 +5188,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, "\r\n SendRef: %T" "\r\n Size of data: %d" "\r\n eFlags: %d" - "\r\n", descP->sock, argv[0], sendRef, sndData.size, eflags) ); + "\r\n", descP->sock, sockRef, sendRef, sndData.size, eflags) ); if (!IS_CONNECTED(descP)) return esock_make_error(env, atom_enotconn); @@ -5193,7 +5211,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, * time we do. */ - res = nsend(env, descP, sendRef, &sndData, flags); + res = nsend(env, descP, sockRef, sendRef, &sndData, flags); MUNLOCK(descP->writeMtx); @@ -5211,6 +5229,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, static ERL_NIF_TERM nsend(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ErlNifBinary* sndDataP, int flags) @@ -5238,7 +5257,8 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, save_errno = -1; // The value does not actually matter in this case return send_check_result(env, descP, - written, sndDataP->size, save_errno, sendRef); + written, sndDataP->size, save_errno, + sockRef, sendRef); } @@ -5264,7 +5284,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM sendRef; + ERL_NIF_TERM sockRef, sendRef; ErlNifBinary sndData; unsigned int eflags; int flags; @@ -5284,9 +5304,14 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, !GET_UINT(env, argv[4], &eflags)) { return enif_make_badarg(env); } + sockRef = argv[0]; // We need this in case we send in case we send abort sendRef = argv[1]; eSockAddr = argv[3]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5298,7 +5323,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, "\r\n eSockAddr: %T" "\r\n eflags: %d" "\r\n", - descP->sock, argv[0], sendRef, sndData.size, eSockAddr, eflags) ); + descP->sock, sockRef, sendRef, sndData.size, eSockAddr, eflags) ); /* THIS TEST IS NOT CORRECT!!! */ if (!IS_OPEN(descP)) { @@ -5320,7 +5345,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, MLOCK(descP->writeMtx); - res = nsendto(env, descP, sendRef, &sndData, flags, + res = nsendto(env, descP, sockRef, sendRef, &sndData, flags, &remoteAddr, remoteAddrLen); MUNLOCK(descP->writeMtx); @@ -5336,6 +5361,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, static ERL_NIF_TERM nsendto(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ErlNifBinary* dataP, int flags, @@ -5372,7 +5398,8 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env, else save_errno = -1; // The value does not actually matter in this case - return send_check_result(env, descP, written, dataP->size, save_errno, sendRef); + return send_check_result(env, descP, written, dataP->size, save_errno, + sockRef, sendRef); } @@ -5395,7 +5422,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]) { - ERL_NIF_TERM res, sendRef, eMsgHdr; + ERL_NIF_TERM res, sockRef, sendRef, eMsgHdr; SocketDescriptor* descP; unsigned int eflags; int flags; @@ -5405,14 +5432,18 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, /* Extract arguments and perform preliminary validation */ if ((argc != 4) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !IS_MAP(env, argv[2]) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } + sockRef = argv[0]; // We need this in case we send in case we send abort sendRef = argv[1]; eMsgHdr = argv[2]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5433,7 +5464,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, MLOCK(descP->writeMtx); - res = nsendmsg(env, descP, sendRef, eMsgHdr, flags); + res = nsendmsg(env, descP, sockRef, sendRef, eMsgHdr, flags); MUNLOCK(descP->writeMtx); @@ -5449,6 +5480,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, static ERL_NIF_TERM nsendmsg(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef, ERL_NIF_TERM eMsgHdr, int flags) @@ -5589,7 +5621,8 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env, else save_errno = -1; // OK or not complete: this value should not matter in this case - res = send_check_result(env, descP, written, dataSize, save_errno, sendRef); + res = send_check_result(env, descP, written, dataSize, save_errno, + sockRef, sendRef); FREE(iovBins); FREE(iov); @@ -5691,20 +5724,24 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM recvRef; + ERL_NIF_TERM sockRef, recvRef; int len; unsigned int eflags; int flags; ERL_NIF_TERM res; if ((argc != 4) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_INT(env, argv[2], &len) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } + sockRef = argv[0]; // We need this in case we send in case we send abort recvRef = argv[1]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5729,7 +5766,7 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, * time we do. */ - res = nrecv(env, descP, recvRef, len, flags); + res = nrecv(env, descP, sockRef, recvRef, len, flags); MUNLOCK(descP->readMtx); @@ -5746,6 +5783,7 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, static ERL_NIF_TERM nrecv(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, int len, int flags) @@ -5794,6 +5832,7 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, read, len, save_errno, &buf, + sockRef, recvRef); } @@ -5829,7 +5868,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM recvRef; + ERL_NIF_TERM sockRef, recvRef; unsigned int bufSz; unsigned int eflags; int flags; @@ -5840,12 +5879,16 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, /* Extract arguments and perform preliminary validation */ if ((argc != 4) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_UINT(env, argv[2], &bufSz) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } - recvRef = argv[1]; + sockRef = argv[0]; // We need this in case we send in case we send abort + recvRef = argv[1]; + + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -5883,7 +5926,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, * </KOLLA> */ - res = nrecvfrom(env, descP, recvRef, bufSz, flags); + res = nrecvfrom(env, descP, sockRef, recvRef, bufSz, flags); MUNLOCK(descP->readMtx); @@ -5900,6 +5943,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, uint16_t len, int flags) @@ -5951,6 +5995,7 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, save_errno, &buf, &fromAddr, addrLen, + sockRef, recvRef); } @@ -5990,7 +6035,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, const ERL_NIF_TERM argv[]) { SocketDescriptor* descP; - ERL_NIF_TERM recvRef; + ERL_NIF_TERM sockRef, recvRef; unsigned int bufSz; unsigned int ctrlSz; unsigned int eflags; @@ -6002,14 +6047,18 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, /* Extract arguments and perform preliminary validation */ if ((argc != 5) || - !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_UINT(env, argv[2], &bufSz) || !GET_UINT(env, argv[3], &ctrlSz) || !GET_UINT(env, argv[4], &eflags)) { return enif_make_badarg(env); } - recvRef = argv[1]; + sockRef = argv[0]; // We need this in case we send in case we send abort + recvRef = argv[1]; + if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + if (IS_CLOSED(descP) || IS_CLOSING(descP)) return esock_make_error(env, atom_closed); @@ -6047,7 +6096,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, * </KOLLA> */ - res = nrecvmsg(env, descP, recvRef, bufSz, ctrlSz, flags); + res = nrecvmsg(env, descP, sockRef, recvRef, bufSz, ctrlSz, flags); MUNLOCK(descP->readMtx); @@ -6064,6 +6113,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, static ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, uint16_t bufLen, uint16_t ctrlLen, @@ -6144,6 +6194,7 @@ ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, &msgHdr, data, // Needed for iov encode &ctrl, // Needed for ctrl header encode + sockRef, recvRef); } @@ -13254,6 +13305,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, ssize_t written, ssize_t dataSize, int saveErrno, + ERL_NIF_TERM sockRef, ERL_NIF_TERM sendRef) { SSDBG( descP, @@ -13330,7 +13382,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, while (writer_pop(env, descP, &pid, &mon, &ref)) { SSDBG( descP, ("SOCKET", "send_check_result -> abort %T\r\n", pid) ); - send_msg_nif_abort(env, ref, res, &pid); + esock_send_abort_msg(env, sockRef, ref, res, &pid); DEMONP("send_check_result -> pop'ed writer", env, descP, &mon); } @@ -13527,6 +13579,7 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env, static void recv_error_current_reader(ErlNifEnv* env, SocketDescriptor* descP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM reason) { if (descP->currentReaderP != NULL) { @@ -13541,7 +13594,7 @@ void recv_error_current_reader(ErlNifEnv* env, while (reader_pop(env, descP, &pid, &mon, &ref)) { SSDBG( descP, ("SOCKET", "recv_error_current_reader -> abort %T\r\n", pid) ); - send_msg_nif_abort(env, ref, reason, &pid); + esock_send_abort_msg(env, sockRef, ref, reason, &pid); DEMONP("recv_error_current_reader -> pop'ed reader", env, descP, &mon); } @@ -13561,6 +13614,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, int toRead, int saveErrno, ErlNifBinary* bufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { char* xres; @@ -13595,7 +13649,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * We must also notify any waiting readers! */ - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); FREE_BIN(bufP); @@ -13713,7 +13767,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); SELECT(env, descP->sock, @@ -13751,7 +13805,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] errno: %d\r\n", toRead, saveErrno) ); - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); FREE_BIN(bufP); @@ -13835,6 +13889,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, ErlNifBinary* bufP, SocketAddress* fromAddrP, unsigned int fromAddrLen, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { char* xres; @@ -13876,7 +13931,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); SELECT(env, descP->sock, @@ -13909,7 +13964,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, ("SOCKET", "recvfrom_check_result -> errno: %d\r\n", saveErrno) ); - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); FREE_BIN(bufP); @@ -13962,6 +14017,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, struct msghdr* msgHdrP, ErlNifBinary* dataBufP, ErlNifBinary* ctrlBufP, + ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef) { @@ -14026,7 +14082,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, descP->closeLocal = FALSE; descP->state = SOCKET_STATE_CLOSING; - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); SELECT(env, descP->sock, @@ -14060,7 +14116,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, ("SOCKET", "recvmsg_check_result -> errno: %d\r\n", saveErrno) ); - recv_error_current_reader(env, descP, res); + recv_error_current_reader(env, descP, sockRef, res); FREE_BIN(dataBufP); FREE_BIN(ctrlBufP); @@ -16350,37 +16406,89 @@ char* send_msg_error(ErlNifEnv* env, */ -/* Send an (nif-) abort message to the specified process: +/* Send an close message to the specified process: * A message in the form: * - * {nif_abort, Ref, Reason} + * {'$socket', SockRef, close, CloseRef} * - * This message is for processes that are waiting in the + * This message is for processes that is waiting in the + * erlang API (close-) function for the socket to be "closed" + * (actually that the 'stop' callback function has been called). + */ +static +char* esock_send_close_msg(ErlNifEnv* env, + ERL_NIF_TERM closeRef, + ErlNifPid* pid) +{ + return esock_send_socket_msg(env, + esock_atom_undefined, + esock_atom_close, closeRef, pid); +} + + +/* Send an abort message to the specified process: + * A message in the form: + * + * {'$socket', SockRef, abort, {RecvRef, Reason}} + * + * This message is for processes that is waiting in the * erlang API functions for a select message. */ static -char* send_msg_nif_abort(ErlNifEnv* env, - ERL_NIF_TERM ref, - ERL_NIF_TERM reason, - ErlNifPid* pid) +char* esock_send_abort_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM recvRef, + ERL_NIF_TERM reason, + ErlNifPid* pid) { - ERL_NIF_TERM msg = MKT3(env, atom_nif_abort, ref, reason); + ERL_NIF_TERM info = MKT2(env, recvRef, reason); - return send_msg(env, msg, pid); + /* + esock_dbg_printf("SEND MSG", + "try send abort message to %T:\r\n", + "\r\n sockRef: %T" + "\r\n recvRef: %T" + "\r\n reason: %T" + "\r\n", *pid, sockRef, recvRef, reason); + */ + + return esock_send_socket_msg(env, sockRef, esock_atom_abort, info, pid); +} + + +/* *** esock_send_socket_msg *** + * + * This function sends a general purpose socket message to an erlang + * process. A general 'socket' message has the form: + * + * {'$socket', SockRef, Tag, Info} + * + */ + +static +char* esock_send_socket_msg(ErlNifEnv* env, + ERL_NIF_TERM sockRef, + ERL_NIF_TERM tag, + ERL_NIF_TERM info, + ErlNifPid* pid) +{ + ERL_NIF_TERM msg = MKT4(env, esock_atom_socket_tag, sockRef, tag, info); + + return esock_send_msg(env, msg, pid); } /* Send a message to the specified process. */ static -char* send_msg(ErlNifEnv* env, - ERL_NIF_TERM msg, - ErlNifPid* pid) +char* esock_send_msg(ErlNifEnv* env, + ERL_NIF_TERM msg, + ErlNifPid* pid) { - if (!enif_send(env, pid, NULL, msg)) - return str_exsend; - else - return NULL; + if (!enif_send(env, pid, NULL, msg)) + return str_exsend; + else + return NULL; } @@ -17022,10 +17130,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) SSDBG( descP, ("SOCKET", "socket_stop -> " "send abort message to current writer %T\r\n", descP->currentWriter.pid) ); - if (send_msg_nif_abort(env, - descP->currentWriter.ref, - atom_closed, - &descP->currentWriter.pid) != NULL) { + if (esock_send_abort_msg(env, + esock_atom_undefined, + descP->currentWriter.ref, + atom_closed, + &descP->currentWriter.pid) != NULL) { /* Shall we really do this? * This happens if the controlling process has been killed! */ @@ -17061,10 +17170,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) SSDBG( descP, ("SOCKET", "socket_stop -> " "send abort message to current reader %T\r\n", descP->currentReader.pid) ); - if (send_msg_nif_abort(env, - descP->currentReader.ref, - atom_closed, - &descP->currentReader.pid) != NULL) { + if (esock_send_abort_msg(env, + esock_atom_undefined, + descP->currentReader.ref, + atom_closed, + &descP->currentReader.pid) != NULL) { /* Shall we really do this? * This happens if the controlling process has been killed! */ @@ -17099,10 +17209,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) SSDBG( descP, ("SOCKET", "socket_stop -> " "send abort message to current acceptor %T\r\n", descP->currentWriter.pid) ); - if (send_msg_nif_abort(env, - descP->currentAcceptor.ref, - atom_closed, - &descP->currentAcceptor.pid) != NULL) { + if (esock_send_abort_msg(env, + esock_atom_undefined, + descP->currentAcceptor.ref, + atom_closed, + &descP->currentAcceptor.pid) != NULL) { /* Shall we really do this? * This happens if the controlling process has been killed! */ @@ -17145,13 +17256,12 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * Also, we should really *always* use a tag unique to this * (nif-) module. Some like (in this case): * - * {'$socket', close, CloseRef} + * {'$socket', undefined, close, CloseRef} * * </KOLLA> */ - - send_msg(env, - MKT2(env, atom_close, descP->closeRef), &descP->closerPid); + + esock_send_close_msg(env, descP->closeRef, &descP->closerPid); DEMONP("socket_stop -> closer", env, descP, &descP->closerMon); @@ -17221,13 +17331,14 @@ void inform_waiting_procs(ErlNifEnv* env, */ SSDBG( descP, - ("SOCKET", "inform_waiting_procs -> abort %T (%T)\r\n", + ("SOCKET", "inform_waiting_procs -> abort request %T (from %T)\r\n", currentP->data.ref, currentP->data.pid) ); - ESOCK_ASSERT( (NULL == send_msg_nif_abort(env, - currentP->data.ref, - reason, - ¤tP->data.pid)) ); + ESOCK_ASSERT( (NULL == esock_send_abort_msg(env, + esock_atom_undefined, + currentP->data.ref, + reason, + ¤tP->data.pid)) ); DEMONP("inform_waiting_procs -> current 'request'", env, descP, ¤tP->data.mon); @@ -17783,7 +17894,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_min = MKA(env, str_min); atom_mode = MKA(env, str_mode); atom_multiaddr = MKA(env, str_multiaddr); - atom_nif_abort = MKA(env, str_nif_abort); + // atom_nif_abort = MKA(env, str_nif_abort); atom_null = MKA(env, str_null); atom_num_dinet = MKA(env, str_num_dinet); atom_num_dinet6 = MKA(env, str_num_dinet6); @@ -17813,6 +17924,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_want = MKA(env, str_want); /* Global atom(s) */ + esock_atom_abort = MKA(env, "abort"); esock_atom_accept = MKA(env, "accept"); esock_atom_acceptconn = MKA(env, "acceptconn"); esock_atom_acceptfilter = MKA(env, "acceptfilter"); @@ -17836,6 +17948,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_broadcast = MKA(env, "broadcast"); esock_atom_busy_poll = MKA(env, "busy_poll"); esock_atom_checksum = MKA(env, "checksum"); + esock_atom_close = MKA(env, "close"); esock_atom_connect = MKA(env, "connect"); esock_atom_congestion = MKA(env, "congestion"); esock_atom_context = MKA(env, "context"); @@ -17979,6 +18092,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_sndlowat = MKA(env, "sndlowat"); esock_atom_sndtimeo = MKA(env, "sndtimeo"); esock_atom_socket = MKA(env, "socket"); + esock_atom_socket_tag = MKA(env, "$socket"); esock_atom_spec_dst = MKA(env, "spec_dst"); esock_atom_status = MKA(env, "status"); esock_atom_stream = MKA(env, "stream"); diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex 25046e6aad..ddd50fdefa 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index a40692881b..2e295a91ae 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -1331,7 +1331,7 @@ do_accept(LSockRef, Timeout) -> {select, LSockRef, AccRef, ready_input} -> do_accept(LSockRef, next_timeout(TS, Timeout)); - {nif_abort, AccRef, Reason} -> + {'$socket', _, abort, {AccRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1408,7 +1408,7 @@ do_send(SockRef, Data, EFlags, Timeout) -> do_send(SockRef, Data, EFlags, next_timeout(TS, Timeout)); - {nif_abort, SendRef, Reason} -> + {'$socket', _, abort, {SendRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1421,7 +1421,7 @@ do_send(SockRef, Data, EFlags, Timeout) -> do_send(SockRef, Data, EFlags, next_timeout(TS, Timeout)); - {nif_abort, SendRef, Reason} -> + {'$socket', _, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1513,7 +1513,7 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> do_sendto(SockRef, Data, Dest, EFlags, next_timeout(TS, Timeout)); - {nif_abort, SendRef, Reason} -> + {'$socket', _, abort, {SendRef, Reason}} -> {error, Reason} after Timeout -> @@ -1525,7 +1525,11 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> receive {select, SockRef, SendRef, ready_output} -> do_sendto(SockRef, Data, Dest, EFlags, - next_timeout(TS, Timeout)) + next_timeout(TS, Timeout)); + + {'$socket', _, abort, {SendRef, Reason}} -> + {error, Reason} + after Timeout -> cancel(SockRef, sendto, SendRef), {error, timeout} @@ -1773,10 +1777,9 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) Bin, next_timeout(TS, Timeout)); - {nif_abort, RecvRef, Reason} -> + {'$socket', _, abort, {RecvRef, Reason}} -> {error, Reason} - after NewTimeout -> cancel(SockRef, recv, RecvRef), {error, {timeout, Acc}} @@ -1794,10 +1797,9 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) <<Acc/binary, Bin/binary>>, next_timeout(TS, Timeout)); - {nif_abort, RecvRef, Reason} -> + {'$socket', _, abort, {RecvRef, Reason}} -> {error, Reason} - after NewTimeout -> cancel(SockRef, recv, RecvRef), {error, {timeout, Acc}} @@ -1805,6 +1807,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) %% We return with the accumulated binary (if its non-empty) {error, eagain} when (Length =:= 0) andalso (size(Acc) > 0) -> + %% CAN WE REALLY DO THIS? THE NIF HAS SELECTED!! OR? {ok, Acc}; {error, eagain} -> @@ -1819,7 +1822,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) Acc, next_timeout(TS, Timeout)); - {nif_abort, RecvRef, Reason} -> + {'$socket', _, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -1850,7 +1853,8 @@ do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) -> %% any waiting reader. cancel(SockRef, recv, RecvRef), {ok, Acc}; -do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) -> +do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) + when (size(Acc) > 0) -> {error, {timeout, Acc}}; do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) -> {error, timeout}. @@ -1957,8 +1961,7 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> do_recvfrom(SockRef, BufSz, EFlags, next_timeout(TS, Timeout)); - {nif_abort, RecvRef, Reason} -> - %% p("received nif-abort: ~p", [Reason]), + {'$socket', _, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -2062,7 +2065,7 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, next_timeout(TS, Timeout)); - {nif_abort, RecvRef, Reason} -> + {'$socket', _, abort, {RecvRef, Reason}} -> {error, Reason} after NewTimeout -> @@ -2107,7 +2110,8 @@ do_close(SockRef) -> {ok, CloseRef} -> %% We must wait receive - {close, CloseRef} -> + {'$socket', _, close, CloseRef} -> +%% {close, CloseRef} -> %% <KOLLA> %% %% WHAT HAPPENS IF THIS PROCESS IS KILLED |