aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-12-12 18:37:55 +0100
committerMicael Karlberg <[email protected]>2018-12-14 11:46:28 +0100
commit342d35f457c15a9cea426e8ca83bfd52b0ec2f2e (patch)
treee4e98ea22598735af6972f5721195b80f0afe273
parenta33acd53524160d65929d06f06e387ce8419b1c0 (diff)
downloadotp-342d35f457c15a9cea426e8ca83bfd52b0ec2f2e.tar.gz
otp-342d35f457c15a9cea426e8ca83bfd52b0ec2f2e.tar.bz2
otp-342d35f457c15a9cea426e8ca83bfd52b0ec2f2e.zip
[socket-nif] Message interface between socket.erl and nif updated
Previously the "message interface" between the functions in socket.erl and the nif-code (socket_nif.c) was "ad hoc". This has now been changed so that we have a unified message {'$socket', SockRef | undefined, Tag, Info} This also has the added advantage of preparing the code for when we start using the new select-fucntions (with which its possible to specify your own message). This will be used in order to get around our eterm "leak" (we will use a simple counter, maintained in the nif, instead of the [Recv|Send|Acc]Ref we generate in the erlang code today. OTP-14831
-rw-r--r--erts/emulator/nifs/common/socket_int.h3
-rw-r--r--erts/emulator/nifs/common/socket_nif.c314
-rw-r--r--erts/preloaded/ebin/socket.beambin69392 -> 70272 bytes
-rw-r--r--erts/preloaded/src/socket.erl34
4 files changed, 236 insertions, 115 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h
index ec17e45f25..0f973855ae 100644
--- a/erts/emulator/nifs/common/socket_int.h
+++ b/erts/emulator/nifs/common/socket_int.h
@@ -103,6 +103,7 @@ typedef unsigned int BOOLEAN_T;
/* ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
* "Global" atoms
*/
+extern ERL_NIF_TERM esock_atom_abort;
extern ERL_NIF_TERM esock_atom_accept;
extern ERL_NIF_TERM esock_atom_acceptconn;
extern ERL_NIF_TERM esock_atom_acceptfilter;
@@ -126,6 +127,7 @@ extern ERL_NIF_TERM esock_atom_block_source;
extern ERL_NIF_TERM esock_atom_broadcast;
extern ERL_NIF_TERM esock_atom_busy_poll;
extern ERL_NIF_TERM esock_atom_checksum;
+extern ERL_NIF_TERM esock_atom_close;
extern ERL_NIF_TERM esock_atom_connect;
extern ERL_NIF_TERM esock_atom_congestion;
extern ERL_NIF_TERM esock_atom_context;
@@ -269,6 +271,7 @@ extern ERL_NIF_TERM esock_atom_sndbufforce;
extern ERL_NIF_TERM esock_atom_sndlowat;
extern ERL_NIF_TERM esock_atom_sndtimeo;
extern ERL_NIF_TERM esock_atom_socket;
+extern ERL_NIF_TERM esock_atom_socket_tag;
extern ERL_NIF_TERM esock_atom_spec_dst;
extern ERL_NIF_TERM esock_atom_status;
extern ERL_NIF_TERM esock_atom_stream;
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 1e0533535c..80903c487f 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -760,24 +760,24 @@ typedef struct {
typedef struct {
/* +++ The actual socket +++ */
- SOCKET sock;
- HANDLE event;
+ SOCKET sock;
+ HANDLE event;
/* +++ Stuff "about" the socket +++ */
- int domain;
- int type;
- int protocol;
+ int domain;
+ int type;
+ int protocol;
- unsigned int state;
- SocketAddress remote;
- unsigned int addrLen;
+ unsigned int state;
+ SocketAddress remote;
+ unsigned int addrLen;
- ErlNifEnv* env;
+ ErlNifEnv* env;
/* +++ Controller (owner) process +++ */
- ErlNifPid ctrlPid;
- // ErlNifMonitor ctrlMon;
- ESockMonitor ctrlMon;
+ ErlNifPid ctrlPid;
+ // ErlNifMonitor ctrlMon;
+ ESockMonitor ctrlMon;
/* +++ Write stuff +++ */
ErlNifMutex* writeMtx;
@@ -996,11 +996,13 @@ static ERL_NIF_TERM naccept(ErlNifEnv* env,
ERL_NIF_TERM ref);
static ERL_NIF_TERM nsend(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef,
ErlNifBinary* dataP,
int flags);
static ERL_NIF_TERM nsendto(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef,
ErlNifBinary* dataP,
int flags,
@@ -1008,21 +1010,25 @@ static ERL_NIF_TERM nsendto(ErlNifEnv* env,
unsigned int toAddrLen);
static ERL_NIF_TERM nsendmsg(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef,
ERL_NIF_TERM eMsgHdr,
int flags);
static ERL_NIF_TERM nrecv(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sendRef,
ERL_NIF_TERM recvRef,
int len,
int flags);
static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef,
uint16_t bufSz,
int flags);
static ERL_NIF_TERM nrecvmsg(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef,
uint16_t bufLen,
uint16_t ctrlLen,
@@ -1986,6 +1992,7 @@ static ERL_NIF_TERM send_check_result(ErlNifEnv* env,
ssize_t written,
ssize_t dataSize,
int saveErrno,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef);
static BOOLEAN_T recv_check_reader(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -1998,6 +2005,7 @@ static ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
SocketDescriptor* descP);
static void recv_error_current_reader(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM reason);
static ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2005,6 +2013,7 @@ static ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
int toRead,
int saveErrno,
ErlNifBinary* bufP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef);
static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2013,6 +2022,7 @@ static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
ErlNifBinary* bufP,
SocketAddress* fromAddrP,
unsigned int fromAddrLen,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef);
static ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2021,6 +2031,7 @@ static ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
struct msghdr* msgHdrP,
ErlNifBinary* dataBufP,
ErlNifBinary* ctrlBufP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef);
static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
@@ -2310,22 +2321,22 @@ static void socket_down_reader(ErlNifEnv* env,
SocketDescriptor* descP,
const ErlNifPid* pid);
-/*
-static char* send_msg_error_closed(ErlNifEnv* env,
+static char* esock_send_close_msg(ErlNifEnv* env,
+ ERL_NIF_TERM closeRef,
+ ErlNifPid* pid);
+static char* esock_send_abort_msg(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM recvRef,
+ ERL_NIF_TERM reason,
+ ErlNifPid* pid);
+static char* esock_send_socket_msg(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM tag,
+ ERL_NIF_TERM info,
ErlNifPid* pid);
-*/
-/*
-static char* send_msg_error(ErlNifEnv* env,
- ERL_NIF_TERM reason,
+static char* esock_send_msg(ErlNifEnv* env,
+ ERL_NIF_TERM msg,
ErlNifPid* pid);
-*/
-static char* send_msg_nif_abort(ErlNifEnv* env,
- ERL_NIF_TERM ref,
- ERL_NIF_TERM reason,
- ErlNifPid* pid);
-static char* send_msg(ErlNifEnv* env,
- ERL_NIF_TERM msg,
- ErlNifPid* pid);
static BOOLEAN_T extract_debug(ErlNifEnv* env,
ERL_NIF_TERM map);
@@ -2394,7 +2405,7 @@ static char str_max_rxt[] = "max_rxt";
static char str_min[] = "min";
static char str_mode[] = "mode";
static char str_multiaddr[] = "multiaddr";
-static char str_nif_abort[] = "nif_abort";
+// static char str_nif_abort[] = "nif_abort";
static char str_null[] = "null";
static char str_num_dlocal[] = "num_domain_local";
static char str_num_dinet[] = "num_domain_inet";
@@ -2436,6 +2447,7 @@ static char str_exsend[] = "exsend"; // failed send
/* *** "Global" Atoms *** */
+ERL_NIF_TERM esock_atom_abort;
ERL_NIF_TERM esock_atom_accept;
ERL_NIF_TERM esock_atom_acceptconn;
ERL_NIF_TERM esock_atom_acceptfilter;
@@ -2459,6 +2471,7 @@ ERL_NIF_TERM esock_atom_block_source;
ERL_NIF_TERM esock_atom_broadcast;
ERL_NIF_TERM esock_atom_busy_poll;
ERL_NIF_TERM esock_atom_checksum;
+ERL_NIF_TERM esock_atom_close;
ERL_NIF_TERM esock_atom_connect;
ERL_NIF_TERM esock_atom_congestion;
ERL_NIF_TERM esock_atom_context;
@@ -2598,6 +2611,7 @@ ERL_NIF_TERM esock_atom_seqpacket;
ERL_NIF_TERM esock_atom_setfib;
ERL_NIF_TERM esock_atom_set_peer_primary_addr;
ERL_NIF_TERM esock_atom_socket;
+ERL_NIF_TERM esock_atom_socket_tag;
ERL_NIF_TERM esock_atom_sndbuf;
ERL_NIF_TERM esock_atom_sndbufforce;
ERL_NIF_TERM esock_atom_sndlowat;
@@ -2665,7 +2679,7 @@ static ERL_NIF_TERM atom_max_rxt;
static ERL_NIF_TERM atom_min;
static ERL_NIF_TERM atom_mode;
static ERL_NIF_TERM atom_multiaddr;
-static ERL_NIF_TERM atom_nif_abort;
+// static ERL_NIF_TERM atom_nif_abort;
static ERL_NIF_TERM atom_null;
static ERL_NIF_TERM atom_num_dinet;
static ERL_NIF_TERM atom_num_dinet6;
@@ -4929,7 +4943,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size
accRef = enif_make_resource(env, accDescP);
- enif_release_resource(accDescP); // We should really store a reference ...
+ enif_release_resource(accDescP);
accDescP->ctrlPid = caller;
if (MONP("naccept_listening -> ctrl",
@@ -5143,7 +5157,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
const ERL_NIF_TERM argv[])
{
SocketDescriptor* descP;
- ERL_NIF_TERM sendRef;
+ ERL_NIF_TERM sockRef, sendRef;
ErlNifBinary sndData;
unsigned int eflags;
int flags;
@@ -5154,13 +5168,17 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
/* Extract arguments and perform preliminary validation */
if ((argc != 4) ||
- !enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
!GET_BIN(env, argv[2], &sndData) ||
!GET_UINT(env, argv[3], &eflags)) {
return enif_make_badarg(env);
}
+ sockRef = argv[0]; // We need this in case we send in case we send abort
sendRef = argv[1];
+ if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) {
+ return enif_make_badarg(env);
+ }
+
if (IS_CLOSED(descP) || IS_CLOSING(descP))
return esock_make_error(env, atom_closed);
@@ -5170,7 +5188,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
"\r\n SendRef: %T"
"\r\n Size of data: %d"
"\r\n eFlags: %d"
- "\r\n", descP->sock, argv[0], sendRef, sndData.size, eflags) );
+ "\r\n", descP->sock, sockRef, sendRef, sndData.size, eflags) );
if (!IS_CONNECTED(descP))
return esock_make_error(env, atom_enotconn);
@@ -5193,7 +5211,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
* time we do.
*/
- res = nsend(env, descP, sendRef, &sndData, flags);
+ res = nsend(env, descP, sockRef, sendRef, &sndData, flags);
MUNLOCK(descP->writeMtx);
@@ -5211,6 +5229,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
static
ERL_NIF_TERM nsend(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef,
ErlNifBinary* sndDataP,
int flags)
@@ -5238,7 +5257,8 @@ ERL_NIF_TERM nsend(ErlNifEnv* env,
save_errno = -1; // The value does not actually matter in this case
return send_check_result(env, descP,
- written, sndDataP->size, save_errno, sendRef);
+ written, sndDataP->size, save_errno,
+ sockRef, sendRef);
}
@@ -5264,7 +5284,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
const ERL_NIF_TERM argv[])
{
SocketDescriptor* descP;
- ERL_NIF_TERM sendRef;
+ ERL_NIF_TERM sockRef, sendRef;
ErlNifBinary sndData;
unsigned int eflags;
int flags;
@@ -5284,9 +5304,14 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
!GET_UINT(env, argv[4], &eflags)) {
return enif_make_badarg(env);
}
+ sockRef = argv[0]; // We need this in case we send in case we send abort
sendRef = argv[1];
eSockAddr = argv[3];
+ if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) {
+ return enif_make_badarg(env);
+ }
+
if (IS_CLOSED(descP) || IS_CLOSING(descP))
return esock_make_error(env, atom_closed);
@@ -5298,7 +5323,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
"\r\n eSockAddr: %T"
"\r\n eflags: %d"
"\r\n",
- descP->sock, argv[0], sendRef, sndData.size, eSockAddr, eflags) );
+ descP->sock, sockRef, sendRef, sndData.size, eSockAddr, eflags) );
/* THIS TEST IS NOT CORRECT!!! */
if (!IS_OPEN(descP)) {
@@ -5320,7 +5345,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
MLOCK(descP->writeMtx);
- res = nsendto(env, descP, sendRef, &sndData, flags,
+ res = nsendto(env, descP, sockRef, sendRef, &sndData, flags,
&remoteAddr, remoteAddrLen);
MUNLOCK(descP->writeMtx);
@@ -5336,6 +5361,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
static
ERL_NIF_TERM nsendto(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef,
ErlNifBinary* dataP,
int flags,
@@ -5372,7 +5398,8 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env,
else
save_errno = -1; // The value does not actually matter in this case
- return send_check_result(env, descP, written, dataP->size, save_errno, sendRef);
+ return send_check_result(env, descP, written, dataP->size, save_errno,
+ sockRef, sendRef);
}
@@ -5395,7 +5422,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[])
{
- ERL_NIF_TERM res, sendRef, eMsgHdr;
+ ERL_NIF_TERM res, sockRef, sendRef, eMsgHdr;
SocketDescriptor* descP;
unsigned int eflags;
int flags;
@@ -5405,14 +5432,18 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,
/* Extract arguments and perform preliminary validation */
if ((argc != 4) ||
- !enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
!IS_MAP(env, argv[2]) ||
!GET_UINT(env, argv[3], &eflags)) {
return enif_make_badarg(env);
}
+ sockRef = argv[0]; // We need this in case we send in case we send abort
sendRef = argv[1];
eMsgHdr = argv[2];
+ if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) {
+ return enif_make_badarg(env);
+ }
+
if (IS_CLOSED(descP) || IS_CLOSING(descP))
return esock_make_error(env, atom_closed);
@@ -5433,7 +5464,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,
MLOCK(descP->writeMtx);
- res = nsendmsg(env, descP, sendRef, eMsgHdr, flags);
+ res = nsendmsg(env, descP, sockRef, sendRef, eMsgHdr, flags);
MUNLOCK(descP->writeMtx);
@@ -5449,6 +5480,7 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,
static
ERL_NIF_TERM nsendmsg(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef,
ERL_NIF_TERM eMsgHdr,
int flags)
@@ -5589,7 +5621,8 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env,
else
save_errno = -1; // OK or not complete: this value should not matter in this case
- res = send_check_result(env, descP, written, dataSize, save_errno, sendRef);
+ res = send_check_result(env, descP, written, dataSize, save_errno,
+ sockRef, sendRef);
FREE(iovBins);
FREE(iov);
@@ -5691,20 +5724,24 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env,
const ERL_NIF_TERM argv[])
{
SocketDescriptor* descP;
- ERL_NIF_TERM recvRef;
+ ERL_NIF_TERM sockRef, recvRef;
int len;
unsigned int eflags;
int flags;
ERL_NIF_TERM res;
if ((argc != 4) ||
- !enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
!GET_INT(env, argv[2], &len) ||
!GET_UINT(env, argv[3], &eflags)) {
return enif_make_badarg(env);
}
+ sockRef = argv[0]; // We need this in case we send in case we send abort
recvRef = argv[1];
+ if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) {
+ return enif_make_badarg(env);
+ }
+
if (IS_CLOSED(descP) || IS_CLOSING(descP))
return esock_make_error(env, atom_closed);
@@ -5729,7 +5766,7 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env,
* time we do.
*/
- res = nrecv(env, descP, recvRef, len, flags);
+ res = nrecv(env, descP, sockRef, recvRef, len, flags);
MUNLOCK(descP->readMtx);
@@ -5746,6 +5783,7 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env,
static
ERL_NIF_TERM nrecv(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef,
int len,
int flags)
@@ -5794,6 +5832,7 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
read, len,
save_errno,
&buf,
+ sockRef,
recvRef);
}
@@ -5829,7 +5868,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env,
const ERL_NIF_TERM argv[])
{
SocketDescriptor* descP;
- ERL_NIF_TERM recvRef;
+ ERL_NIF_TERM sockRef, recvRef;
unsigned int bufSz;
unsigned int eflags;
int flags;
@@ -5840,12 +5879,16 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env,
/* Extract arguments and perform preliminary validation */
if ((argc != 4) ||
- !enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
!GET_UINT(env, argv[2], &bufSz) ||
!GET_UINT(env, argv[3], &eflags)) {
return enif_make_badarg(env);
}
- recvRef = argv[1];
+ sockRef = argv[0]; // We need this in case we send in case we send abort
+ recvRef = argv[1];
+
+ if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) {
+ return enif_make_badarg(env);
+ }
if (IS_CLOSED(descP) || IS_CLOSING(descP))
return esock_make_error(env, atom_closed);
@@ -5883,7 +5926,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env,
* </KOLLA>
*/
- res = nrecvfrom(env, descP, recvRef, bufSz, flags);
+ res = nrecvfrom(env, descP, sockRef, recvRef, bufSz, flags);
MUNLOCK(descP->readMtx);
@@ -5900,6 +5943,7 @@ ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env,
static
ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef,
uint16_t len,
int flags)
@@ -5951,6 +5995,7 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
save_errno,
&buf,
&fromAddr, addrLen,
+ sockRef,
recvRef);
}
@@ -5990,7 +6035,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env,
const ERL_NIF_TERM argv[])
{
SocketDescriptor* descP;
- ERL_NIF_TERM recvRef;
+ ERL_NIF_TERM sockRef, recvRef;
unsigned int bufSz;
unsigned int ctrlSz;
unsigned int eflags;
@@ -6002,14 +6047,18 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env,
/* Extract arguments and perform preliminary validation */
if ((argc != 5) ||
- !enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
!GET_UINT(env, argv[2], &bufSz) ||
!GET_UINT(env, argv[3], &ctrlSz) ||
!GET_UINT(env, argv[4], &eflags)) {
return enif_make_badarg(env);
}
- recvRef = argv[1];
+ sockRef = argv[0]; // We need this in case we send in case we send abort
+ recvRef = argv[1];
+ if (!enif_get_resource(env, sockRef, sockets, (void**) &descP)) {
+ return enif_make_badarg(env);
+ }
+
if (IS_CLOSED(descP) || IS_CLOSING(descP))
return esock_make_error(env, atom_closed);
@@ -6047,7 +6096,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env,
* </KOLLA>
*/
- res = nrecvmsg(env, descP, recvRef, bufSz, ctrlSz, flags);
+ res = nrecvmsg(env, descP, sockRef, recvRef, bufSz, ctrlSz, flags);
MUNLOCK(descP->readMtx);
@@ -6064,6 +6113,7 @@ ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env,
static
ERL_NIF_TERM nrecvmsg(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef,
uint16_t bufLen,
uint16_t ctrlLen,
@@ -6144,6 +6194,7 @@ ERL_NIF_TERM nrecvmsg(ErlNifEnv* env,
&msgHdr,
data, // Needed for iov encode
&ctrl, // Needed for ctrl header encode
+ sockRef,
recvRef);
}
@@ -13254,6 +13305,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
ssize_t written,
ssize_t dataSize,
int saveErrno,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM sendRef)
{
SSDBG( descP,
@@ -13330,7 +13382,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
while (writer_pop(env, descP, &pid, &mon, &ref)) {
SSDBG( descP,
("SOCKET", "send_check_result -> abort %T\r\n", pid) );
- send_msg_nif_abort(env, ref, res, &pid);
+ esock_send_abort_msg(env, sockRef, ref, res, &pid);
DEMONP("send_check_result -> pop'ed writer",
env, descP, &mon);
}
@@ -13527,6 +13579,7 @@ ERL_NIF_TERM recv_update_current_reader(ErlNifEnv* env,
static
void recv_error_current_reader(ErlNifEnv* env,
SocketDescriptor* descP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM reason)
{
if (descP->currentReaderP != NULL) {
@@ -13541,7 +13594,7 @@ void recv_error_current_reader(ErlNifEnv* env,
while (reader_pop(env, descP, &pid, &mon, &ref)) {
SSDBG( descP,
("SOCKET", "recv_error_current_reader -> abort %T\r\n", pid) );
- send_msg_nif_abort(env, ref, reason, &pid);
+ esock_send_abort_msg(env, sockRef, ref, reason, &pid);
DEMONP("recv_error_current_reader -> pop'ed reader",
env, descP, &mon);
}
@@ -13561,6 +13614,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
int toRead,
int saveErrno,
ErlNifBinary* bufP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef)
{
char* xres;
@@ -13595,7 +13649,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
* We must also notify any waiting readers!
*/
- recv_error_current_reader(env, descP, res);
+ recv_error_current_reader(env, descP, sockRef, res);
FREE_BIN(bufP);
@@ -13713,7 +13767,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
descP->closeLocal = FALSE;
descP->state = SOCKET_STATE_CLOSING;
- recv_error_current_reader(env, descP, res);
+ recv_error_current_reader(env, descP, sockRef, res);
SELECT(env,
descP->sock,
@@ -13751,7 +13805,7 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] errno: %d\r\n",
toRead, saveErrno) );
- recv_error_current_reader(env, descP, res);
+ recv_error_current_reader(env, descP, sockRef, res);
FREE_BIN(bufP);
@@ -13835,6 +13889,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
ErlNifBinary* bufP,
SocketAddress* fromAddrP,
unsigned int fromAddrLen,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef)
{
char* xres;
@@ -13876,7 +13931,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
descP->closeLocal = FALSE;
descP->state = SOCKET_STATE_CLOSING;
- recv_error_current_reader(env, descP, res);
+ recv_error_current_reader(env, descP, sockRef, res);
SELECT(env,
descP->sock,
@@ -13909,7 +13964,7 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
("SOCKET",
"recvfrom_check_result -> errno: %d\r\n", saveErrno) );
- recv_error_current_reader(env, descP, res);
+ recv_error_current_reader(env, descP, sockRef, res);
FREE_BIN(bufP);
@@ -13962,6 +14017,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
struct msghdr* msgHdrP,
ErlNifBinary* dataBufP,
ErlNifBinary* ctrlBufP,
+ ERL_NIF_TERM sockRef,
ERL_NIF_TERM recvRef)
{
@@ -14026,7 +14082,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
descP->closeLocal = FALSE;
descP->state = SOCKET_STATE_CLOSING;
- recv_error_current_reader(env, descP, res);
+ recv_error_current_reader(env, descP, sockRef, res);
SELECT(env,
descP->sock,
@@ -14060,7 +14116,7 @@ ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
("SOCKET",
"recvmsg_check_result -> errno: %d\r\n", saveErrno) );
- recv_error_current_reader(env, descP, res);
+ recv_error_current_reader(env, descP, sockRef, res);
FREE_BIN(dataBufP); FREE_BIN(ctrlBufP);
@@ -16350,37 +16406,89 @@ char* send_msg_error(ErlNifEnv* env,
*/
-/* Send an (nif-) abort message to the specified process:
+/* Send an close message to the specified process:
* A message in the form:
*
- * {nif_abort, Ref, Reason}
+ * {'$socket', SockRef, close, CloseRef}
*
- * This message is for processes that are waiting in the
+ * This message is for processes that is waiting in the
+ * erlang API (close-) function for the socket to be "closed"
+ * (actually that the 'stop' callback function has been called).
+ */
+static
+char* esock_send_close_msg(ErlNifEnv* env,
+ ERL_NIF_TERM closeRef,
+ ErlNifPid* pid)
+{
+ return esock_send_socket_msg(env,
+ esock_atom_undefined,
+ esock_atom_close, closeRef, pid);
+}
+
+
+/* Send an abort message to the specified process:
+ * A message in the form:
+ *
+ * {'$socket', SockRef, abort, {RecvRef, Reason}}
+ *
+ * This message is for processes that is waiting in the
* erlang API functions for a select message.
*/
static
-char* send_msg_nif_abort(ErlNifEnv* env,
- ERL_NIF_TERM ref,
- ERL_NIF_TERM reason,
- ErlNifPid* pid)
+char* esock_send_abort_msg(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM recvRef,
+ ERL_NIF_TERM reason,
+ ErlNifPid* pid)
{
- ERL_NIF_TERM msg = MKT3(env, atom_nif_abort, ref, reason);
+ ERL_NIF_TERM info = MKT2(env, recvRef, reason);
- return send_msg(env, msg, pid);
+ /*
+ esock_dbg_printf("SEND MSG",
+ "try send abort message to %T:\r\n",
+ "\r\n sockRef: %T"
+ "\r\n recvRef: %T"
+ "\r\n reason: %T"
+ "\r\n", *pid, sockRef, recvRef, reason);
+ */
+
+ return esock_send_socket_msg(env, sockRef, esock_atom_abort, info, pid);
+}
+
+
+/* *** esock_send_socket_msg ***
+ *
+ * This function sends a general purpose socket message to an erlang
+ * process. A general 'socket' message has the form:
+ *
+ * {'$socket', SockRef, Tag, Info}
+ *
+ */
+
+static
+char* esock_send_socket_msg(ErlNifEnv* env,
+ ERL_NIF_TERM sockRef,
+ ERL_NIF_TERM tag,
+ ERL_NIF_TERM info,
+ ErlNifPid* pid)
+{
+ ERL_NIF_TERM msg = MKT4(env, esock_atom_socket_tag, sockRef, tag, info);
+
+ return esock_send_msg(env, msg, pid);
}
/* Send a message to the specified process.
*/
static
-char* send_msg(ErlNifEnv* env,
- ERL_NIF_TERM msg,
- ErlNifPid* pid)
+char* esock_send_msg(ErlNifEnv* env,
+ ERL_NIF_TERM msg,
+ ErlNifPid* pid)
{
- if (!enif_send(env, pid, NULL, msg))
- return str_exsend;
- else
- return NULL;
+ if (!enif_send(env, pid, NULL, msg))
+ return str_exsend;
+ else
+ return NULL;
}
@@ -17022,10 +17130,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
SSDBG( descP, ("SOCKET", "socket_stop -> "
"send abort message to current writer %T\r\n",
descP->currentWriter.pid) );
- if (send_msg_nif_abort(env,
- descP->currentWriter.ref,
- atom_closed,
- &descP->currentWriter.pid) != NULL) {
+ if (esock_send_abort_msg(env,
+ esock_atom_undefined,
+ descP->currentWriter.ref,
+ atom_closed,
+ &descP->currentWriter.pid) != NULL) {
/* Shall we really do this?
* This happens if the controlling process has been killed!
*/
@@ -17061,10 +17170,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
SSDBG( descP, ("SOCKET", "socket_stop -> "
"send abort message to current reader %T\r\n",
descP->currentReader.pid) );
- if (send_msg_nif_abort(env,
- descP->currentReader.ref,
- atom_closed,
- &descP->currentReader.pid) != NULL) {
+ if (esock_send_abort_msg(env,
+ esock_atom_undefined,
+ descP->currentReader.ref,
+ atom_closed,
+ &descP->currentReader.pid) != NULL) {
/* Shall we really do this?
* This happens if the controlling process has been killed!
*/
@@ -17099,10 +17209,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
SSDBG( descP, ("SOCKET", "socket_stop -> "
"send abort message to current acceptor %T\r\n",
descP->currentWriter.pid) );
- if (send_msg_nif_abort(env,
- descP->currentAcceptor.ref,
- atom_closed,
- &descP->currentAcceptor.pid) != NULL) {
+ if (esock_send_abort_msg(env,
+ esock_atom_undefined,
+ descP->currentAcceptor.ref,
+ atom_closed,
+ &descP->currentAcceptor.pid) != NULL) {
/* Shall we really do this?
* This happens if the controlling process has been killed!
*/
@@ -17145,13 +17256,12 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* Also, we should really *always* use a tag unique to this
* (nif-) module. Some like (in this case):
*
- * {'$socket', close, CloseRef}
+ * {'$socket', undefined, close, CloseRef}
*
* </KOLLA>
*/
-
- send_msg(env,
- MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
+
+ esock_send_close_msg(env, descP->closeRef, &descP->closerPid);
DEMONP("socket_stop -> closer", env, descP, &descP->closerMon);
@@ -17221,13 +17331,14 @@ void inform_waiting_procs(ErlNifEnv* env,
*/
SSDBG( descP,
- ("SOCKET", "inform_waiting_procs -> abort %T (%T)\r\n",
+ ("SOCKET", "inform_waiting_procs -> abort request %T (from %T)\r\n",
currentP->data.ref, currentP->data.pid) );
- ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
- currentP->data.ref,
- reason,
- &currentP->data.pid)) );
+ ESOCK_ASSERT( (NULL == esock_send_abort_msg(env,
+ esock_atom_undefined,
+ currentP->data.ref,
+ reason,
+ &currentP->data.pid)) );
DEMONP("inform_waiting_procs -> current 'request'",
env, descP, &currentP->data.mon);
@@ -17783,7 +17894,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_min = MKA(env, str_min);
atom_mode = MKA(env, str_mode);
atom_multiaddr = MKA(env, str_multiaddr);
- atom_nif_abort = MKA(env, str_nif_abort);
+ // atom_nif_abort = MKA(env, str_nif_abort);
atom_null = MKA(env, str_null);
atom_num_dinet = MKA(env, str_num_dinet);
atom_num_dinet6 = MKA(env, str_num_dinet6);
@@ -17813,6 +17924,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_want = MKA(env, str_want);
/* Global atom(s) */
+ esock_atom_abort = MKA(env, "abort");
esock_atom_accept = MKA(env, "accept");
esock_atom_acceptconn = MKA(env, "acceptconn");
esock_atom_acceptfilter = MKA(env, "acceptfilter");
@@ -17836,6 +17948,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
esock_atom_broadcast = MKA(env, "broadcast");
esock_atom_busy_poll = MKA(env, "busy_poll");
esock_atom_checksum = MKA(env, "checksum");
+ esock_atom_close = MKA(env, "close");
esock_atom_connect = MKA(env, "connect");
esock_atom_congestion = MKA(env, "congestion");
esock_atom_context = MKA(env, "context");
@@ -17979,6 +18092,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
esock_atom_sndlowat = MKA(env, "sndlowat");
esock_atom_sndtimeo = MKA(env, "sndtimeo");
esock_atom_socket = MKA(env, "socket");
+ esock_atom_socket_tag = MKA(env, "$socket");
esock_atom_spec_dst = MKA(env, "spec_dst");
esock_atom_status = MKA(env, "status");
esock_atom_stream = MKA(env, "stream");
diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam
index 25046e6aad..ddd50fdefa 100644
--- a/erts/preloaded/ebin/socket.beam
+++ b/erts/preloaded/ebin/socket.beam
Binary files differ
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index a40692881b..2e295a91ae 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -1331,7 +1331,7 @@ do_accept(LSockRef, Timeout) ->
{select, LSockRef, AccRef, ready_input} ->
do_accept(LSockRef, next_timeout(TS, Timeout));
- {nif_abort, AccRef, Reason} ->
+ {'$socket', _, abort, {AccRef, Reason}} ->
{error, Reason}
after NewTimeout ->
@@ -1408,7 +1408,7 @@ do_send(SockRef, Data, EFlags, Timeout) ->
do_send(SockRef, Data, EFlags,
next_timeout(TS, Timeout));
- {nif_abort, SendRef, Reason} ->
+ {'$socket', _, abort, {SendRef, Reason}} ->
{error, Reason}
after NewTimeout ->
@@ -1421,7 +1421,7 @@ do_send(SockRef, Data, EFlags, Timeout) ->
do_send(SockRef, Data, EFlags,
next_timeout(TS, Timeout));
- {nif_abort, SendRef, Reason} ->
+ {'$socket', _, abort, {SendRef, Reason}} ->
{error, Reason}
after Timeout ->
@@ -1513,7 +1513,7 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) ->
do_sendto(SockRef, Data, Dest, EFlags,
next_timeout(TS, Timeout));
- {nif_abort, SendRef, Reason} ->
+ {'$socket', _, abort, {SendRef, Reason}} ->
{error, Reason}
after Timeout ->
@@ -1525,7 +1525,11 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) ->
receive
{select, SockRef, SendRef, ready_output} ->
do_sendto(SockRef, Data, Dest, EFlags,
- next_timeout(TS, Timeout))
+ next_timeout(TS, Timeout));
+
+ {'$socket', _, abort, {SendRef, Reason}} ->
+ {error, Reason}
+
after Timeout ->
cancel(SockRef, sendto, SendRef),
{error, timeout}
@@ -1773,10 +1777,9 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
Bin,
next_timeout(TS, Timeout));
- {nif_abort, RecvRef, Reason} ->
+ {'$socket', _, abort, {RecvRef, Reason}} ->
{error, Reason}
-
after NewTimeout ->
cancel(SockRef, recv, RecvRef),
{error, {timeout, Acc}}
@@ -1794,10 +1797,9 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
<<Acc/binary, Bin/binary>>,
next_timeout(TS, Timeout));
- {nif_abort, RecvRef, Reason} ->
+ {'$socket', _, abort, {RecvRef, Reason}} ->
{error, Reason}
-
after NewTimeout ->
cancel(SockRef, recv, RecvRef),
{error, {timeout, Acc}}
@@ -1805,6 +1807,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
%% We return with the accumulated binary (if its non-empty)
{error, eagain} when (Length =:= 0) andalso (size(Acc) > 0) ->
+ %% CAN WE REALLY DO THIS? THE NIF HAS SELECTED!! OR?
{ok, Acc};
{error, eagain} ->
@@ -1819,7 +1822,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
Acc,
next_timeout(TS, Timeout));
- {nif_abort, RecvRef, Reason} ->
+ {'$socket', _, abort, {RecvRef, Reason}} ->
{error, Reason}
after NewTimeout ->
@@ -1850,7 +1853,8 @@ do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) ->
%% any waiting reader.
cancel(SockRef, recv, RecvRef),
{ok, Acc};
-do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) ->
+do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout)
+ when (size(Acc) > 0) ->
{error, {timeout, Acc}};
do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) ->
{error, timeout}.
@@ -1957,8 +1961,7 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
do_recvfrom(SockRef, BufSz, EFlags,
next_timeout(TS, Timeout));
- {nif_abort, RecvRef, Reason} ->
- %% p("received nif-abort: ~p", [Reason]),
+ {'$socket', _, abort, {RecvRef, Reason}} ->
{error, Reason}
after NewTimeout ->
@@ -2062,7 +2065,7 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) ->
do_recvmsg(SockRef, BufSz, CtrlSz, EFlags,
next_timeout(TS, Timeout));
- {nif_abort, RecvRef, Reason} ->
+ {'$socket', _, abort, {RecvRef, Reason}} ->
{error, Reason}
after NewTimeout ->
@@ -2107,7 +2110,8 @@ do_close(SockRef) ->
{ok, CloseRef} ->
%% We must wait
receive
- {close, CloseRef} ->
+ {'$socket', _, close, CloseRef} ->
+%% {close, CloseRef} ->
%% <KOLLA>
%%
%% WHAT HAPPENS IF THIS PROCESS IS KILLED