aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/nifs/common/socket_nif.c
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-06-29 18:23:55 +0200
committerMicael Karlberg <[email protected]>2018-09-18 14:50:18 +0200
commit24be0729fe3a1ccfd5f0713b565463d6557d8aa7 (patch)
tree2243a51ada05a3ddeacd443ed6e49262cf79766c /erts/emulator/nifs/common/socket_nif.c
parentb09136301525b0717e897ec0864c3d2ea7708758 (diff)
downloadotp-24be0729fe3a1ccfd5f0713b565463d6557d8aa7.tar.gz
otp-24be0729fe3a1ccfd5f0713b565463d6557d8aa7.tar.bz2
otp-24be0729fe3a1ccfd5f0713b565463d6557d8aa7.zip
[socket-nif] Fixed (stream) recv
Fixed handling of closed in the recv function. We still need to properly handle when we get 0 bytes of data for other types ock sockets then stream (its valid for dgram for instance). OTP-14831
Diffstat (limited to 'erts/emulator/nifs/common/socket_nif.c')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c695
1 files changed, 532 insertions, 163 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 7f45fb7bcd..027155fc92 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -182,8 +182,8 @@
/* Debug stuff... */
-#define SOCKET_NIF_DEBUG_DEFAULT TRUE
-#define SOCKET_DEBUG_DEFAULT TRUE
+#define SOCKET_NIF_DEBUG_DEFAULT FALSE
+#define SOCKET_DEBUG_DEFAULT FALSE
/* Counters and stuff (Don't know where to sent this stuff anyway) */
#define SOCKET_NIF_IOW_DEFAULT FALSE
@@ -344,6 +344,7 @@ typedef union {
#define SOCKET_OPT_OTP_DEBUG 0
#define SOCKET_OPT_OTP_IOW 1
+#define SOCKET_OPT_OTP_CTRL_PROC 2
#define SOCKET_OPT_SOCK_BROADCAST 4
#define SOCKET_OPT_SOCK_DONTROUTE 7
@@ -594,16 +595,16 @@ typedef struct {
ERL_NIF_TERM version;
ERL_NIF_TERM buildDate;
BOOLEAN_T dbg;
- BOOLEAN_T iow;
+ BOOLEAN_T iow;
ErlNifMutex* cntMtx;
uint32_t numSockets;
- uint32_t numTypeDGrams;
uint32_t numTypeStreams;
+ uint32_t numTypeDGrams;
uint32_t numTypeSeqPkgs;
- uint32_t numDomainLocal;
uint32_t numDomainInet;
uint32_t numDomainInet6;
+ uint32_t numDomainLocal;
uint32_t numProtoIP;
uint32_t numProtoTCP;
uint32_t numProtoUDP;
@@ -751,6 +752,9 @@ static ERL_NIF_TERM nsetopt_otp_debug(ErlNifEnv* env,
static ERL_NIF_TERM nsetopt_otp_iow(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM eVal);
+static ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM eVal);
static ERL_NIF_TERM nsetopt_native(ErlNifEnv* env,
SocketDescriptor* descP,
int level,
@@ -1045,11 +1049,13 @@ static ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
ssize_t written,
ssize_t dataSize,
+ int saveErrno,
ERL_NIF_TERM sendRef);
static ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
int read,
int toRead,
+ int saveErrno,
ErlNifBinary* bufP,
ERL_NIF_TERM recvRef);
static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
@@ -1214,6 +1220,11 @@ static BOOLEAN_T restore_network_namespace(int ns, SOCKET sock, int* err);
#endif
static BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc);
+static void cnt_dec(uint32_t* cnt, uint32_t dec);
+
+static void inc_socket(int domain, int type, int protocol);
+static void dec_socket(int domain, int type, int protocol);
+
/*
#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE)
@@ -1698,12 +1709,13 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
*
* </KOLLA>
*/
- SELECT(env,
- event,
- (ERL_NIF_SELECT_READ),
- descP, NULL, esock_atom_undefined);
+ SELECT(env,
+ event,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, esock_atom_undefined);
#endif
+ inc_socket(domain, type, protocol);
return esock_make_ok2(env, res);
}
@@ -2515,21 +2527,31 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
{
SocketDescriptor* descP;
ERL_NIF_TERM sendRef;
- ErlNifBinary data;
+ ErlNifBinary sndData;
unsigned int eflags;
int flags;
ERL_NIF_TERM res;
+ SGDBG( ("SOCKET", "nif_send -> entry with argc: %d\r\n", argc) );
+
/* Extract arguments and perform preliminary validation */
if ((argc != 4) ||
!enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
- !GET_BIN(env, argv[2], &data) ||
+ !GET_BIN(env, argv[2], &sndData) ||
!GET_UINT(env, argv[3], &eflags)) {
return enif_make_badarg(env);
}
sendRef = argv[1];
+ SSDBG( descP,
+ ("SOCKET", "nif_send -> args when sock = %d:"
+ "\r\n Socket: %T"
+ "\r\n SendRef: %T"
+ "\r\n Size of data: %d"
+ "\r\n eFlags: %d"
+ "\r\n", descP->sock, argv[0], sendRef, sndData.size, eflags) );
+
if (!IS_CONNECTED(descP))
return esock_make_error(env, atom_enotconn);
@@ -2551,7 +2573,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
* time we do.
*/
- res = nsend(env, descP, sendRef, &data, flags);
+ res = nsend(env, descP, sendRef, &sndData, flags);
MUNLOCK(descP->writeMtx);
@@ -2570,9 +2592,10 @@ static
ERL_NIF_TERM nsend(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM sendRef,
- ErlNifBinary* dataP,
+ ErlNifBinary* sndDataP,
int flags)
{
+ int save_errno;
ssize_t written;
if (!descP->isWritable)
@@ -2583,9 +2606,15 @@ ERL_NIF_TERM nsend(ErlNifEnv* env,
*/
cnt_inc(&descP->writeTries, 1);
- written = sock_send(descP->sock, dataP->data, dataP->size, flags);
+ written = sock_send(descP->sock, sndDataP->data, sndDataP->size, flags);
+ if (IS_SOCKET_ERROR(written))
+ save_errno = sock_errno();
+ else
+ save_errno = -1; // The value does not actually matter in this case
+
- return send_check_result(env, descP, written, dataP->size, sendRef);
+ return send_check_result(env, descP,
+ written, sndDataP->size, save_errno, sendRef);
}
@@ -2655,6 +2684,7 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env,
SocketAddress* toAddrP,
unsigned int toAddrLen)
{
+ int save_errno;
ssize_t written;
if (!descP->isWritable)
@@ -2674,8 +2704,12 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env,
dataP->data, dataP->size, flags,
NULL, 0);
}
+ if (IS_SOCKET_ERROR(written))
+ save_errno = sock_errno();
+ else
+ save_errno = -1; // The value does not actually matter in this case
- return send_check_result(env, descP, written, dataP->size, sendRef);
+ return send_check_result(env, descP, written, dataP->size, save_errno, sendRef);
}
@@ -2829,6 +2863,13 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
{
ssize_t read;
ErlNifBinary buf;
+ int save_errno;
+ int bufSz = (len ? len : descP->rBufSz);
+
+ SSDBG( descP, ("SOCKET", "nrecv -> entry with"
+ "\r\n len: %d (%d)"
+ "\r\n flags: %d"
+ "\r\n", len, bufSz, flags) );
if (!descP->isReadable)
return enif_make_badarg(env);
@@ -2837,7 +2878,7 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
* Either as much as we want to read or (if zero (0)) use the "default"
* size (what has been configured).
*/
- if (!ALLOC_BIN((len ? len : descP->rBufSz), &buf))
+ if (!ALLOC_BIN(bufSz, &buf))
return esock_make_error(env, atom_exalloc);
/* We ignore the wrap for the moment.
@@ -2845,10 +2886,19 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
*/
cnt_inc(&descP->readTries, 1);
+ // If it fails (read = -1), we need errno...
+ SSDBG( descP, ("SOCKET", "nrecv -> try read (%d)\r\n", buf.size) );
read = sock_recv(descP->sock, buf.data, buf.size, flags);
+ if (IS_SOCKET_ERROR(read))
+ save_errno = sock_errno();
+ else
+ save_errno = -1; // The value does not actually matter in this case
+
+ SSDBG( descP, ("SOCKET", "nrecv -> read: %d (%d)\r\n", read, save_errno) );
return recv_check_result(env, descP,
read, len,
+ save_errno,
&buf,
recvRef);
}
@@ -3003,6 +3053,11 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
ERL_NIF_TERM reply, reason;
BOOLEAN_T doClose;
int selectRes;
+ int domain = descP->domain;
+ int type = descP->type;
+ int protocol = descP->protocol;
+
+ SSDBG( descP, ("SOCKET", "nclose -> [%d] entry\r\n", descP->sock) );
MLOCK(descP->closeMtx);
@@ -3049,10 +3104,16 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
descP, NULL, descP->closeRef);
if (selectRes & ERL_NIF_SELECT_STOP_CALLED) {
/* Prep done - inform the caller it can finalize (close) directly */
+ SSDBG( descP,
+ ("SOCKET", "nclose -> [%d] stop called\r\n", descP->sock) );
+ dec_socket(domain, type, protocol);
reply = esock_atom_ok;
} else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) {
/* The stop callback function has been *scheduled* which means that we
* have to wait for it to complete. */
+ SSDBG( descP,
+ ("SOCKET", "nclose -> [%d] stop scheduled\r\n", descP->sock) );
+ dec_socket(domain, type, protocol); // SHALL WE DO THIS AT finalize?
reply = esock_make_ok2(env, descP->closeRef);
} else {
/* <KOLLA>
@@ -3071,6 +3132,11 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
reply = esock_make_error(env, reason);
}
+ SSDBG( descP,
+ ("SOCKET", "nclose -> [%d] done when: "
+ "\r\n reply: %T"
+ "\r\n", descP->sock, reply) );
+
return reply;
}
@@ -3081,7 +3147,7 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
*
* Description:
* Perform the actual socket close!
- * Note that this function is executed in a dirfty scheduler.
+ * Note that this function is executed in a dirty scheduler.
*
* Arguments:
* Socket (ref) - Points to the socket descriptor.
@@ -3249,6 +3315,10 @@ ERL_NIF_TERM nif_setopt(ErlNifEnv* env,
ERL_NIF_TERM eVal;
BOOLEAN_T isEncoded, isOTP;
+ SGDBG( ("SOCKET", "nif_setopt -> entry with argc: %d\r\n", argc) );
+
+ /* Extract arguments and perform preliminary validation */
+
if ((argc != 5) ||
!enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
!GET_INT(env, argv[2], &eLevel) ||
@@ -3263,6 +3333,19 @@ ERL_NIF_TERM nif_setopt(ErlNifEnv* env,
if (!elevel2level(isEncoded, eLevel, &isOTP, &level))
return esock_make_error(env, esock_atom_einval);
+ SSDBG( descP,
+ ("SOCKET", "nif_setopt -> args when sock = %d:"
+ "\r\n Socket: %T"
+ "\r\n Encoded: %T (%d)"
+ "\r\n Level: %d (%d)"
+ "\r\n Opt: %d"
+ "\r\n Value: %T"
+ "\r\n",
+ descP->sock, argv[0],
+ eIsEncoded, isEncoded,
+ eLevel, level,
+ eOpt, eVal) );
+
return nsetopt(env, descP, isEncoded, isOTP, level, eOpt, eVal);
}
@@ -3304,6 +3387,12 @@ ERL_NIF_TERM nsetopt_otp(ErlNifEnv* env,
{
ERL_NIF_TERM result;
+ SSDBG( descP,
+ ("SOCKET", "nsetopt_otp -> entry with"
+ "\r\n eOpt: %d"
+ "\r\n eVal: %T"
+ "\r\n", eOpt, eVal) );
+
switch (eOpt) {
case SOCKET_OPT_OTP_DEBUG:
result = nsetopt_otp_debug(env, descP, eVal);
@@ -3313,6 +3402,10 @@ ERL_NIF_TERM nsetopt_otp(ErlNifEnv* env,
result = nsetopt_otp_iow(env, descP, eVal);
break;
+ case SOCKET_OPT_OTP_CTRL_PROC:
+ result = nsetopt_otp_ctrl_proc(env, descP, eVal);
+ break;
+
default:
result = esock_make_error(env, esock_atom_einval);
break;
@@ -3349,6 +3442,48 @@ ERL_NIF_TERM nsetopt_otp_iow(ErlNifEnv* env,
+/* nsetopt_otp_ctrl_proc - Handle the OTP (level) controlling_process options
+ */
+static
+ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM eVal)
+{
+ ErlNifPid newCtrlPid;
+ ErlNifMonitor newCtrlMon;
+ int xres;
+
+ SSDBG( descP,
+ ("SOCKET", "nsetopt_otp_ctrl_proc -> entry with"
+ "\r\n eVal: %T"
+ "\r\n", eVal) );
+
+ if (!GET_LPID(env, eVal, &newCtrlPid)) {
+ esock_warning_msg("Failed get pid of new controlling process\r\n");
+ return esock_make_error(env, esock_atom_einval);
+ }
+
+ if ((xres = MONP(env, descP, &newCtrlPid, &newCtrlMon)) != 0) {
+ esock_warning_msg("Failed monitor %d) (new) controlling process\r\n", xres);
+ return esock_make_error(env, esock_atom_einval);
+ }
+
+ if ((xres = DEMONP(env, descP, &descP->ctrlMon)) != 0) {
+ esock_warning_msg("Failed demonitor (%d) "
+ "old controlling process %T (%T)\r\n",
+ xres, descP->ctrlPid, descP->ctrlMon);
+ }
+
+ descP->ctrlPid = newCtrlPid;
+ descP->ctrlMon = newCtrlMon;
+
+ SSDBG( descP, ("SOCKET", "nsetopt_otp_ctrl_proc -> done\r\n") );
+
+ return esock_atom_ok;
+}
+
+
+
/* The option has *not* been encoded. Instead it has been provided
* in "native mode" (option is provided as is and value as a binary).
*/
@@ -3362,6 +3497,12 @@ ERL_NIF_TERM nsetopt_native(ErlNifEnv* env,
ErlNifBinary val;
ERL_NIF_TERM result;
+ SSDBG( descP,
+ ("SOCKET", "nsetopt_native -> entry with"
+ "\r\n opt: %d"
+ "\r\n eVal: %T"
+ "\r\n", opt, eVal) );
+
if (GET_BIN(env, eVal, &val)) {
int res = socket_setopt(descP->sock, level, opt,
val.data, val.size);
@@ -3389,6 +3530,11 @@ ERL_NIF_TERM nsetopt_level(ErlNifEnv* env,
{
ERL_NIF_TERM result;
+ SSDBG( descP,
+ ("SOCKET", "nsetopt_level -> entry with"
+ "\r\n level: %d"
+ "\r\n", level) );
+
switch (level) {
case SOL_SOCKET:
result = nsetopt_lvl_socket(env, descP, eOpt, eVal);
@@ -5081,30 +5227,44 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
ssize_t written,
ssize_t dataSize,
+ int saveErrno,
ERL_NIF_TERM sendRef)
{
+ SSDBG( descP,
+ ("SOCKET", "send_check_result -> entry with"
+ "\r\n written: %d"
+ "\r\n dataSize: %d"
+ "\r\n saveErrno: %d"
+ "\r\n", written, dataSize, saveErrno) );
+
if (written == dataSize) {
cnt_inc(&descP->writePkgCnt, 1);
cnt_inc(&descP->writeByteCnt, written);
+ SSDBG( descP,
+ ("SOCKET", "send_check_result -> everything written - done\r\n") );
+
return esock_atom_ok;
} else if (written < 0) {
/* Ouch, check what kind of failure */
- int save_errno = sock_errno();
- if ((save_errno != EAGAIN) &&
- (save_errno != EINTR)) {
+ if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) {
cnt_inc(&descP->writeFails, 1);
- return esock_make_error_errno(env, save_errno);
+ SSDBG( descP,
+ ("SOCKET", "send_check_result -> error: %d\r\n", saveErrno) );
+
+ return esock_make_error_errno(env, saveErrno);
} else {
/* Ok, try again later */
+ SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") );
+
/* <KOLLA>
* SHOULD RESULT IN {error, eagain}!!!!
* </KOLLA>
@@ -5124,6 +5284,9 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE),
descP, NULL, sendRef);
+ SSDBG( descP,
+ ("SOCKET", "send_check_result -> not entire package written\r\n") );
+
return esock_make_ok2(env, enif_make_int(env, written));
}
@@ -5134,11 +5297,42 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
int read,
int toRead,
+ int saveErrno,
ErlNifBinary* bufP,
ERL_NIF_TERM recvRef)
{
ERL_NIF_TERM data;
+ SSDBG( descP,
+ ("SOCKET", "recv_check_result -> entry with"
+ "\r\n read: %d"
+ "\r\n toRead: %d"
+ "\r\n saveErrno: %d"
+ "\r\n recvRef: %T"
+ "\r\n", read, toRead, saveErrno, recvRef) );
+
+
+ /* <KOLLA>
+ *
+ * We need to handle read = 0 for other type(s) (DGRAM) when
+ * its actually valid to read 0 bytes.
+ *
+ * </KOLLA>
+ */
+
+ if ((read == 0) && (descP->type == SOCK_STREAM)) {
+
+ /*
+ * When a stream socket peer has performed an orderly shutdown, the return
+ * value will be 0 (the traditional "end-of-file" return).
+ *
+ * *We* do never actually try to read 0 bytes from a stream socket!
+ */
+
+ return esock_make_error(env, atom_closed);
+
+ }
+
/* There is a special case: If the provided 'to read' value is
* zero (0). That means that we reads as much as we can, using
* the default read buffer size.
@@ -5148,6 +5342,10 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
/* +++ We filled the buffer +++ */
+ SSDBG( descP,
+ ("SOCKET",
+ "recv_check_result -> [%d] filled the buffer\r\n", toRead) );
+
if (toRead == 0) {
/* +++ Give us everything you have got => needs to continue +++ */
@@ -5168,6 +5366,11 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
data = MKBIN(env, bufP);
+ SSDBG( descP,
+ ("SOCKET",
+ "recv_check_result -> [%d] "
+ "we are done for now - read more\r\n", toRead) );
+
return esock_make_ok3(env, atom_false, data);
} else {
@@ -5181,6 +5384,11 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
data = MKBIN(env, bufP);
+ SSDBG( descP,
+ ("SOCKET",
+ "recv_check_result -> [%d] "
+ "we got exactly what we could fit\r\n", toRead) );
+
return esock_make_ok3(env, atom_true, data);
}
@@ -5189,12 +5397,13 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
/* +++ Error handling +++ */
- int save_errno = sock_errno();
-
- if (save_errno == ECONNRESET) {
+ if (saveErrno == ECONNRESET) {
/* +++ Oups - closed +++ */
+ SSDBG( descP, ("SOCKET",
+ "recv_check_result -> [%d] closed\r\n", toRead) );
+
/* <KOLLA>
*
* IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING
@@ -5220,17 +5429,31 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
return esock_make_error(env, atom_closed);
- } else if ((save_errno == ERRNO_BLOCK) ||
- (save_errno == EAGAIN)) {
+ } else if ((saveErrno == ERRNO_BLOCK) ||
+ (saveErrno == EAGAIN)) {
+ SSDBG( descP, ("SOCKET",
+ "recv_check_result -> [%d] eagain\r\n", toRead) );
+
+ SELECT(env, descP->sock, (ERL_NIF_SELECT_READ),
+ descP, NULL, recvRef);
+
return esock_make_error(env, esock_atom_eagain);
} else {
- return esock_make_error_errno(env, save_errno);
+ SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] errno: %d\r\n",
+ toRead, saveErrno) );
+ return esock_make_error_errno(env, saveErrno);
}
} else {
/* +++ We did not fill the buffer +++ */
+ SSDBG( descP,
+ ("SOCKET",
+ "recv_check_result -> [%d] "
+ "did not fill the buffer (%d of %d)\r\n",
+ toRead, read, bufP->size) );
+
if (toRead == 0) {
/* +++ We got a chunk of data but +++
@@ -5239,9 +5462,14 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
* +++ into a sub-binary. +++
*/
+ SSDBG( descP, ("SOCKET",
+ "recv_check_result -> [%d] split buffer\r\n", toRead) );
+
data = MKBIN(env, bufP);
data = MKSBIN(env, data, 0, read);
+ SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] done\r\n", toRead) );
+
return esock_make_ok3(env, atom_true, data);
} else {
@@ -5249,6 +5477,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
/* +++ We got only a part of what was expected +++
* +++ => receive more later. +++ */
+ SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] "
+ "only part of message - expect more\r\n", toRead) );
+
return esock_make_ok3(env, atom_false, MKBIN(env, bufP));
}
}
@@ -6343,6 +6574,90 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
}
+
+/* decrement counters for when a socket is closed */
+static
+void dec_socket(int domain, int type, int protocol)
+{
+ MLOCK(data.cntMtx);
+
+ cnt_dec(&data.numSockets, 1);
+
+ if (domain == AF_INET)
+ cnt_dec(&data.numDomainInet, 1);
+#if defined(HAVE_IN6) && defined(AF_INET6)
+ else if (domain == AF_INET6)
+ cnt_dec(&data.numDomainInet6, 1);
+#endif
+#if defined(HAVE_SYS_UN_H)
+ else if (domain == AF_UNIX)
+ cnt_dec(&data.numDomainInet6, 1);
+#endif
+
+ if (type == SOCK_STREAM)
+ cnt_dec(&data.numTypeStreams, 1);
+ else if (type == SOCK_DGRAM)
+ cnt_dec(&data.numTypeDGrams, 1);
+ else if (type == SOCK_SEQPACKET)
+ cnt_dec(&data.numTypeSeqPkgs, 1);
+
+ if (protocol == IPPROTO_IP)
+ cnt_dec(&data.numProtoIP, 1);
+ else if (protocol == IPPROTO_TCP)
+ cnt_dec(&data.numProtoTCP, 1);
+ else if (protocol == IPPROTO_UDP)
+ cnt_dec(&data.numProtoUDP, 1);
+#if defined(HAVE_SCTP)
+ else if (protocol == IPPROTO_SCTP)
+ cnt_dec(&data.numProtoSCTP, 1);
+#endif
+
+ MUNLOCK(data.cntMtx);
+}
+
+
+/* increment counters for when a socket is opened */
+static
+void inc_socket(int domain, int type, int protocol)
+{
+ MLOCK(data.cntMtx);
+
+ cnt_inc(&data.numSockets, 1);
+
+ if (domain == AF_INET)
+ cnt_inc(&data.numDomainInet, 1);
+#if defined(HAVE_IN6) && defined(AF_INET6)
+ else if (domain == AF_INET6)
+ cnt_inc(&data.numDomainInet6, 1);
+#endif
+#if defined(HAVE_SYS_UN_H)
+ else if (domain == AF_UNIX)
+ cnt_inc(&data.numDomainInet6, 1);
+#endif
+
+ if (type == SOCK_STREAM)
+ cnt_inc(&data.numTypeStreams, 1);
+ else if (type == SOCK_DGRAM)
+ cnt_inc(&data.numTypeDGrams, 1);
+ else if (type == SOCK_SEQPACKET)
+ cnt_inc(&data.numTypeSeqPkgs, 1);
+
+ if (protocol == IPPROTO_IP)
+ cnt_inc(&data.numProtoIP, 1);
+ else if (protocol == IPPROTO_TCP)
+ cnt_inc(&data.numProtoTCP, 1);
+ else if (protocol == IPPROTO_UDP)
+ cnt_inc(&data.numProtoUDP, 1);
+#if defined(HAVE_SCTP)
+ else if (protocol == IPPROTO_SCTP)
+ cnt_inc(&data.numProtoSCTP, 1);
+#endif
+
+ MUNLOCK(data.cntMtx);
+}
+
+
+
/* compare_pids - Test if two pids are equal
*
*/
@@ -6446,9 +6761,11 @@ BOOLEAN_T eproto2proto(int eproto, int* proto)
*proto = IPPROTO_UDP;
break;
+#if defined(HAVE_SCTP)
case SOCKET_PROTOCOL_SCTP:
*proto = IPPROTO_SCTP;
break;
+#endif
default:
return FALSE;
@@ -6519,35 +6836,42 @@ BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns)
* send flags.
*/
static
-BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags)
+BOOLEAN_T esendflags2sendflags(unsigned int eflags, int* flags)
{
unsigned int ef;
int tmp = 0;
for (ef = SOCKET_SEND_FLAG_LOW; ef <= SOCKET_SEND_FLAG_HIGH; ef++) {
+
switch (ef) {
case SOCKET_SEND_FLAG_CONFIRM:
- tmp |= MSG_CONFIRM;
+ if ((1 << SOCKET_SEND_FLAG_CONFIRM) & eflags)
+ tmp |= MSG_CONFIRM;
break;
case SOCKET_SEND_FLAG_DONTROUTE:
- tmp |= MSG_DONTROUTE;
+ if ((1 << SOCKET_SEND_FLAG_DONTROUTE) & eflags)
+ tmp |= MSG_DONTROUTE;
break;
case SOCKET_SEND_FLAG_EOR:
- tmp |= MSG_EOR;
+ if ((1 << SOCKET_SEND_FLAG_EOR) & eflags)
+ tmp |= MSG_EOR;
break;
case SOCKET_SEND_FLAG_MORE:
- tmp |= MSG_MORE;
+ if ((1 << SOCKET_SEND_FLAG_MORE) & eflags)
+ tmp |= MSG_MORE;
break;
case SOCKET_SEND_FLAG_NOSIGNAL:
- tmp |= MSG_NOSIGNAL;
+ if ((1 << SOCKET_SEND_FLAG_NOSIGNAL) & eflags)
+ tmp |= MSG_NOSIGNAL;
break;
case SOCKET_SEND_FLAG_OOB:
- tmp |= MSG_OOB;
+ if ((1 << SOCKET_SEND_FLAG_OOB) & eflags)
+ tmp |= MSG_OOB;
break;
default:
@@ -6556,7 +6880,7 @@ BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags)
}
- *sendflags = tmp;
+ *flags = tmp;
return TRUE;
}
@@ -6567,31 +6891,53 @@ BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags)
* send flags.
*/
static
-BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags)
+BOOLEAN_T erecvflags2recvflags(unsigned int eflags, int* flags)
{
unsigned int ef;
int tmp = 0;
+ SGDBG( ("SOCKET", "erecvflags2recvflags -> entry with"
+ "\r\n eflags: %d"
+ "\r\n", eflags) );
+
for (ef = SOCKET_RECV_FLAG_LOW; ef <= SOCKET_RECV_FLAG_HIGH; ef++) {
+
+ SGDBG( ("SOCKET", "erecvflags2recvflags -> iteration"
+ "\r\n ef: %d"
+ "\r\n tmp: %d"
+ "\r\n", ef, tmp) );
+
switch (ef) {
case SOCKET_RECV_FLAG_CMSG_CLOEXEC:
- tmp |= MSG_CMSG_CLOEXEC;
+ if ((1 << SOCKET_RECV_FLAG_CMSG_CLOEXEC) & eflags)
+ tmp |= MSG_CMSG_CLOEXEC;
break;
case SOCKET_RECV_FLAG_ERRQUEUE:
- tmp |= MSG_ERRQUEUE;
+ if ((1 << SOCKET_RECV_FLAG_ERRQUEUE) & eflags)
+ tmp |= MSG_ERRQUEUE;
break;
case SOCKET_RECV_FLAG_OOB:
- tmp |= MSG_OOB;
+ if ((1 << SOCKET_RECV_FLAG_OOB) & eflags)
+ tmp |= MSG_OOB;
break;
+ /*
+ * <KOLLA>
+ *
+ * We need to handle this, because it may effect the read algorithm
+ *
+ * </KOLLA>
+ */
case SOCKET_RECV_FLAG_PEEK:
- tmp |= MSG_PEEK;
+ if ((1 << SOCKET_RECV_FLAG_PEEK) & eflags)
+ tmp |= MSG_PEEK;
break;
case SOCKET_RECV_FLAG_TRUNC:
- tmp |= MSG_TRUNC;
+ if ((1 << SOCKET_RECV_FLAG_TRUNC) & eflags)
+ tmp |= MSG_TRUNC;
break;
default:
@@ -6600,7 +6946,7 @@ BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags)
}
- *recvflags = tmp;
+ *flags = tmp;
return TRUE;
}
@@ -7117,19 +7463,33 @@ char* send_msg(ErlNifEnv* env,
static
BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc)
{
- BOOLEAN_T wrap;
- uint32_t max = 0xFFFFFFFF;
- uint32_t current = *cnt;
+ BOOLEAN_T wrap;
+ uint32_t max = 0xFFFFFFFF;
+ uint32_t current = *cnt;
- if ((max - inc) >= current) {
- *cnt += inc;
- wrap = FALSE;
- } else {
- *cnt = inc - (max - current) - 1;
- wrap = TRUE;
- }
+ if ((max - inc) >= current) {
+ *cnt += inc;
+ wrap = FALSE;
+ } else {
+ *cnt = inc - (max - current) - 1;
+ wrap = TRUE;
+ }
+
+ return (wrap);
+}
+
+
+static
+void cnt_dec(uint32_t* cnt, uint32_t dec)
+{
+ uint32_t current = *cnt;
+
+ if (dec > current)
+ *cnt = 0; // The counter cannot be < 0 so this is the best we can do...
+ else
+ *cnt -= dec;
- return (wrap);
+ return;
}
@@ -7183,116 +7543,125 @@ void socket_dtor(ErlNifEnv* env, void* obj)
static
void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
{
- SocketDescriptor* descP = (SocketDescriptor*) obj;
-
- MLOCK(descP->writeMtx);
- MLOCK(descP->readMtx);
- MLOCK(descP->accMtx);
- MLOCK(descP->closeMtx);
-
-
- descP->state = SOCKET_STATE_CLOSING; // Just in case...???
- descP->isReadable = FALSE;
- descP->isWritable = FALSE;
-
-
- /* We should check that we actually have a monitor.
- * This *should* be done with a "NULL" monitor value,
- * which there currently is none...
- */
- DEMONP(env, descP, &descP->ctrlMon);
-
- if (descP->currentWriterP != NULL) {
- /* We have a (current) writer and *may* therefor also have
- * writers waiting.
- */
-
- ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
- descP->currentWriter.ref,
- atom_closed,
- &descP->currentWriter.pid)) );
-
- /* And also deal with the waiting writers (in the same way) */
- inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed);
- }
-
- if (descP->currentReaderP != NULL) {
-
- /* We have a (current) reader and *may* therefor also have
- * readers waiting.
- */
-
- ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
- descP->currentReader.ref,
- atom_closed,
- &descP->currentReader.pid)) );
-
- /* And also deal with the waiting readers (in the same way) */
- inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed);
- }
-
- if (descP->currentAcceptorP != NULL) {
- /* We have a (current) acceptor and *may* therefor also have
- * acceptors waiting.
- */
-
- ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
- descP->currentAcceptor.ref,
- atom_closed,
- &descP->currentAcceptor.pid)) );
-
- /* And also deal with the waiting acceptors (in the same way) */
- inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, atom_closed);
- }
-
-
- if (descP->sock != INVALID_SOCKET) {
-
- /*
- * <KOLLA>
- *
- * WE NEED TO CHECK IF THIS OPERATION IS TRIGGERED
- * LOCALLY (VIA A CALL TO CLOSE) OR REMOTELLY
- * (VIA I.E. ECONSRESET).
- *
- * </KOLLA>
- */
-
- if (descP->closeLocal) {
-
- /* +++ send close message to the waiting process +++
- *
- * {close, CloseRef}
- *
- * <KOLLA>
- *
- * WHAT HAPPENS IF THE RECEIVER HAS DIED IN THE MEANTIME????
- *
- * </KOLLA>
- */
-
- send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
-
- DEMONP(env, descP, &descP->closerMon);
-
- } else {
-
- /*
- * <KOLLA>
- *
- * ABORT?
- *
- * </KOLLA>
- */
- }
- }
-
-
- MUNLOCK(descP->closeMtx);
- MUNLOCK(descP->accMtx);
- MUNLOCK(descP->readMtx);
- MUNLOCK(descP->writeMtx);
+ SocketDescriptor* descP = (SocketDescriptor*) obj;
+
+ SSDBG( descP,
+ ("SOCKET", "socket_stop -> entry when"
+ "\r\n sock: %d (%d)"
+ "\r\n is_direct_call: %d"
+ "\r\n", descP->sock, fd, is_direct_call) );
+
+ MLOCK(descP->writeMtx);
+ MLOCK(descP->readMtx);
+ MLOCK(descP->accMtx);
+ MLOCK(descP->closeMtx);
+
+
+ descP->state = SOCKET_STATE_CLOSING; // Just in case...???
+ descP->isReadable = FALSE;
+ descP->isWritable = FALSE;
+
+
+ /* We should check that we actually have a monitor.
+ * This *should* be done with a "NULL" monitor value,
+ * which there currently is none...
+ */
+ DEMONP(env, descP, &descP->ctrlMon);
+
+ if (descP->currentWriterP != NULL) {
+ /* We have a (current) writer and *may* therefor also have
+ * writers waiting.
+ */
+
+ ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
+ descP->currentWriter.ref,
+ atom_closed,
+ &descP->currentWriter.pid)) );
+
+ /* And also deal with the waiting writers (in the same way) */
+ inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed);
+ }
+
+ if (descP->currentReaderP != NULL) {
+
+ /* We have a (current) reader and *may* therefor also have
+ * readers waiting.
+ */
+
+ ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
+ descP->currentReader.ref,
+ atom_closed,
+ &descP->currentReader.pid)) );
+
+ /* And also deal with the waiting readers (in the same way) */
+ inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed);
+ }
+
+ if (descP->currentAcceptorP != NULL) {
+ /* We have a (current) acceptor and *may* therefor also have
+ * acceptors waiting.
+ */
+
+ ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
+ descP->currentAcceptor.ref,
+ atom_closed,
+ &descP->currentAcceptor.pid)) );
+
+ /* And also deal with the waiting acceptors (in the same way) */
+ inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, atom_closed);
+ }
+
+
+ if (descP->sock != INVALID_SOCKET) {
+
+ /*
+ * <KOLLA>
+ *
+ * WE NEED TO CHECK IF THIS OPERATION IS TRIGGERED
+ * LOCALLY (VIA A CALL TO CLOSE) OR REMOTELLY
+ * (VIA I.E. ECONSRESET).
+ *
+ * </KOLLA>
+ */
+
+ if (descP->closeLocal) {
+
+ /* +++ send close message to the waiting process +++
+ *
+ * {close, CloseRef}
+ *
+ * <KOLLA>
+ *
+ * WHAT HAPPENS IF THE RECEIVER HAS DIED IN THE MEANTIME????
+ *
+ * </KOLLA>
+ */
+
+ send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
+
+ DEMONP(env, descP, &descP->closerMon);
+
+ } else {
+
+ /*
+ * <KOLLA>
+ *
+ * ABORT?
+ *
+ * </KOLLA>
+ */
+ }
+ }
+
+
+ MUNLOCK(descP->closeMtx);
+ MUNLOCK(descP->accMtx);
+ MUNLOCK(descP->readMtx);
+ MUNLOCK(descP->writeMtx);
+ SSDBG( descP,
+ ("SOCKET", "socket_stop -> done (%d, %d)\r\n", descP->sock, fd) );
+
}