aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-06-29 18:23:55 +0200
committerMicael Karlberg <[email protected]>2018-09-18 14:50:18 +0200
commit24be0729fe3a1ccfd5f0713b565463d6557d8aa7 (patch)
tree2243a51ada05a3ddeacd443ed6e49262cf79766c
parentb09136301525b0717e897ec0864c3d2ea7708758 (diff)
downloadotp-24be0729fe3a1ccfd5f0713b565463d6557d8aa7.tar.gz
otp-24be0729fe3a1ccfd5f0713b565463d6557d8aa7.tar.bz2
otp-24be0729fe3a1ccfd5f0713b565463d6557d8aa7.zip
[socket-nif] Fixed (stream) recv
Fixed handling of closed in the recv function. We still need to properly handle when we get 0 bytes of data for other types ock sockets then stream (its valid for dgram for instance). OTP-14831
-rw-r--r--erts/emulator/nifs/common/socket_int.h1
-rw-r--r--erts/emulator/nifs/common/socket_nif.c695
-rw-r--r--erts/emulator/nifs/common/socket_util.c89
-rw-r--r--erts/emulator/nifs/common/socket_util.h3
-rw-r--r--erts/preloaded/ebin/socket.beambin39680 -> 42488 bytes
-rw-r--r--erts/preloaded/src/socket.erl288
-rw-r--r--lib/kernel/test/socket_client.erl105
-rw-r--r--lib/kernel/test/socket_server.erl105
8 files changed, 973 insertions, 313 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h
index d6a612cab6..a3e54360fe 100644
--- a/erts/emulator/nifs/common/socket_int.h
+++ b/erts/emulator/nifs/common/socket_int.h
@@ -192,6 +192,7 @@ extern ERL_NIF_TERM esock_atom_einval;
#define GET_INT(E, TE, IP) enif_get_int((E), (TE), (IP))
#define GET_LIST_ELEM(E, L, HP, TP) enif_get_list_cell((E), (L), (HP), (TP))
#define GET_LIST_LEN(E, L, LP) enif_get_list_length((E), (L), (LP))
+#define GET_LPID(E, T, P) enif_get_local_pid((E), (T), (P))
#define GET_STR(E, L, B, SZ) \
enif_get_string((E), (L), (B), (SZ), ERL_NIF_LATIN1)
#define GET_UINT(E, TE, IP) enif_get_uint((E), (TE), (IP))
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 7f45fb7bcd..027155fc92 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -182,8 +182,8 @@
/* Debug stuff... */
-#define SOCKET_NIF_DEBUG_DEFAULT TRUE
-#define SOCKET_DEBUG_DEFAULT TRUE
+#define SOCKET_NIF_DEBUG_DEFAULT FALSE
+#define SOCKET_DEBUG_DEFAULT FALSE
/* Counters and stuff (Don't know where to sent this stuff anyway) */
#define SOCKET_NIF_IOW_DEFAULT FALSE
@@ -344,6 +344,7 @@ typedef union {
#define SOCKET_OPT_OTP_DEBUG 0
#define SOCKET_OPT_OTP_IOW 1
+#define SOCKET_OPT_OTP_CTRL_PROC 2
#define SOCKET_OPT_SOCK_BROADCAST 4
#define SOCKET_OPT_SOCK_DONTROUTE 7
@@ -594,16 +595,16 @@ typedef struct {
ERL_NIF_TERM version;
ERL_NIF_TERM buildDate;
BOOLEAN_T dbg;
- BOOLEAN_T iow;
+ BOOLEAN_T iow;
ErlNifMutex* cntMtx;
uint32_t numSockets;
- uint32_t numTypeDGrams;
uint32_t numTypeStreams;
+ uint32_t numTypeDGrams;
uint32_t numTypeSeqPkgs;
- uint32_t numDomainLocal;
uint32_t numDomainInet;
uint32_t numDomainInet6;
+ uint32_t numDomainLocal;
uint32_t numProtoIP;
uint32_t numProtoTCP;
uint32_t numProtoUDP;
@@ -751,6 +752,9 @@ static ERL_NIF_TERM nsetopt_otp_debug(ErlNifEnv* env,
static ERL_NIF_TERM nsetopt_otp_iow(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM eVal);
+static ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM eVal);
static ERL_NIF_TERM nsetopt_native(ErlNifEnv* env,
SocketDescriptor* descP,
int level,
@@ -1045,11 +1049,13 @@ static ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
ssize_t written,
ssize_t dataSize,
+ int saveErrno,
ERL_NIF_TERM sendRef);
static ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
int read,
int toRead,
+ int saveErrno,
ErlNifBinary* bufP,
ERL_NIF_TERM recvRef);
static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
@@ -1214,6 +1220,11 @@ static BOOLEAN_T restore_network_namespace(int ns, SOCKET sock, int* err);
#endif
static BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc);
+static void cnt_dec(uint32_t* cnt, uint32_t dec);
+
+static void inc_socket(int domain, int type, int protocol);
+static void dec_socket(int domain, int type, int protocol);
+
/*
#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE)
@@ -1698,12 +1709,13 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
*
* </KOLLA>
*/
- SELECT(env,
- event,
- (ERL_NIF_SELECT_READ),
- descP, NULL, esock_atom_undefined);
+ SELECT(env,
+ event,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, esock_atom_undefined);
#endif
+ inc_socket(domain, type, protocol);
return esock_make_ok2(env, res);
}
@@ -2515,21 +2527,31 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
{
SocketDescriptor* descP;
ERL_NIF_TERM sendRef;
- ErlNifBinary data;
+ ErlNifBinary sndData;
unsigned int eflags;
int flags;
ERL_NIF_TERM res;
+ SGDBG( ("SOCKET", "nif_send -> entry with argc: %d\r\n", argc) );
+
/* Extract arguments and perform preliminary validation */
if ((argc != 4) ||
!enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
- !GET_BIN(env, argv[2], &data) ||
+ !GET_BIN(env, argv[2], &sndData) ||
!GET_UINT(env, argv[3], &eflags)) {
return enif_make_badarg(env);
}
sendRef = argv[1];
+ SSDBG( descP,
+ ("SOCKET", "nif_send -> args when sock = %d:"
+ "\r\n Socket: %T"
+ "\r\n SendRef: %T"
+ "\r\n Size of data: %d"
+ "\r\n eFlags: %d"
+ "\r\n", descP->sock, argv[0], sendRef, sndData.size, eflags) );
+
if (!IS_CONNECTED(descP))
return esock_make_error(env, atom_enotconn);
@@ -2551,7 +2573,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
* time we do.
*/
- res = nsend(env, descP, sendRef, &data, flags);
+ res = nsend(env, descP, sendRef, &sndData, flags);
MUNLOCK(descP->writeMtx);
@@ -2570,9 +2592,10 @@ static
ERL_NIF_TERM nsend(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM sendRef,
- ErlNifBinary* dataP,
+ ErlNifBinary* sndDataP,
int flags)
{
+ int save_errno;
ssize_t written;
if (!descP->isWritable)
@@ -2583,9 +2606,15 @@ ERL_NIF_TERM nsend(ErlNifEnv* env,
*/
cnt_inc(&descP->writeTries, 1);
- written = sock_send(descP->sock, dataP->data, dataP->size, flags);
+ written = sock_send(descP->sock, sndDataP->data, sndDataP->size, flags);
+ if (IS_SOCKET_ERROR(written))
+ save_errno = sock_errno();
+ else
+ save_errno = -1; // The value does not actually matter in this case
+
- return send_check_result(env, descP, written, dataP->size, sendRef);
+ return send_check_result(env, descP,
+ written, sndDataP->size, save_errno, sendRef);
}
@@ -2655,6 +2684,7 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env,
SocketAddress* toAddrP,
unsigned int toAddrLen)
{
+ int save_errno;
ssize_t written;
if (!descP->isWritable)
@@ -2674,8 +2704,12 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env,
dataP->data, dataP->size, flags,
NULL, 0);
}
+ if (IS_SOCKET_ERROR(written))
+ save_errno = sock_errno();
+ else
+ save_errno = -1; // The value does not actually matter in this case
- return send_check_result(env, descP, written, dataP->size, sendRef);
+ return send_check_result(env, descP, written, dataP->size, save_errno, sendRef);
}
@@ -2829,6 +2863,13 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
{
ssize_t read;
ErlNifBinary buf;
+ int save_errno;
+ int bufSz = (len ? len : descP->rBufSz);
+
+ SSDBG( descP, ("SOCKET", "nrecv -> entry with"
+ "\r\n len: %d (%d)"
+ "\r\n flags: %d"
+ "\r\n", len, bufSz, flags) );
if (!descP->isReadable)
return enif_make_badarg(env);
@@ -2837,7 +2878,7 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
* Either as much as we want to read or (if zero (0)) use the "default"
* size (what has been configured).
*/
- if (!ALLOC_BIN((len ? len : descP->rBufSz), &buf))
+ if (!ALLOC_BIN(bufSz, &buf))
return esock_make_error(env, atom_exalloc);
/* We ignore the wrap for the moment.
@@ -2845,10 +2886,19 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
*/
cnt_inc(&descP->readTries, 1);
+ // If it fails (read = -1), we need errno...
+ SSDBG( descP, ("SOCKET", "nrecv -> try read (%d)\r\n", buf.size) );
read = sock_recv(descP->sock, buf.data, buf.size, flags);
+ if (IS_SOCKET_ERROR(read))
+ save_errno = sock_errno();
+ else
+ save_errno = -1; // The value does not actually matter in this case
+
+ SSDBG( descP, ("SOCKET", "nrecv -> read: %d (%d)\r\n", read, save_errno) );
return recv_check_result(env, descP,
read, len,
+ save_errno,
&buf,
recvRef);
}
@@ -3003,6 +3053,11 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
ERL_NIF_TERM reply, reason;
BOOLEAN_T doClose;
int selectRes;
+ int domain = descP->domain;
+ int type = descP->type;
+ int protocol = descP->protocol;
+
+ SSDBG( descP, ("SOCKET", "nclose -> [%d] entry\r\n", descP->sock) );
MLOCK(descP->closeMtx);
@@ -3049,10 +3104,16 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
descP, NULL, descP->closeRef);
if (selectRes & ERL_NIF_SELECT_STOP_CALLED) {
/* Prep done - inform the caller it can finalize (close) directly */
+ SSDBG( descP,
+ ("SOCKET", "nclose -> [%d] stop called\r\n", descP->sock) );
+ dec_socket(domain, type, protocol);
reply = esock_atom_ok;
} else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) {
/* The stop callback function has been *scheduled* which means that we
* have to wait for it to complete. */
+ SSDBG( descP,
+ ("SOCKET", "nclose -> [%d] stop scheduled\r\n", descP->sock) );
+ dec_socket(domain, type, protocol); // SHALL WE DO THIS AT finalize?
reply = esock_make_ok2(env, descP->closeRef);
} else {
/* <KOLLA>
@@ -3071,6 +3132,11 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
reply = esock_make_error(env, reason);
}
+ SSDBG( descP,
+ ("SOCKET", "nclose -> [%d] done when: "
+ "\r\n reply: %T"
+ "\r\n", descP->sock, reply) );
+
return reply;
}
@@ -3081,7 +3147,7 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
*
* Description:
* Perform the actual socket close!
- * Note that this function is executed in a dirfty scheduler.
+ * Note that this function is executed in a dirty scheduler.
*
* Arguments:
* Socket (ref) - Points to the socket descriptor.
@@ -3249,6 +3315,10 @@ ERL_NIF_TERM nif_setopt(ErlNifEnv* env,
ERL_NIF_TERM eVal;
BOOLEAN_T isEncoded, isOTP;
+ SGDBG( ("SOCKET", "nif_setopt -> entry with argc: %d\r\n", argc) );
+
+ /* Extract arguments and perform preliminary validation */
+
if ((argc != 5) ||
!enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
!GET_INT(env, argv[2], &eLevel) ||
@@ -3263,6 +3333,19 @@ ERL_NIF_TERM nif_setopt(ErlNifEnv* env,
if (!elevel2level(isEncoded, eLevel, &isOTP, &level))
return esock_make_error(env, esock_atom_einval);
+ SSDBG( descP,
+ ("SOCKET", "nif_setopt -> args when sock = %d:"
+ "\r\n Socket: %T"
+ "\r\n Encoded: %T (%d)"
+ "\r\n Level: %d (%d)"
+ "\r\n Opt: %d"
+ "\r\n Value: %T"
+ "\r\n",
+ descP->sock, argv[0],
+ eIsEncoded, isEncoded,
+ eLevel, level,
+ eOpt, eVal) );
+
return nsetopt(env, descP, isEncoded, isOTP, level, eOpt, eVal);
}
@@ -3304,6 +3387,12 @@ ERL_NIF_TERM nsetopt_otp(ErlNifEnv* env,
{
ERL_NIF_TERM result;
+ SSDBG( descP,
+ ("SOCKET", "nsetopt_otp -> entry with"
+ "\r\n eOpt: %d"
+ "\r\n eVal: %T"
+ "\r\n", eOpt, eVal) );
+
switch (eOpt) {
case SOCKET_OPT_OTP_DEBUG:
result = nsetopt_otp_debug(env, descP, eVal);
@@ -3313,6 +3402,10 @@ ERL_NIF_TERM nsetopt_otp(ErlNifEnv* env,
result = nsetopt_otp_iow(env, descP, eVal);
break;
+ case SOCKET_OPT_OTP_CTRL_PROC:
+ result = nsetopt_otp_ctrl_proc(env, descP, eVal);
+ break;
+
default:
result = esock_make_error(env, esock_atom_einval);
break;
@@ -3349,6 +3442,48 @@ ERL_NIF_TERM nsetopt_otp_iow(ErlNifEnv* env,
+/* nsetopt_otp_ctrl_proc - Handle the OTP (level) controlling_process options
+ */
+static
+ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM eVal)
+{
+ ErlNifPid newCtrlPid;
+ ErlNifMonitor newCtrlMon;
+ int xres;
+
+ SSDBG( descP,
+ ("SOCKET", "nsetopt_otp_ctrl_proc -> entry with"
+ "\r\n eVal: %T"
+ "\r\n", eVal) );
+
+ if (!GET_LPID(env, eVal, &newCtrlPid)) {
+ esock_warning_msg("Failed get pid of new controlling process\r\n");
+ return esock_make_error(env, esock_atom_einval);
+ }
+
+ if ((xres = MONP(env, descP, &newCtrlPid, &newCtrlMon)) != 0) {
+ esock_warning_msg("Failed monitor %d) (new) controlling process\r\n", xres);
+ return esock_make_error(env, esock_atom_einval);
+ }
+
+ if ((xres = DEMONP(env, descP, &descP->ctrlMon)) != 0) {
+ esock_warning_msg("Failed demonitor (%d) "
+ "old controlling process %T (%T)\r\n",
+ xres, descP->ctrlPid, descP->ctrlMon);
+ }
+
+ descP->ctrlPid = newCtrlPid;
+ descP->ctrlMon = newCtrlMon;
+
+ SSDBG( descP, ("SOCKET", "nsetopt_otp_ctrl_proc -> done\r\n") );
+
+ return esock_atom_ok;
+}
+
+
+
/* The option has *not* been encoded. Instead it has been provided
* in "native mode" (option is provided as is and value as a binary).
*/
@@ -3362,6 +3497,12 @@ ERL_NIF_TERM nsetopt_native(ErlNifEnv* env,
ErlNifBinary val;
ERL_NIF_TERM result;
+ SSDBG( descP,
+ ("SOCKET", "nsetopt_native -> entry with"
+ "\r\n opt: %d"
+ "\r\n eVal: %T"
+ "\r\n", opt, eVal) );
+
if (GET_BIN(env, eVal, &val)) {
int res = socket_setopt(descP->sock, level, opt,
val.data, val.size);
@@ -3389,6 +3530,11 @@ ERL_NIF_TERM nsetopt_level(ErlNifEnv* env,
{
ERL_NIF_TERM result;
+ SSDBG( descP,
+ ("SOCKET", "nsetopt_level -> entry with"
+ "\r\n level: %d"
+ "\r\n", level) );
+
switch (level) {
case SOL_SOCKET:
result = nsetopt_lvl_socket(env, descP, eOpt, eVal);
@@ -5081,30 +5227,44 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
ssize_t written,
ssize_t dataSize,
+ int saveErrno,
ERL_NIF_TERM sendRef)
{
+ SSDBG( descP,
+ ("SOCKET", "send_check_result -> entry with"
+ "\r\n written: %d"
+ "\r\n dataSize: %d"
+ "\r\n saveErrno: %d"
+ "\r\n", written, dataSize, saveErrno) );
+
if (written == dataSize) {
cnt_inc(&descP->writePkgCnt, 1);
cnt_inc(&descP->writeByteCnt, written);
+ SSDBG( descP,
+ ("SOCKET", "send_check_result -> everything written - done\r\n") );
+
return esock_atom_ok;
} else if (written < 0) {
/* Ouch, check what kind of failure */
- int save_errno = sock_errno();
- if ((save_errno != EAGAIN) &&
- (save_errno != EINTR)) {
+ if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) {
cnt_inc(&descP->writeFails, 1);
- return esock_make_error_errno(env, save_errno);
+ SSDBG( descP,
+ ("SOCKET", "send_check_result -> error: %d\r\n", saveErrno) );
+
+ return esock_make_error_errno(env, saveErrno);
} else {
/* Ok, try again later */
+ SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") );
+
/* <KOLLA>
* SHOULD RESULT IN {error, eagain}!!!!
* </KOLLA>
@@ -5124,6 +5284,9 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE),
descP, NULL, sendRef);
+ SSDBG( descP,
+ ("SOCKET", "send_check_result -> not entire package written\r\n") );
+
return esock_make_ok2(env, enif_make_int(env, written));
}
@@ -5134,11 +5297,42 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
int read,
int toRead,
+ int saveErrno,
ErlNifBinary* bufP,
ERL_NIF_TERM recvRef)
{
ERL_NIF_TERM data;
+ SSDBG( descP,
+ ("SOCKET", "recv_check_result -> entry with"
+ "\r\n read: %d"
+ "\r\n toRead: %d"
+ "\r\n saveErrno: %d"
+ "\r\n recvRef: %T"
+ "\r\n", read, toRead, saveErrno, recvRef) );
+
+
+ /* <KOLLA>
+ *
+ * We need to handle read = 0 for other type(s) (DGRAM) when
+ * its actually valid to read 0 bytes.
+ *
+ * </KOLLA>
+ */
+
+ if ((read == 0) && (descP->type == SOCK_STREAM)) {
+
+ /*
+ * When a stream socket peer has performed an orderly shutdown, the return
+ * value will be 0 (the traditional "end-of-file" return).
+ *
+ * *We* do never actually try to read 0 bytes from a stream socket!
+ */
+
+ return esock_make_error(env, atom_closed);
+
+ }
+
/* There is a special case: If the provided 'to read' value is
* zero (0). That means that we reads as much as we can, using
* the default read buffer size.
@@ -5148,6 +5342,10 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
/* +++ We filled the buffer +++ */
+ SSDBG( descP,
+ ("SOCKET",
+ "recv_check_result -> [%d] filled the buffer\r\n", toRead) );
+
if (toRead == 0) {
/* +++ Give us everything you have got => needs to continue +++ */
@@ -5168,6 +5366,11 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
data = MKBIN(env, bufP);
+ SSDBG( descP,
+ ("SOCKET",
+ "recv_check_result -> [%d] "
+ "we are done for now - read more\r\n", toRead) );
+
return esock_make_ok3(env, atom_false, data);
} else {
@@ -5181,6 +5384,11 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
data = MKBIN(env, bufP);
+ SSDBG( descP,
+ ("SOCKET",
+ "recv_check_result -> [%d] "
+ "we got exactly what we could fit\r\n", toRead) );
+
return esock_make_ok3(env, atom_true, data);
}
@@ -5189,12 +5397,13 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
/* +++ Error handling +++ */
- int save_errno = sock_errno();
-
- if (save_errno == ECONNRESET) {
+ if (saveErrno == ECONNRESET) {
/* +++ Oups - closed +++ */
+ SSDBG( descP, ("SOCKET",
+ "recv_check_result -> [%d] closed\r\n", toRead) );
+
/* <KOLLA>
*
* IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING
@@ -5220,17 +5429,31 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
return esock_make_error(env, atom_closed);
- } else if ((save_errno == ERRNO_BLOCK) ||
- (save_errno == EAGAIN)) {
+ } else if ((saveErrno == ERRNO_BLOCK) ||
+ (saveErrno == EAGAIN)) {
+ SSDBG( descP, ("SOCKET",
+ "recv_check_result -> [%d] eagain\r\n", toRead) );
+
+ SELECT(env, descP->sock, (ERL_NIF_SELECT_READ),
+ descP, NULL, recvRef);
+
return esock_make_error(env, esock_atom_eagain);
} else {
- return esock_make_error_errno(env, save_errno);
+ SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] errno: %d\r\n",
+ toRead, saveErrno) );
+ return esock_make_error_errno(env, saveErrno);
}
} else {
/* +++ We did not fill the buffer +++ */
+ SSDBG( descP,
+ ("SOCKET",
+ "recv_check_result -> [%d] "
+ "did not fill the buffer (%d of %d)\r\n",
+ toRead, read, bufP->size) );
+
if (toRead == 0) {
/* +++ We got a chunk of data but +++
@@ -5239,9 +5462,14 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
* +++ into a sub-binary. +++
*/
+ SSDBG( descP, ("SOCKET",
+ "recv_check_result -> [%d] split buffer\r\n", toRead) );
+
data = MKBIN(env, bufP);
data = MKSBIN(env, data, 0, read);
+ SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] done\r\n", toRead) );
+
return esock_make_ok3(env, atom_true, data);
} else {
@@ -5249,6 +5477,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
/* +++ We got only a part of what was expected +++
* +++ => receive more later. +++ */
+ SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] "
+ "only part of message - expect more\r\n", toRead) );
+
return esock_make_ok3(env, atom_false, MKBIN(env, bufP));
}
}
@@ -6343,6 +6574,90 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
}
+
+/* decrement counters for when a socket is closed */
+static
+void dec_socket(int domain, int type, int protocol)
+{
+ MLOCK(data.cntMtx);
+
+ cnt_dec(&data.numSockets, 1);
+
+ if (domain == AF_INET)
+ cnt_dec(&data.numDomainInet, 1);
+#if defined(HAVE_IN6) && defined(AF_INET6)
+ else if (domain == AF_INET6)
+ cnt_dec(&data.numDomainInet6, 1);
+#endif
+#if defined(HAVE_SYS_UN_H)
+ else if (domain == AF_UNIX)
+ cnt_dec(&data.numDomainInet6, 1);
+#endif
+
+ if (type == SOCK_STREAM)
+ cnt_dec(&data.numTypeStreams, 1);
+ else if (type == SOCK_DGRAM)
+ cnt_dec(&data.numTypeDGrams, 1);
+ else if (type == SOCK_SEQPACKET)
+ cnt_dec(&data.numTypeSeqPkgs, 1);
+
+ if (protocol == IPPROTO_IP)
+ cnt_dec(&data.numProtoIP, 1);
+ else if (protocol == IPPROTO_TCP)
+ cnt_dec(&data.numProtoTCP, 1);
+ else if (protocol == IPPROTO_UDP)
+ cnt_dec(&data.numProtoUDP, 1);
+#if defined(HAVE_SCTP)
+ else if (protocol == IPPROTO_SCTP)
+ cnt_dec(&data.numProtoSCTP, 1);
+#endif
+
+ MUNLOCK(data.cntMtx);
+}
+
+
+/* increment counters for when a socket is opened */
+static
+void inc_socket(int domain, int type, int protocol)
+{
+ MLOCK(data.cntMtx);
+
+ cnt_inc(&data.numSockets, 1);
+
+ if (domain == AF_INET)
+ cnt_inc(&data.numDomainInet, 1);
+#if defined(HAVE_IN6) && defined(AF_INET6)
+ else if (domain == AF_INET6)
+ cnt_inc(&data.numDomainInet6, 1);
+#endif
+#if defined(HAVE_SYS_UN_H)
+ else if (domain == AF_UNIX)
+ cnt_inc(&data.numDomainInet6, 1);
+#endif
+
+ if (type == SOCK_STREAM)
+ cnt_inc(&data.numTypeStreams, 1);
+ else if (type == SOCK_DGRAM)
+ cnt_inc(&data.numTypeDGrams, 1);
+ else if (type == SOCK_SEQPACKET)
+ cnt_inc(&data.numTypeSeqPkgs, 1);
+
+ if (protocol == IPPROTO_IP)
+ cnt_inc(&data.numProtoIP, 1);
+ else if (protocol == IPPROTO_TCP)
+ cnt_inc(&data.numProtoTCP, 1);
+ else if (protocol == IPPROTO_UDP)
+ cnt_inc(&data.numProtoUDP, 1);
+#if defined(HAVE_SCTP)
+ else if (protocol == IPPROTO_SCTP)
+ cnt_inc(&data.numProtoSCTP, 1);
+#endif
+
+ MUNLOCK(data.cntMtx);
+}
+
+
+
/* compare_pids - Test if two pids are equal
*
*/
@@ -6446,9 +6761,11 @@ BOOLEAN_T eproto2proto(int eproto, int* proto)
*proto = IPPROTO_UDP;
break;
+#if defined(HAVE_SCTP)
case SOCKET_PROTOCOL_SCTP:
*proto = IPPROTO_SCTP;
break;
+#endif
default:
return FALSE;
@@ -6519,35 +6836,42 @@ BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns)
* send flags.
*/
static
-BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags)
+BOOLEAN_T esendflags2sendflags(unsigned int eflags, int* flags)
{
unsigned int ef;
int tmp = 0;
for (ef = SOCKET_SEND_FLAG_LOW; ef <= SOCKET_SEND_FLAG_HIGH; ef++) {
+
switch (ef) {
case SOCKET_SEND_FLAG_CONFIRM:
- tmp |= MSG_CONFIRM;
+ if ((1 << SOCKET_SEND_FLAG_CONFIRM) & eflags)
+ tmp |= MSG_CONFIRM;
break;
case SOCKET_SEND_FLAG_DONTROUTE:
- tmp |= MSG_DONTROUTE;
+ if ((1 << SOCKET_SEND_FLAG_DONTROUTE) & eflags)
+ tmp |= MSG_DONTROUTE;
break;
case SOCKET_SEND_FLAG_EOR:
- tmp |= MSG_EOR;
+ if ((1 << SOCKET_SEND_FLAG_EOR) & eflags)
+ tmp |= MSG_EOR;
break;
case SOCKET_SEND_FLAG_MORE:
- tmp |= MSG_MORE;
+ if ((1 << SOCKET_SEND_FLAG_MORE) & eflags)
+ tmp |= MSG_MORE;
break;
case SOCKET_SEND_FLAG_NOSIGNAL:
- tmp |= MSG_NOSIGNAL;
+ if ((1 << SOCKET_SEND_FLAG_NOSIGNAL) & eflags)
+ tmp |= MSG_NOSIGNAL;
break;
case SOCKET_SEND_FLAG_OOB:
- tmp |= MSG_OOB;
+ if ((1 << SOCKET_SEND_FLAG_OOB) & eflags)
+ tmp |= MSG_OOB;
break;
default:
@@ -6556,7 +6880,7 @@ BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags)
}
- *sendflags = tmp;
+ *flags = tmp;
return TRUE;
}
@@ -6567,31 +6891,53 @@ BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags)
* send flags.
*/
static
-BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags)
+BOOLEAN_T erecvflags2recvflags(unsigned int eflags, int* flags)
{
unsigned int ef;
int tmp = 0;
+ SGDBG( ("SOCKET", "erecvflags2recvflags -> entry with"
+ "\r\n eflags: %d"
+ "\r\n", eflags) );
+
for (ef = SOCKET_RECV_FLAG_LOW; ef <= SOCKET_RECV_FLAG_HIGH; ef++) {
+
+ SGDBG( ("SOCKET", "erecvflags2recvflags -> iteration"
+ "\r\n ef: %d"
+ "\r\n tmp: %d"
+ "\r\n", ef, tmp) );
+
switch (ef) {
case SOCKET_RECV_FLAG_CMSG_CLOEXEC:
- tmp |= MSG_CMSG_CLOEXEC;
+ if ((1 << SOCKET_RECV_FLAG_CMSG_CLOEXEC) & eflags)
+ tmp |= MSG_CMSG_CLOEXEC;
break;
case SOCKET_RECV_FLAG_ERRQUEUE:
- tmp |= MSG_ERRQUEUE;
+ if ((1 << SOCKET_RECV_FLAG_ERRQUEUE) & eflags)
+ tmp |= MSG_ERRQUEUE;
break;
case SOCKET_RECV_FLAG_OOB:
- tmp |= MSG_OOB;
+ if ((1 << SOCKET_RECV_FLAG_OOB) & eflags)
+ tmp |= MSG_OOB;
break;
+ /*
+ * <KOLLA>
+ *
+ * We need to handle this, because it may effect the read algorithm
+ *
+ * </KOLLA>
+ */
case SOCKET_RECV_FLAG_PEEK:
- tmp |= MSG_PEEK;
+ if ((1 << SOCKET_RECV_FLAG_PEEK) & eflags)
+ tmp |= MSG_PEEK;
break;
case SOCKET_RECV_FLAG_TRUNC:
- tmp |= MSG_TRUNC;
+ if ((1 << SOCKET_RECV_FLAG_TRUNC) & eflags)
+ tmp |= MSG_TRUNC;
break;
default:
@@ -6600,7 +6946,7 @@ BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags)
}
- *recvflags = tmp;
+ *flags = tmp;
return TRUE;
}
@@ -7117,19 +7463,33 @@ char* send_msg(ErlNifEnv* env,
static
BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc)
{
- BOOLEAN_T wrap;
- uint32_t max = 0xFFFFFFFF;
- uint32_t current = *cnt;
+ BOOLEAN_T wrap;
+ uint32_t max = 0xFFFFFFFF;
+ uint32_t current = *cnt;
- if ((max - inc) >= current) {
- *cnt += inc;
- wrap = FALSE;
- } else {
- *cnt = inc - (max - current) - 1;
- wrap = TRUE;
- }
+ if ((max - inc) >= current) {
+ *cnt += inc;
+ wrap = FALSE;
+ } else {
+ *cnt = inc - (max - current) - 1;
+ wrap = TRUE;
+ }
+
+ return (wrap);
+}
+
+
+static
+void cnt_dec(uint32_t* cnt, uint32_t dec)
+{
+ uint32_t current = *cnt;
+
+ if (dec > current)
+ *cnt = 0; // The counter cannot be < 0 so this is the best we can do...
+ else
+ *cnt -= dec;
- return (wrap);
+ return;
}
@@ -7183,116 +7543,125 @@ void socket_dtor(ErlNifEnv* env, void* obj)
static
void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
{
- SocketDescriptor* descP = (SocketDescriptor*) obj;
-
- MLOCK(descP->writeMtx);
- MLOCK(descP->readMtx);
- MLOCK(descP->accMtx);
- MLOCK(descP->closeMtx);
-
-
- descP->state = SOCKET_STATE_CLOSING; // Just in case...???
- descP->isReadable = FALSE;
- descP->isWritable = FALSE;
-
-
- /* We should check that we actually have a monitor.
- * This *should* be done with a "NULL" monitor value,
- * which there currently is none...
- */
- DEMONP(env, descP, &descP->ctrlMon);
-
- if (descP->currentWriterP != NULL) {
- /* We have a (current) writer and *may* therefor also have
- * writers waiting.
- */
-
- ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
- descP->currentWriter.ref,
- atom_closed,
- &descP->currentWriter.pid)) );
-
- /* And also deal with the waiting writers (in the same way) */
- inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed);
- }
-
- if (descP->currentReaderP != NULL) {
-
- /* We have a (current) reader and *may* therefor also have
- * readers waiting.
- */
-
- ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
- descP->currentReader.ref,
- atom_closed,
- &descP->currentReader.pid)) );
-
- /* And also deal with the waiting readers (in the same way) */
- inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed);
- }
-
- if (descP->currentAcceptorP != NULL) {
- /* We have a (current) acceptor and *may* therefor also have
- * acceptors waiting.
- */
-
- ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
- descP->currentAcceptor.ref,
- atom_closed,
- &descP->currentAcceptor.pid)) );
-
- /* And also deal with the waiting acceptors (in the same way) */
- inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, atom_closed);
- }
-
-
- if (descP->sock != INVALID_SOCKET) {
-
- /*
- * <KOLLA>
- *
- * WE NEED TO CHECK IF THIS OPERATION IS TRIGGERED
- * LOCALLY (VIA A CALL TO CLOSE) OR REMOTELLY
- * (VIA I.E. ECONSRESET).
- *
- * </KOLLA>
- */
-
- if (descP->closeLocal) {
-
- /* +++ send close message to the waiting process +++
- *
- * {close, CloseRef}
- *
- * <KOLLA>
- *
- * WHAT HAPPENS IF THE RECEIVER HAS DIED IN THE MEANTIME????
- *
- * </KOLLA>
- */
-
- send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
-
- DEMONP(env, descP, &descP->closerMon);
-
- } else {
-
- /*
- * <KOLLA>
- *
- * ABORT?
- *
- * </KOLLA>
- */
- }
- }
-
-
- MUNLOCK(descP->closeMtx);
- MUNLOCK(descP->accMtx);
- MUNLOCK(descP->readMtx);
- MUNLOCK(descP->writeMtx);
+ SocketDescriptor* descP = (SocketDescriptor*) obj;
+
+ SSDBG( descP,
+ ("SOCKET", "socket_stop -> entry when"
+ "\r\n sock: %d (%d)"
+ "\r\n is_direct_call: %d"
+ "\r\n", descP->sock, fd, is_direct_call) );
+
+ MLOCK(descP->writeMtx);
+ MLOCK(descP->readMtx);
+ MLOCK(descP->accMtx);
+ MLOCK(descP->closeMtx);
+
+
+ descP->state = SOCKET_STATE_CLOSING; // Just in case...???
+ descP->isReadable = FALSE;
+ descP->isWritable = FALSE;
+
+
+ /* We should check that we actually have a monitor.
+ * This *should* be done with a "NULL" monitor value,
+ * which there currently is none...
+ */
+ DEMONP(env, descP, &descP->ctrlMon);
+
+ if (descP->currentWriterP != NULL) {
+ /* We have a (current) writer and *may* therefor also have
+ * writers waiting.
+ */
+
+ ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
+ descP->currentWriter.ref,
+ atom_closed,
+ &descP->currentWriter.pid)) );
+
+ /* And also deal with the waiting writers (in the same way) */
+ inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed);
+ }
+
+ if (descP->currentReaderP != NULL) {
+
+ /* We have a (current) reader and *may* therefor also have
+ * readers waiting.
+ */
+
+ ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
+ descP->currentReader.ref,
+ atom_closed,
+ &descP->currentReader.pid)) );
+
+ /* And also deal with the waiting readers (in the same way) */
+ inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed);
+ }
+
+ if (descP->currentAcceptorP != NULL) {
+ /* We have a (current) acceptor and *may* therefor also have
+ * acceptors waiting.
+ */
+
+ ESOCK_ASSERT( (NULL == send_msg_nif_abort(env,
+ descP->currentAcceptor.ref,
+ atom_closed,
+ &descP->currentAcceptor.pid)) );
+
+ /* And also deal with the waiting acceptors (in the same way) */
+ inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, atom_closed);
+ }
+
+
+ if (descP->sock != INVALID_SOCKET) {
+
+ /*
+ * <KOLLA>
+ *
+ * WE NEED TO CHECK IF THIS OPERATION IS TRIGGERED
+ * LOCALLY (VIA A CALL TO CLOSE) OR REMOTELLY
+ * (VIA I.E. ECONSRESET).
+ *
+ * </KOLLA>
+ */
+
+ if (descP->closeLocal) {
+
+ /* +++ send close message to the waiting process +++
+ *
+ * {close, CloseRef}
+ *
+ * <KOLLA>
+ *
+ * WHAT HAPPENS IF THE RECEIVER HAS DIED IN THE MEANTIME????
+ *
+ * </KOLLA>
+ */
+
+ send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
+
+ DEMONP(env, descP, &descP->closerMon);
+
+ } else {
+
+ /*
+ * <KOLLA>
+ *
+ * ABORT?
+ *
+ * </KOLLA>
+ */
+ }
+ }
+
+
+ MUNLOCK(descP->closeMtx);
+ MUNLOCK(descP->accMtx);
+ MUNLOCK(descP->readMtx);
+ MUNLOCK(descP->writeMtx);
+ SSDBG( descP,
+ ("SOCKET", "socket_stop -> done (%d, %d)\r\n", descP->sock, fd) );
+
}
diff --git a/erts/emulator/nifs/common/socket_util.c b/erts/emulator/nifs/common/socket_util.c
index 05fb40e286..397f69f58d 100644
--- a/erts/emulator/nifs/common/socket_util.c
+++ b/erts/emulator/nifs/common/socket_util.c
@@ -29,6 +29,11 @@
#include "socket_dbg.h"
#include "sys.h"
+#include <stdarg.h>
+#include <string.h>
+#include <stdio.h>
+#include <ctype.h>
+#include <time.h>
/* We don't have a "debug flag" to check here, so we
* should use the compile debug flag, whatever that is...
@@ -46,6 +51,9 @@
extern char* erl_errno_id(int error); /* THIS IS JUST TEMPORARY??? */
+static int realtime(struct timespec* tsP);
+static int timespec2str(char *buf, unsigned int len, struct timespec *ts);
+
static char* make_sockaddr_in4(ErlNifEnv* env,
ERL_NIF_TERM port,
ERL_NIF_TERM addr,
@@ -1168,10 +1176,89 @@ void esock_abort(const char* expr,
+/* *** esock_warning_msg ***
+ *
+ * Temporary function for issuing warning messages.
+ *
+ */
+extern
+void esock_warning_msg( const char* format, ... )
+{
+ va_list args;
+ char f[512 + sizeof(format)]; // This has to suffice...
+ char stamp[32];
+ struct timespec ts;
+ int res;
+
+ /*
+ * We should really include self in the printout, so we can se which process
+ * are executing the code. But then I must change the API....
+ * ....something for later.
+ */
+
+ // 2018-06-29 12:13:21.232089
+ // 29-Jun-2018::13:47:25.097097
+
+ if (!realtime(&ts)) {
+ if (timespec2str(stamp, sizeof(stamp), &ts) != 0) {
+ res = enif_snprintf(f, sizeof(f), "=WARNING MSG==== %s", format);
+ } else {
+ res = enif_snprintf(f, sizeof(f),
+ "=WARNING MSG==== %s ===\r\n%s" , stamp, format);
+ }
+
+ if (res > 0) {
+ va_start (args, format);
+ enif_vfprintf (stdout, f, args);
+ va_end (args);
+ fflush(stdout);
+ }
+ }
+
+ return;
+}
+
+
+static
+int realtime(struct timespec* tsP)
+{
+ return clock_gettime(CLOCK_REALTIME, tsP);
+}
+
+
+/*
+ * Convert a timespec struct into a readable/printable string.
+ *
+ * "%F::%T" => 2018-06-29 12:13:21[.232089]
+ * "%d-%b-%Y::%T" => 29-Jun-2018::13:47:25.097097
+ */
+static
+int timespec2str(char *buf, unsigned int len, struct timespec *ts)
+{
+ int ret, buflen;
+ struct tm t;
+
+ tzset();
+ if (localtime_r(&(ts->tv_sec), &t) == NULL)
+ return 1;
+
+ ret = strftime(buf, len, "%d-%B-%Y::%T", &t);
+ if (ret == 0)
+ return 2;
+ len -= ret - 1;
+ buflen = strlen(buf);
+
+ ret = snprintf(&buf[buflen], len, ".%06ld", ts->tv_nsec/1000);
+ if (ret >= len)
+ return 3;
+
+ return 0;
+}
+
/* =================================================================== *
* *
- * Various utility functions *
+ * Various (internal) utility functions *
* *
* =================================================================== */
diff --git a/erts/emulator/nifs/common/socket_util.h b/erts/emulator/nifs/common/socket_util.h
index dedeb8dd7d..add2c8f4be 100644
--- a/erts/emulator/nifs/common/socket_util.h
+++ b/erts/emulator/nifs/common/socket_util.h
@@ -158,5 +158,8 @@ ERL_NIF_TERM esock_make_error_str(ErlNifEnv* env, char* reason);
extern
ERL_NIF_TERM esock_make_error_errno(ErlNifEnv* env, int err);
+extern
+void esock_warning_msg(const char* format, ... );
+
#endif // SOCKET_UTIL_H__
diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam
index f82db6e44e..4924c43a5c 100644
--- a/erts/preloaded/ebin/socket.beam
+++ b/erts/preloaded/ebin/socket.beam
Binary files differ
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index ba3ff6bab9..bf94271073 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -41,7 +41,7 @@
%% sendmsg/4,
%% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv)
- recv/2, recv/3, recv/4,
+ recv/1, recv/2, recv/3, recv/4,
recvfrom/1, recvfrom/2, recvfrom/3, recvfrom/4,
%% recvmsg/4,
%% readv/3,
@@ -442,16 +442,17 @@
-define(SOCKET_RECV_FLAGS_DEFAULT, []).
-define(SOCKET_RECV_TIMEOUT_DEFAULT, infinity).
--define(SOCKET_OPT_LEVEL_OTP, 0).
--define(SOCKET_OPT_LEVEL_SOCKET, 1).
--define(SOCKET_OPT_LEVEL_IP, 2).
--define(SOCKET_OPT_LEVEL_IPV6, 3).
--define(SOCKET_OPT_LEVEL_TCP, 4).
--define(SOCKET_OPT_LEVEL_UDP, 5).
--define(SOCKET_OPT_LEVEL_SCTP, 6).
+-define(SOCKET_OPT_LEVEL_OTP, 0).
+-define(SOCKET_OPT_LEVEL_SOCKET, 1).
+-define(SOCKET_OPT_LEVEL_IP, 2).
+-define(SOCKET_OPT_LEVEL_IPV6, 3).
+-define(SOCKET_OPT_LEVEL_TCP, 4).
+-define(SOCKET_OPT_LEVEL_UDP, 5).
+-define(SOCKET_OPT_LEVEL_SCTP, 6).
--define(SOCKET_OPT_OTP_DEBUG, 0).
--define(SOCKET_OPT_OTP_IOW, 1).
+-define(SOCKET_OPT_OTP_DEBUG, 0).
+-define(SOCKET_OPT_OTP_IOW, 1).
+-define(SOCKET_OPT_OTP_CTRL_PROC, 2).
-define(SOCKET_OPT_SOCK_BROADCAST, 4).
-define(SOCKET_OPT_SOCK_DONTROUTE, 7).
@@ -1097,6 +1098,9 @@ do_sendto(SockRef, Data, EFlags, Dest, Timeout) ->
%% Flags - A list of "options" for the read.
%% Timeout - Time-out in milliseconds.
+recv(Socket) ->
+ recv(Socket, 0).
+
recv(Socket, Length) ->
recv(Socket, Length,
?SOCKET_RECV_FLAGS_DEFAULT,
@@ -1131,10 +1135,21 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
(is_integer(Timeout) andalso (Timeout > 0)) ->
TS = timestamp(Timeout),
RecvRef = make_ref(),
+ p("do_recv -> try read with"
+ "~n SockRef: ~p"
+ "~n RecvRef: ~p"
+ "~n Length: ~p"
+ "~n EFlags: ~p"
+ "~nwhen"
+ "~n Timeout: ~p (~p)", [SockRef, RecvRef, Length, EFlags, Timeout, TS]),
case nif_recv(SockRef, RecvRef, Length, EFlags) of
{ok, true = _Complete, Bin} when (size(Acc) =:= 0) ->
+ p("do_recv -> ok: complete (size(Acc) =:= 0)"
+ "~n size(Bin): ~p", [size(Bin)]),
{ok, Bin};
{ok, true = _Complete, Bin} ->
+ p("do_recv -> ok: complete"
+ "~n size(Bin): ~p", [size(Bin)]),
{ok, <<Acc/binary, Bin/binary>>};
%% It depends on the amount of bytes we tried to read:
@@ -1143,12 +1158,16 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
%% > 0 - We got a part of the message and we will be notified
%% when there is more to read (a select message)
{ok, false = _Complete, Bin} when (Length =:= 0) ->
+ p("do_recv -> ok: not-complete (Length =:= 0)"
+ "~n size(Bin): ~p", [size(Bin)]),
do_recv(SockRef, RecvRef,
Length, EFlags,
<<Acc/binary, Bin/binary>>,
next_timeout(TS, Timeout));
{ok, false = _Completed, Bin} when (size(Acc) =:= 0) ->
+ p("do_recv -> ok: not-complete (size(Acc) =:= 0)"
+ "~n size(Bin): ~p", [size(Bin)]),
%% We got the first chunk of it.
%% We will be notified (select message) when there
%% is more to read.
@@ -1171,6 +1190,8 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
end;
{ok, false = _Completed, Bin} ->
+ p("do_recv -> ok: not-complete"
+ "~n size(Bin): ~p", [size(Bin)]),
%% We got a chunk of it!
NewTimeout = next_timeout(TS, Timeout),
receive
@@ -1190,16 +1211,19 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
{error, {timeout, Acc}}
end;
- %% We return with the accumulated binary regardless if its empty...
- {error, eagain} when (Length =:= 0) ->
+ %% We return with the accumulated binary (if its non-empty)
+ {error, eagain} when (Length =:= 0) andalso (size(Acc) > 0) ->
+ p("do_recv -> eagain (Length =:= 0)", []),
{ok, Acc};
{error, eagain} ->
+ p("do_recv -> eagain", []),
%% There is nothing just now, but we will be notified when there
%% is something to read (a select message).
NewTimeout = next_timeout(TS, Timeout),
receive
{select, SockRef, RecvRef, ready_input} ->
+ p("do_recv -> received select ready-input message", []),
do_recv(SockRef, RecvRef,
Length, EFlags,
Acc,
@@ -1214,6 +1238,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
{error, timeout}
end;
+ {error, closed = Reason} ->
+ do_close(SockRef),
+ if
+ (size(Acc) =:= 0) ->
+ {error, Reason};
+ true ->
+ {error, {Reason, Acc}}
+ end;
+
{error, _} = ERROR when (size(Acc) =:= 0) ->
ERROR;
@@ -1342,6 +1375,9 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
Reason :: term().
close(#socket{ref = SockRef}) ->
+ do_close(SockRef).
+
+do_close(SockRef) ->
case nif_close(SockRef) of
ok ->
nif_finalize_close(SockRef);
@@ -1659,6 +1695,8 @@ enc_setopt_value(otp, debug, V, _, _, _) when is_boolean(V) ->
V;
enc_setopt_value(otp, iow, V, _, _, _) when is_boolean(V) ->
V;
+enc_setopt_value(otp, controlling_process, V, _, _, _) when is_pid(V) ->
+ V;
enc_setopt_value(otp = L, Opt, V, _D, _T, _P) ->
not_supported({L, Opt, V});
@@ -1859,27 +1897,31 @@ enc_sockopt_key(otp, debug, _, _, _, _) ->
?SOCKET_OPT_OTP_DEBUG;
enc_sockopt_key(otp, iow, _, _, _, _) ->
?SOCKET_OPT_OTP_IOW;
+enc_sockopt_key(otp, controlling_process, _, _, _, _) ->
+ ?SOCKET_OPT_OTP_CTRL_PROC;
+enc_sockopt_key(otp = L, Opt, _, _, _, _) ->
+ not_supported({L, Opt});
%% +++ SOCKET socket options +++
-enc_sockopt_key(socket, acceptconn = Opt, get = _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(socket = L, acceptconn = Opt, get = _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
enc_sockopt_key(socket, acceptfilter = Opt, _Dir, _D, _T, _P) ->
not_supported(Opt);
%% Before linux 3.8, this socket option could be set.
%% Size of buffer for name: IFNAMSZ
%% So, we let the implementation decide.
-enc_sockopt_key(socket, bindtodevide = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(socket = L, bindtodevide = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
enc_sockopt_key(socket, broadcast = _Opt, _Dir, _D, dgram = _T, _P) ->
?SOCKET_OPT_SOCK_BROADCAST;
-enc_sockopt_key(socket, busy_poll = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(socket, debug = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(socket = L, busy_poll = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(socket = L, debug = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
enc_sockopt_key(socket, dontroute = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_SOCK_DONTROUTE;
-enc_sockopt_key(socket, error = Opt, get = _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(socket = L, error = Opt, get = _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
%% This is only for connection-oriented sockets, but who are those?
%% Type = stream or Protocol = tcp?
%% For now, we just let is pass and it will fail later if not ok...
@@ -1887,111 +1929,111 @@ enc_sockopt_key(socket, keepalive = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_SOCK_KEEPALIVE;
enc_sockopt_key(socket, linger = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_SOCK_LINGER;
-enc_sockopt_key(socket, mark = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(socket = L, mark = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
enc_sockopt_key(socket, oobinline = Opt, _Dir, _D, _T, _P) ->
not_supported(Opt);
-enc_sockopt_key(socket, passcred = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(socket, peek_off = Opt, _Dir, local = _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(socket, peek_cred = Opt, get = _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(socket = L, passcred = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(socket = L, peek_off = Opt, _Dir, local = _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(socket = L, peek_cred = Opt, get = _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
enc_sockopt_key(socket, priority = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_SOCK_PRIORITY;
enc_sockopt_key(socket, rcvbuf = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_SOCK_RCVBUF;
-enc_sockopt_key(socket, rcvbufforce = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(socket = L, rcvbufforce = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
%% May not work on linux.
-enc_sockopt_key(socket, rcvlowat = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(socket = L, rcvlowat = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
enc_sockopt_key(socket, rcvtimeo = Opt, _Dir, _D, _T, _P) ->
not_supported(Opt);
enc_sockopt_key(socket, reuseaddr = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_SOCK_REUSEADDR;
-enc_sockopt_key(socket, reuseport = Opt, _Dir, D, _T, _P)
+enc_sockopt_key(socket = L, reuseport = Opt, _Dir, D, _T, _P)
when ((D =:= inet) orelse (D =:= inet6)) ->
- not_supported(Opt);
-enc_sockopt_key(socket, rxq_ovfl = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(socket, setfib = Opt, set = _Dir, _D, _T, _P) ->
- not_supported(Opt);
+ not_supported({L, Opt});
+enc_sockopt_key(socket = L, rxq_ovfl = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(socket = L, setfib = Opt, set = _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
enc_sockopt_key(socket, sndbuf = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_SOCK_SNDBUF;
-enc_sockopt_key(socket, sndbufforce = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(socket = L, sndbufforce = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
%% Not changeable on linux.
-enc_sockopt_key(socket, sndlowat = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(socket, sndtimeo = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(socket, timestamp = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(socket, UnknownOpt, _Dir, _D, _T, _P) ->
- unknown(UnknownOpt);
+enc_sockopt_key(socket = L, sndlowat = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(socket = L, sndtimeo = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(socket = L, timestamp = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(socket = L, UnknownOpt, _Dir, _D, _T, _P) ->
+ unknown({L, UnknownOpt});
%% +++ IP socket options +++
-enc_sockopt_key(ip, add_membership = Opt, set = _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, add_source_membership = Opt, set = _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, block_source = Opt, set = _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(ip = L, add_membership = Opt, set = _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, add_source_membership = Opt, set = _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, block_source = Opt, set = _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
%% FreeBSD only?
%% Only respected on udp and raw ip (unless the hdrincl option has been set).
-enc_sockopt_key(ip, dontfrag = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, drop_membership = Opt, set = _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, drop_source_membership = Opt, set = _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(ip = L, dontfrag = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, drop_membership = Opt, set = _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, drop_source_membership = Opt, set = _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
%% Linux only?
-enc_sockopt_key(ip, free_bind = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, hdrincl = Opt, _Dir, _D, raw = _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(ip = L, free_bind = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, hdrincl = Opt, _Dir, _D, raw = _T, _P) ->
+ not_supported({L, Opt});
%% FreeBSD only?
-enc_sockopt_key(ip, minttl = Opt, _Dir, _D, raw = _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, msfilter = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, mtu = Opt, get = _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, mtu_discover = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, multicast_all = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, multicast_if = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, multicast_loop = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, multicast_ttl = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, nodefrag = Opt, _Dir, _D, raw = _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, options = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, pktinfo = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(ip = L, minttl = Opt, _Dir, _D, raw = _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, msfilter = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, mtu = Opt, get = _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, mtu_discover = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, multicast_all = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, multicast_if = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, multicast_loop = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, multicast_ttl = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, nodefrag = Opt, _Dir, _D, raw = _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, options = Opt, _Dir, _D, _T, _P) ->
+ not_supported({Opt, L});
+enc_sockopt_key(ip = L, pktinfo = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
%% This require special code for accessing the errors.
%% via calling the recvmsg with the MSG_ERRQUEUE flag set,
-enc_sockopt_key(ip, recverr = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, recvif = Opt, _Dir, _D, dgram = _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, recvdstaddr = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, recvopts = Opt, _Dir, _D, T, _P) when (T =/= stream) ->
- not_supported(Opt);
-enc_sockopt_key(ip, recvorigdstaddr = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(ip = L, recverr = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, recvif = Opt, _Dir, _D, dgram = _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, recvdstaddr = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, recvopts = Opt, _Dir, _D, T, _P) when (T =/= stream) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, recvorigdstaddr = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
enc_sockopt_key(ip, recvtos = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_IP_RECVTOS;
-enc_sockopt_key(ip, recvttl = Opt, _Dir, _D, T, _P) when (T =/= stream) ->
- not_supported(Opt);
-enc_sockopt_key(ip, retopts = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(ip = L, recvttl = Opt, _Dir, _D, T, _P) when (T =/= stream) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, retopts = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
enc_sockopt_key(ip, router_alert = _Opt, _Dir, _D, raw = _T, _P) ->
?SOCKET_OPT_IP_ROUTER_ALERT;
%% On FreeBSD it specifies that this option is only valid
@@ -1999,49 +2041,49 @@ enc_sockopt_key(ip, router_alert = _Opt, _Dir, _D, raw = _T, _P) ->
%% No such condition on linux (in the man page)...
enc_sockopt_key(ip, tos = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_IP_TOS;
-enc_sockopt_key(ip, transparent = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(ip = L, transparent = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
enc_sockopt_key(ip, ttl = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_IP_TTL;
-enc_sockopt_key(ip, unblock_source = Opt, set = _Dir, _D, _T, _P) ->
- not_supported(Opt);
-enc_sockopt_key(ip, UnknownOpt, _Dir, _D, _T, _P) ->
- unknown(UnknownOpt);
+enc_sockopt_key(ip = L, unblock_source = Opt, set = _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
+enc_sockopt_key(ip = L, UnknownOpt, _Dir, _D, _T, _P) ->
+ unknown({L, UnknownOpt});
%% IPv6 socket options
enc_sockopt_key(ipv6, hoplimit = _Opt, _Dir, _D, T, _P)
when (T =:= dgram) orelse (T =:= raw) ->
?SOCKET_OPT_IPV6_HOPLIMIT;
-enc_sockopt_key(ipv6, UnknownOpt, _Dir, _D, _T, _P) ->
- unknown(UnknownOpt);
+enc_sockopt_key(ipv6 = L, UnknownOpt, _Dir, _D, _T, _P) ->
+ unknown({L, UnknownOpt});
%% TCP socket options
%% There are other options that would be useful; info,
%% but they are difficult to get portable...
enc_sockopt_key(tcp, congestion = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_TCP_CONGESTION;
-enc_sockopt_key(tcp, cork = Opt, _Dir, _D, _T, _P) ->
- not_supported(Opt);
+enc_sockopt_key(tcp = L, cork = Opt, _Dir, _D, _T, _P) ->
+ not_supported({L, Opt});
enc_sockopt_key(tcp, maxseg = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_TCP_MAXSEG;
enc_sockopt_key(tcp, nodelay = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_TCP_NODELAY;
-enc_sockopt_key(tcp, UnknownOpt, _Dir, _D, _T, _P) ->
- unknown(UnknownOpt);
+enc_sockopt_key(tcp = L, UnknownOpt, _Dir, _D, _T, _P) ->
+ unknown({L, UnknownOpt});
%% UDP socket options
enc_sockopt_key(udp, cork = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_UDP_CORK;
-enc_sockopt_key(udp, UnknownOpt, _Dir, _D, _T, _P) ->
- unknown(UnknownOpt);
+enc_sockopt_key(udp = L, UnknownOpt, _Dir, _D, _T, _P) ->
+ unknown({L, UnknownOpt});
%% SCTP socket options
enc_sockopt_key(sctp, autoclose = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_SCTP_AUTOCLOSE;
enc_sockopt_key(sctp, nodelay = _Opt, _Dir, _D, _T, _P) ->
?SOCKET_OPT_SCTP_NODELAY;
-enc_sockopt_key(sctp, UnknownOpt, _Dir, _D, _T, _P) ->
- unknown(UnknownOpt);
+enc_sockopt_key(sctp = L, UnknownOpt, _Dir, _D, _T, _P) ->
+ unknown({L, UnknownOpt});
%% +++ "Native" socket options +++
enc_sockopt_key(Level, Opt, set = _Dir, _D, _T, _P)
@@ -2158,6 +2200,16 @@ tdiff(T1, T2) ->
+p(F, A) ->
+ p(get(sname), F, A).
+
+p(undefined, F, A) ->
+ p("***", F, A);
+p(SName, F, A) ->
+ io:format("[~s,~p] " ++ F ++ "~n", [SName, self()|A]).
+
+
+
%% ===========================================================================
%%
%% Error functions
diff --git a/lib/kernel/test/socket_client.erl b/lib/kernel/test/socket_client.erl
index 13e87f4109..a284777046 100644
--- a/lib/kernel/test/socket_client.erl
+++ b/lib/kernel/test/socket_client.erl
@@ -10,6 +10,9 @@
-export([start/1]).
+-define(REQ, 0).
+-define(REP, 1).
+
start(Port) ->
start_tcp(Port).
@@ -19,12 +22,16 @@ start_tcp(Port) ->
start(Domain, Type, Proto, Port) ->
try do_init(Domain, Type, Proto) of
Sock ->
- connect(Sock, Domain, Port)
+ connect(Sock, Domain, Port),
+ %% Give the server some time...
+ p("wait some", []),
+ %% sleep(5000),
+ %% ok = socket:close(Sock),
+ send_loop(Sock)
catch
- throw:E:P ->
+ throw:E ->
e("Failed initiate: "
- "~n Error: ~p"
- "~n Path: ~p", [E, P])
+ "~n Error: ~p", [E])
end.
do_init(Domain, Type, Proto) ->
@@ -58,11 +65,11 @@ connect(Sock, Domain, Port) ->
SA = #{family => Domain,
addr => Addr,
port => Port},
- i("try (socket) connect to ~p", [SA]),
+ i("try (socket) connect to:"
+ "~n ~p", [SA]),
case socket:connect(Sock, SA) of
ok ->
i("connected"),
- send_loop(Sock),
ok;
{error, Reason} ->
e("connect failure: "
@@ -74,22 +81,37 @@ connect(Sock, Domain, Port) ->
send_loop(Sock) ->
send_loop(Sock, 1).
-send_loop(Sock, N) ->
- case socket:send(Sock, <<0:32, N:32, "hejsan">>) of
+send_loop(Sock, N) when (N =< 10) ->
+ i("try send request ~w", [N]),
+ Req = enc_req_msg(N, "hejsan"),
+ case socket:send(Sock, Req) of
ok ->
- case send:recv(Sock, 0) of
- {ok, <<1:32, N:32, "hejsan">>} ->
- send_loop(Sock, N+1);
+ i("request ~w sent - now try read answer", [N]),
+ case socket:recv(Sock, 0) of
+ {ok, Msg} ->
+ i("received ~w bytes of data", [size(Msg)]),
+ case dec_msg(Msg) of
+ {reply, N, Reply} ->
+ i("received reply ~w: ~p", [N, Reply]),
+ send_loop(Sock, N+1)
+ end;
{error, RReason} ->
e("Failed recv response for request ~w: "
- "~n ~p", [RReason]),
+ "~n ~p", [N, RReason]),
exit({failed_recv, RReason})
end;
{error, SReason} ->
e("Failed send request ~w: "
"~n ~p", [SReason]),
exit({failed_send, SReason})
- end.
+ end;
+send_loop(Sock, _N) ->
+ i("we are done - close the socket when: "
+ "~n ~p", [socket:info()]),
+ ok = socket:close(Sock),
+ i("we are done - socket closed when: "
+ "~n ~p", [socket:info()]).
+
which_addr(_Domain, []) ->
throw(no_address);
@@ -106,6 +128,61 @@ which_addr2(Domain, [_|IFO]) ->
which_addr2(Domain, IFO).
+%% ---
+
+enc_req_msg(N, Data) ->
+ enc_msg(?REQ, N, Data).
+
+enc_rep_msg(N, Data) ->
+ enc_msg(?REP, N, Data).
+
+enc_msg(Type, N, Data) when is_list(Data) ->
+ enc_msg(Type, N, list_to_binary(Data));
+enc_msg(Type, N, Data)
+ when is_integer(Type) andalso is_integer(N) andalso is_binary(Data) ->
+ <<Type:32/integer, N:32/integer, Data/binary>>.
+
+dec_msg(<<?REQ:32/integer, N:32/integer, Data/binary>>) ->
+ {request, N, Data};
+dec_msg(<<?REP:32/integer, N:32/integer, Data/binary>>) ->
+ {reply, N, Data}.
+
+
+%% ---
+
+sleep(T) ->
+ receive after T -> ok end.
+
+
+%% ---
+
+formated_timestamp() ->
+ format_timestamp(os:timestamp()).
+
+format_timestamp(Now) ->
+ N2T = fun(N) -> calendar:now_to_local_time(N) end,
+ format_timestamp(Now, N2T, true).
+
+format_timestamp({_N1, _N2, N3} = N, N2T, true) ->
+ FormatExtra = ".~.2.0w",
+ ArgsExtra = [N3 div 10000],
+ format_timestamp(N, N2T, FormatExtra, ArgsExtra);
+format_timestamp({_N1, _N2, _N3} = N, N2T, false) ->
+ FormatExtra = "",
+ ArgsExtra = [],
+ format_timestamp(N, N2T, FormatExtra, ArgsExtra).
+
+format_timestamp(N, N2T, FormatExtra, ArgsExtra) ->
+ {Date, Time} = N2T(N),
+ {YYYY,MM,DD} = Date,
+ {Hour,Min,Sec} = Time,
+ FormatDate =
+ io_lib:format("~.4w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w" ++ FormatExtra,
+ [YYYY, MM, DD, Hour, Min, Sec] ++ ArgsExtra),
+ lists:flatten(FormatDate).
+
+
+%% ---
e(F, A) ->
p("<ERROR> " ++ F, A).
@@ -116,5 +193,5 @@ i(F, A) ->
p("*** " ++ F, A).
p(F, A) ->
- io:format("[client] " ++ F ++ "~n", A).
+ io:format("[client,~p][~s] " ++ F ++ "~n", [self(),formated_timestamp()|A]).
diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl
index 0effc7c0ff..64bd6396e4 100644
--- a/lib/kernel/test/socket_server.erl
+++ b/lib/kernel/test/socket_server.erl
@@ -10,6 +10,9 @@
-export([start/0]).
+-define(REQ, 0).
+-define(REP, 1).
+
-record(handler, {socket, parent}).
start() ->
@@ -82,14 +85,17 @@ which_addr2(Domain, [_|IFO]) ->
accept_loop(LSock) ->
- put(sname, "accept-loop"),
+ put(sname, "acceptor"),
accept_loop(LSock, []).
accept_loop(LSock, Handlers) ->
i("try accept"),
case socket:accept(LSock, infinity) of
{ok, Sock} ->
- i("accepted: ~p", [Sock]),
+ i("accepted: "
+ "~n ~p"
+ "~nwhen"
+ "~n ~p", [Sock, socket:info()]),
case handle_accept_success(Sock) of
{ok, Handler} ->
accept_loop(LSock, [Handler|Handlers]);
@@ -127,6 +133,7 @@ handler_init(Parent, Socket) ->
put(sname, "handler"),
receive
{handler, Parent, continue} ->
+ socket:setopt(Socket, otp, debug, true),
handler_loop(#handler{parent = Parent,
socket = Socket})
end.
@@ -135,27 +142,90 @@ handler_continue(Handler) ->
Handler ! {handler, self(), continue}.
handler_loop(#handler{socket = Socket} = H) ->
- case socket:read(Socket, 0) of
- {ok, <<0:32, N:32, ReqData/binary>>} ->
- i("received request ~w: "
- "~n ~p", [N, ReqData]),
- Reply = <<1:32, N:32, ReqData/binary>>,
- case socket:send(Socket, Reply) of
- ok ->
- i("successfully sent reply ~w", [N]),
- handler_loop(H);
- {error, SReason} ->
- e("failed sending reply ~w:"
- "~n ~p", [N, SReason]),
- exit({failed_sending_reply, SReason})
+ case socket:recv(Socket, 0) of
+ {ok, Msg} when (size(Msg) =:= 0) ->
+ i("received empty msg - hickup? - try again", []),
+ handler_loop(H);
+ {ok, Msg} ->
+ i("received ~w bytes of data", [size(Msg)]),
+ case dec_msg(Msg) of
+ {request, N, Req} ->
+ i("received request ~w: "
+ "~n ~p", [N, Req]),
+ Reply = enc_rep_msg(N, "hoppsan"),
+ case socket:send(Socket, Reply) of
+ ok ->
+ i("successfully sent reply ~w", [N]),
+ handler_loop(H);
+ {error, SReason} ->
+ e("failed sending reply ~w:"
+ "~n ~p", [N, SReason]),
+ exit({failed_sending_reply, SReason})
+ end
end;
+
+ {error, closed} ->
+ i("closed when"
+ "~n ~p", [socket:info()]),
+ exit(normal);
+
{error, RReason} ->
e("failed reading request: "
"~n ~p", [RReason]),
- exit({failed_sending_reply, RReason})
+ exit({failed_reading_request, RReason})
end.
+%% ---
+
+enc_req_msg(N, Data) ->
+ enc_msg(?REQ, N, Data).
+
+enc_rep_msg(N, Data) ->
+ enc_msg(?REP, N, Data).
+
+enc_msg(Type, N, Data) when is_list(Data) ->
+ enc_msg(Type, N, list_to_binary(Data));
+enc_msg(Type, N, Data)
+ when is_integer(Type) andalso is_integer(N) andalso is_binary(Data) ->
+ <<Type:32/integer, N:32/integer, Data/binary>>.
+
+dec_msg(<<?REQ:32/integer, N:32/integer, Data/binary>>) ->
+ {request, N, Data};
+dec_msg(<<?REP:32/integer, N:32/integer, Data/binary>>) ->
+ {reply, N, Data}.
+
+
+%% ---
+
+formated_timestamp() ->
+ format_timestamp(os:timestamp()).
+
+format_timestamp(Now) ->
+ N2T = fun(N) -> calendar:now_to_local_time(N) end,
+ format_timestamp(Now, N2T, true).
+
+format_timestamp({_N1, _N2, N3} = N, N2T, true) ->
+ FormatExtra = ".~.2.0w",
+ ArgsExtra = [N3 div 10000],
+ format_timestamp(N, N2T, FormatExtra, ArgsExtra);
+format_timestamp({_N1, _N2, _N3} = N, N2T, false) ->
+ FormatExtra = "",
+ ArgsExtra = [],
+ format_timestamp(N, N2T, FormatExtra, ArgsExtra).
+
+format_timestamp(N, N2T, FormatExtra, ArgsExtra) ->
+ {Date, Time} = N2T(N),
+ {YYYY,MM,DD} = Date,
+ {Hour,Min,Sec} = Time,
+ FormatDate =
+ io_lib:format("~.4w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w" ++ FormatExtra,
+ [YYYY, MM, DD, Hour, Min, Sec] ++ ArgsExtra),
+ lists:flatten(FormatDate).
+
+
+%% ---
+
e(F, A) ->
p("<ERROR> " ++ F, A).
@@ -168,5 +238,6 @@ p(F, A) ->
p(get(sname), F, A).
p(SName, F, A) ->
- io:format("[server,~s] " ++ F ++ "~n", [SName|A]).
+ io:format("[server:~s,~p][~s] " ++ F ++ "~n",
+ [SName,self(),formated_timestamp()|A]).