diff options
author | Micael Karlberg <[email protected]> | 2018-06-29 18:23:55 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-09-18 14:50:18 +0200 |
commit | 24be0729fe3a1ccfd5f0713b565463d6557d8aa7 (patch) | |
tree | 2243a51ada05a3ddeacd443ed6e49262cf79766c | |
parent | b09136301525b0717e897ec0864c3d2ea7708758 (diff) | |
download | otp-24be0729fe3a1ccfd5f0713b565463d6557d8aa7.tar.gz otp-24be0729fe3a1ccfd5f0713b565463d6557d8aa7.tar.bz2 otp-24be0729fe3a1ccfd5f0713b565463d6557d8aa7.zip |
[socket-nif] Fixed (stream) recv
Fixed handling of closed in the recv function.
We still need to properly handle when we get
0 bytes of data for other types ock sockets
then stream (its valid for dgram for instance).
OTP-14831
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 1 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 695 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_util.c | 89 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_util.h | 3 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 39680 -> 42488 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 288 | ||||
-rw-r--r-- | lib/kernel/test/socket_client.erl | 105 | ||||
-rw-r--r-- | lib/kernel/test/socket_server.erl | 105 |
8 files changed, 973 insertions, 313 deletions
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index d6a612cab6..a3e54360fe 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -192,6 +192,7 @@ extern ERL_NIF_TERM esock_atom_einval; #define GET_INT(E, TE, IP) enif_get_int((E), (TE), (IP)) #define GET_LIST_ELEM(E, L, HP, TP) enif_get_list_cell((E), (L), (HP), (TP)) #define GET_LIST_LEN(E, L, LP) enif_get_list_length((E), (L), (LP)) +#define GET_LPID(E, T, P) enif_get_local_pid((E), (T), (P)) #define GET_STR(E, L, B, SZ) \ enif_get_string((E), (L), (B), (SZ), ERL_NIF_LATIN1) #define GET_UINT(E, TE, IP) enif_get_uint((E), (TE), (IP)) diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 7f45fb7bcd..027155fc92 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -182,8 +182,8 @@ /* Debug stuff... */ -#define SOCKET_NIF_DEBUG_DEFAULT TRUE -#define SOCKET_DEBUG_DEFAULT TRUE +#define SOCKET_NIF_DEBUG_DEFAULT FALSE +#define SOCKET_DEBUG_DEFAULT FALSE /* Counters and stuff (Don't know where to sent this stuff anyway) */ #define SOCKET_NIF_IOW_DEFAULT FALSE @@ -344,6 +344,7 @@ typedef union { #define SOCKET_OPT_OTP_DEBUG 0 #define SOCKET_OPT_OTP_IOW 1 +#define SOCKET_OPT_OTP_CTRL_PROC 2 #define SOCKET_OPT_SOCK_BROADCAST 4 #define SOCKET_OPT_SOCK_DONTROUTE 7 @@ -594,16 +595,16 @@ typedef struct { ERL_NIF_TERM version; ERL_NIF_TERM buildDate; BOOLEAN_T dbg; - BOOLEAN_T iow; + BOOLEAN_T iow; ErlNifMutex* cntMtx; uint32_t numSockets; - uint32_t numTypeDGrams; uint32_t numTypeStreams; + uint32_t numTypeDGrams; uint32_t numTypeSeqPkgs; - uint32_t numDomainLocal; uint32_t numDomainInet; uint32_t numDomainInet6; + uint32_t numDomainLocal; uint32_t numProtoIP; uint32_t numProtoTCP; uint32_t numProtoUDP; @@ -751,6 +752,9 @@ static ERL_NIF_TERM nsetopt_otp_debug(ErlNifEnv* env, static ERL_NIF_TERM nsetopt_otp_iow(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM eVal); +static ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM eVal); static ERL_NIF_TERM nsetopt_native(ErlNifEnv* env, SocketDescriptor* descP, int level, @@ -1045,11 +1049,13 @@ static ERL_NIF_TERM send_check_result(ErlNifEnv* env, SocketDescriptor* descP, ssize_t written, ssize_t dataSize, + int saveErrno, ERL_NIF_TERM sendRef); static ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SocketDescriptor* descP, int read, int toRead, + int saveErrno, ErlNifBinary* bufP, ERL_NIF_TERM recvRef); static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, @@ -1214,6 +1220,11 @@ static BOOLEAN_T restore_network_namespace(int ns, SOCKET sock, int* err); #endif static BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc); +static void cnt_dec(uint32_t* cnt, uint32_t dec); + +static void inc_socket(int domain, int type, int protocol); +static void dec_socket(int domain, int type, int protocol); + /* #if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE) @@ -1698,12 +1709,13 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, * * </KOLLA> */ - SELECT(env, - event, - (ERL_NIF_SELECT_READ), - descP, NULL, esock_atom_undefined); + SELECT(env, + event, + (ERL_NIF_SELECT_READ), + descP, NULL, esock_atom_undefined); #endif + inc_socket(domain, type, protocol); return esock_make_ok2(env, res); } @@ -2515,21 +2527,31 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, { SocketDescriptor* descP; ERL_NIF_TERM sendRef; - ErlNifBinary data; + ErlNifBinary sndData; unsigned int eflags; int flags; ERL_NIF_TERM res; + SGDBG( ("SOCKET", "nif_send -> entry with argc: %d\r\n", argc) ); + /* Extract arguments and perform preliminary validation */ if ((argc != 4) || !enif_get_resource(env, argv[0], sockets, (void**) &descP) || - !GET_BIN(env, argv[2], &data) || + !GET_BIN(env, argv[2], &sndData) || !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } sendRef = argv[1]; + SSDBG( descP, + ("SOCKET", "nif_send -> args when sock = %d:" + "\r\n Socket: %T" + "\r\n SendRef: %T" + "\r\n Size of data: %d" + "\r\n eFlags: %d" + "\r\n", descP->sock, argv[0], sendRef, sndData.size, eflags) ); + if (!IS_CONNECTED(descP)) return esock_make_error(env, atom_enotconn); @@ -2551,7 +2573,7 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, * time we do. */ - res = nsend(env, descP, sendRef, &data, flags); + res = nsend(env, descP, sendRef, &sndData, flags); MUNLOCK(descP->writeMtx); @@ -2570,9 +2592,10 @@ static ERL_NIF_TERM nsend(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM sendRef, - ErlNifBinary* dataP, + ErlNifBinary* sndDataP, int flags) { + int save_errno; ssize_t written; if (!descP->isWritable) @@ -2583,9 +2606,15 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, */ cnt_inc(&descP->writeTries, 1); - written = sock_send(descP->sock, dataP->data, dataP->size, flags); + written = sock_send(descP->sock, sndDataP->data, sndDataP->size, flags); + if (IS_SOCKET_ERROR(written)) + save_errno = sock_errno(); + else + save_errno = -1; // The value does not actually matter in this case + - return send_check_result(env, descP, written, dataP->size, sendRef); + return send_check_result(env, descP, + written, sndDataP->size, save_errno, sendRef); } @@ -2655,6 +2684,7 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env, SocketAddress* toAddrP, unsigned int toAddrLen) { + int save_errno; ssize_t written; if (!descP->isWritable) @@ -2674,8 +2704,12 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env, dataP->data, dataP->size, flags, NULL, 0); } + if (IS_SOCKET_ERROR(written)) + save_errno = sock_errno(); + else + save_errno = -1; // The value does not actually matter in this case - return send_check_result(env, descP, written, dataP->size, sendRef); + return send_check_result(env, descP, written, dataP->size, save_errno, sendRef); } @@ -2829,6 +2863,13 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, { ssize_t read; ErlNifBinary buf; + int save_errno; + int bufSz = (len ? len : descP->rBufSz); + + SSDBG( descP, ("SOCKET", "nrecv -> entry with" + "\r\n len: %d (%d)" + "\r\n flags: %d" + "\r\n", len, bufSz, flags) ); if (!descP->isReadable) return enif_make_badarg(env); @@ -2837,7 +2878,7 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, * Either as much as we want to read or (if zero (0)) use the "default" * size (what has been configured). */ - if (!ALLOC_BIN((len ? len : descP->rBufSz), &buf)) + if (!ALLOC_BIN(bufSz, &buf)) return esock_make_error(env, atom_exalloc); /* We ignore the wrap for the moment. @@ -2845,10 +2886,19 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, */ cnt_inc(&descP->readTries, 1); + // If it fails (read = -1), we need errno... + SSDBG( descP, ("SOCKET", "nrecv -> try read (%d)\r\n", buf.size) ); read = sock_recv(descP->sock, buf.data, buf.size, flags); + if (IS_SOCKET_ERROR(read)) + save_errno = sock_errno(); + else + save_errno = -1; // The value does not actually matter in this case + + SSDBG( descP, ("SOCKET", "nrecv -> read: %d (%d)\r\n", read, save_errno) ); return recv_check_result(env, descP, read, len, + save_errno, &buf, recvRef); } @@ -3003,6 +3053,11 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, ERL_NIF_TERM reply, reason; BOOLEAN_T doClose; int selectRes; + int domain = descP->domain; + int type = descP->type; + int protocol = descP->protocol; + + SSDBG( descP, ("SOCKET", "nclose -> [%d] entry\r\n", descP->sock) ); MLOCK(descP->closeMtx); @@ -3049,10 +3104,16 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, descP, NULL, descP->closeRef); if (selectRes & ERL_NIF_SELECT_STOP_CALLED) { /* Prep done - inform the caller it can finalize (close) directly */ + SSDBG( descP, + ("SOCKET", "nclose -> [%d] stop called\r\n", descP->sock) ); + dec_socket(domain, type, protocol); reply = esock_atom_ok; } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) { /* The stop callback function has been *scheduled* which means that we * have to wait for it to complete. */ + SSDBG( descP, + ("SOCKET", "nclose -> [%d] stop scheduled\r\n", descP->sock) ); + dec_socket(domain, type, protocol); // SHALL WE DO THIS AT finalize? reply = esock_make_ok2(env, descP->closeRef); } else { /* <KOLLA> @@ -3071,6 +3132,11 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, reply = esock_make_error(env, reason); } + SSDBG( descP, + ("SOCKET", "nclose -> [%d] done when: " + "\r\n reply: %T" + "\r\n", descP->sock, reply) ); + return reply; } @@ -3081,7 +3147,7 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, * * Description: * Perform the actual socket close! - * Note that this function is executed in a dirfty scheduler. + * Note that this function is executed in a dirty scheduler. * * Arguments: * Socket (ref) - Points to the socket descriptor. @@ -3249,6 +3315,10 @@ ERL_NIF_TERM nif_setopt(ErlNifEnv* env, ERL_NIF_TERM eVal; BOOLEAN_T isEncoded, isOTP; + SGDBG( ("SOCKET", "nif_setopt -> entry with argc: %d\r\n", argc) ); + + /* Extract arguments and perform preliminary validation */ + if ((argc != 5) || !enif_get_resource(env, argv[0], sockets, (void**) &descP) || !GET_INT(env, argv[2], &eLevel) || @@ -3263,6 +3333,19 @@ ERL_NIF_TERM nif_setopt(ErlNifEnv* env, if (!elevel2level(isEncoded, eLevel, &isOTP, &level)) return esock_make_error(env, esock_atom_einval); + SSDBG( descP, + ("SOCKET", "nif_setopt -> args when sock = %d:" + "\r\n Socket: %T" + "\r\n Encoded: %T (%d)" + "\r\n Level: %d (%d)" + "\r\n Opt: %d" + "\r\n Value: %T" + "\r\n", + descP->sock, argv[0], + eIsEncoded, isEncoded, + eLevel, level, + eOpt, eVal) ); + return nsetopt(env, descP, isEncoded, isOTP, level, eOpt, eVal); } @@ -3304,6 +3387,12 @@ ERL_NIF_TERM nsetopt_otp(ErlNifEnv* env, { ERL_NIF_TERM result; + SSDBG( descP, + ("SOCKET", "nsetopt_otp -> entry with" + "\r\n eOpt: %d" + "\r\n eVal: %T" + "\r\n", eOpt, eVal) ); + switch (eOpt) { case SOCKET_OPT_OTP_DEBUG: result = nsetopt_otp_debug(env, descP, eVal); @@ -3313,6 +3402,10 @@ ERL_NIF_TERM nsetopt_otp(ErlNifEnv* env, result = nsetopt_otp_iow(env, descP, eVal); break; + case SOCKET_OPT_OTP_CTRL_PROC: + result = nsetopt_otp_ctrl_proc(env, descP, eVal); + break; + default: result = esock_make_error(env, esock_atom_einval); break; @@ -3349,6 +3442,48 @@ ERL_NIF_TERM nsetopt_otp_iow(ErlNifEnv* env, +/* nsetopt_otp_ctrl_proc - Handle the OTP (level) controlling_process options + */ +static +ERL_NIF_TERM nsetopt_otp_ctrl_proc(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM eVal) +{ + ErlNifPid newCtrlPid; + ErlNifMonitor newCtrlMon; + int xres; + + SSDBG( descP, + ("SOCKET", "nsetopt_otp_ctrl_proc -> entry with" + "\r\n eVal: %T" + "\r\n", eVal) ); + + if (!GET_LPID(env, eVal, &newCtrlPid)) { + esock_warning_msg("Failed get pid of new controlling process\r\n"); + return esock_make_error(env, esock_atom_einval); + } + + if ((xres = MONP(env, descP, &newCtrlPid, &newCtrlMon)) != 0) { + esock_warning_msg("Failed monitor %d) (new) controlling process\r\n", xres); + return esock_make_error(env, esock_atom_einval); + } + + if ((xres = DEMONP(env, descP, &descP->ctrlMon)) != 0) { + esock_warning_msg("Failed demonitor (%d) " + "old controlling process %T (%T)\r\n", + xres, descP->ctrlPid, descP->ctrlMon); + } + + descP->ctrlPid = newCtrlPid; + descP->ctrlMon = newCtrlMon; + + SSDBG( descP, ("SOCKET", "nsetopt_otp_ctrl_proc -> done\r\n") ); + + return esock_atom_ok; +} + + + /* The option has *not* been encoded. Instead it has been provided * in "native mode" (option is provided as is and value as a binary). */ @@ -3362,6 +3497,12 @@ ERL_NIF_TERM nsetopt_native(ErlNifEnv* env, ErlNifBinary val; ERL_NIF_TERM result; + SSDBG( descP, + ("SOCKET", "nsetopt_native -> entry with" + "\r\n opt: %d" + "\r\n eVal: %T" + "\r\n", opt, eVal) ); + if (GET_BIN(env, eVal, &val)) { int res = socket_setopt(descP->sock, level, opt, val.data, val.size); @@ -3389,6 +3530,11 @@ ERL_NIF_TERM nsetopt_level(ErlNifEnv* env, { ERL_NIF_TERM result; + SSDBG( descP, + ("SOCKET", "nsetopt_level -> entry with" + "\r\n level: %d" + "\r\n", level) ); + switch (level) { case SOL_SOCKET: result = nsetopt_lvl_socket(env, descP, eOpt, eVal); @@ -5081,30 +5227,44 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, SocketDescriptor* descP, ssize_t written, ssize_t dataSize, + int saveErrno, ERL_NIF_TERM sendRef) { + SSDBG( descP, + ("SOCKET", "send_check_result -> entry with" + "\r\n written: %d" + "\r\n dataSize: %d" + "\r\n saveErrno: %d" + "\r\n", written, dataSize, saveErrno) ); + if (written == dataSize) { cnt_inc(&descP->writePkgCnt, 1); cnt_inc(&descP->writeByteCnt, written); + SSDBG( descP, + ("SOCKET", "send_check_result -> everything written - done\r\n") ); + return esock_atom_ok; } else if (written < 0) { /* Ouch, check what kind of failure */ - int save_errno = sock_errno(); - if ((save_errno != EAGAIN) && - (save_errno != EINTR)) { + if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) { cnt_inc(&descP->writeFails, 1); - return esock_make_error_errno(env, save_errno); + SSDBG( descP, + ("SOCKET", "send_check_result -> error: %d\r\n", saveErrno) ); + + return esock_make_error_errno(env, saveErrno); } else { /* Ok, try again later */ + SSDBG( descP, ("SOCKET", "send_check_result -> try again\r\n") ); + /* <KOLLA> * SHOULD RESULT IN {error, eagain}!!!! * </KOLLA> @@ -5124,6 +5284,9 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), descP, NULL, sendRef); + SSDBG( descP, + ("SOCKET", "send_check_result -> not entire package written\r\n") ); + return esock_make_ok2(env, enif_make_int(env, written)); } @@ -5134,11 +5297,42 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, SocketDescriptor* descP, int read, int toRead, + int saveErrno, ErlNifBinary* bufP, ERL_NIF_TERM recvRef) { ERL_NIF_TERM data; + SSDBG( descP, + ("SOCKET", "recv_check_result -> entry with" + "\r\n read: %d" + "\r\n toRead: %d" + "\r\n saveErrno: %d" + "\r\n recvRef: %T" + "\r\n", read, toRead, saveErrno, recvRef) ); + + + /* <KOLLA> + * + * We need to handle read = 0 for other type(s) (DGRAM) when + * its actually valid to read 0 bytes. + * + * </KOLLA> + */ + + if ((read == 0) && (descP->type == SOCK_STREAM)) { + + /* + * When a stream socket peer has performed an orderly shutdown, the return + * value will be 0 (the traditional "end-of-file" return). + * + * *We* do never actually try to read 0 bytes from a stream socket! + */ + + return esock_make_error(env, atom_closed); + + } + /* There is a special case: If the provided 'to read' value is * zero (0). That means that we reads as much as we can, using * the default read buffer size. @@ -5148,6 +5342,10 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, /* +++ We filled the buffer +++ */ + SSDBG( descP, + ("SOCKET", + "recv_check_result -> [%d] filled the buffer\r\n", toRead) ); + if (toRead == 0) { /* +++ Give us everything you have got => needs to continue +++ */ @@ -5168,6 +5366,11 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, data = MKBIN(env, bufP); + SSDBG( descP, + ("SOCKET", + "recv_check_result -> [%d] " + "we are done for now - read more\r\n", toRead) ); + return esock_make_ok3(env, atom_false, data); } else { @@ -5181,6 +5384,11 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, data = MKBIN(env, bufP); + SSDBG( descP, + ("SOCKET", + "recv_check_result -> [%d] " + "we got exactly what we could fit\r\n", toRead) ); + return esock_make_ok3(env, atom_true, data); } @@ -5189,12 +5397,13 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, /* +++ Error handling +++ */ - int save_errno = sock_errno(); - - if (save_errno == ECONNRESET) { + if (saveErrno == ECONNRESET) { /* +++ Oups - closed +++ */ + SSDBG( descP, ("SOCKET", + "recv_check_result -> [%d] closed\r\n", toRead) ); + /* <KOLLA> * * IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING @@ -5220,17 +5429,31 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, return esock_make_error(env, atom_closed); - } else if ((save_errno == ERRNO_BLOCK) || - (save_errno == EAGAIN)) { + } else if ((saveErrno == ERRNO_BLOCK) || + (saveErrno == EAGAIN)) { + SSDBG( descP, ("SOCKET", + "recv_check_result -> [%d] eagain\r\n", toRead) ); + + SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), + descP, NULL, recvRef); + return esock_make_error(env, esock_atom_eagain); } else { - return esock_make_error_errno(env, save_errno); + SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] errno: %d\r\n", + toRead, saveErrno) ); + return esock_make_error_errno(env, saveErrno); } } else { /* +++ We did not fill the buffer +++ */ + SSDBG( descP, + ("SOCKET", + "recv_check_result -> [%d] " + "did not fill the buffer (%d of %d)\r\n", + toRead, read, bufP->size) ); + if (toRead == 0) { /* +++ We got a chunk of data but +++ @@ -5239,9 +5462,14 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * +++ into a sub-binary. +++ */ + SSDBG( descP, ("SOCKET", + "recv_check_result -> [%d] split buffer\r\n", toRead) ); + data = MKBIN(env, bufP); data = MKSBIN(env, data, 0, read); + SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] done\r\n", toRead) ); + return esock_make_ok3(env, atom_true, data); } else { @@ -5249,6 +5477,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, /* +++ We got only a part of what was expected +++ * +++ => receive more later. +++ */ + SSDBG( descP, ("SOCKET", "recv_check_result -> [%d] " + "only part of message - expect more\r\n", toRead) ); + return esock_make_ok3(env, atom_false, MKBIN(env, bufP)); } } @@ -6343,6 +6574,90 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) } + +/* decrement counters for when a socket is closed */ +static +void dec_socket(int domain, int type, int protocol) +{ + MLOCK(data.cntMtx); + + cnt_dec(&data.numSockets, 1); + + if (domain == AF_INET) + cnt_dec(&data.numDomainInet, 1); +#if defined(HAVE_IN6) && defined(AF_INET6) + else if (domain == AF_INET6) + cnt_dec(&data.numDomainInet6, 1); +#endif +#if defined(HAVE_SYS_UN_H) + else if (domain == AF_UNIX) + cnt_dec(&data.numDomainInet6, 1); +#endif + + if (type == SOCK_STREAM) + cnt_dec(&data.numTypeStreams, 1); + else if (type == SOCK_DGRAM) + cnt_dec(&data.numTypeDGrams, 1); + else if (type == SOCK_SEQPACKET) + cnt_dec(&data.numTypeSeqPkgs, 1); + + if (protocol == IPPROTO_IP) + cnt_dec(&data.numProtoIP, 1); + else if (protocol == IPPROTO_TCP) + cnt_dec(&data.numProtoTCP, 1); + else if (protocol == IPPROTO_UDP) + cnt_dec(&data.numProtoUDP, 1); +#if defined(HAVE_SCTP) + else if (protocol == IPPROTO_SCTP) + cnt_dec(&data.numProtoSCTP, 1); +#endif + + MUNLOCK(data.cntMtx); +} + + +/* increment counters for when a socket is opened */ +static +void inc_socket(int domain, int type, int protocol) +{ + MLOCK(data.cntMtx); + + cnt_inc(&data.numSockets, 1); + + if (domain == AF_INET) + cnt_inc(&data.numDomainInet, 1); +#if defined(HAVE_IN6) && defined(AF_INET6) + else if (domain == AF_INET6) + cnt_inc(&data.numDomainInet6, 1); +#endif +#if defined(HAVE_SYS_UN_H) + else if (domain == AF_UNIX) + cnt_inc(&data.numDomainInet6, 1); +#endif + + if (type == SOCK_STREAM) + cnt_inc(&data.numTypeStreams, 1); + else if (type == SOCK_DGRAM) + cnt_inc(&data.numTypeDGrams, 1); + else if (type == SOCK_SEQPACKET) + cnt_inc(&data.numTypeSeqPkgs, 1); + + if (protocol == IPPROTO_IP) + cnt_inc(&data.numProtoIP, 1); + else if (protocol == IPPROTO_TCP) + cnt_inc(&data.numProtoTCP, 1); + else if (protocol == IPPROTO_UDP) + cnt_inc(&data.numProtoUDP, 1); +#if defined(HAVE_SCTP) + else if (protocol == IPPROTO_SCTP) + cnt_inc(&data.numProtoSCTP, 1); +#endif + + MUNLOCK(data.cntMtx); +} + + + /* compare_pids - Test if two pids are equal * */ @@ -6446,9 +6761,11 @@ BOOLEAN_T eproto2proto(int eproto, int* proto) *proto = IPPROTO_UDP; break; +#if defined(HAVE_SCTP) case SOCKET_PROTOCOL_SCTP: *proto = IPPROTO_SCTP; break; +#endif default: return FALSE; @@ -6519,35 +6836,42 @@ BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns) * send flags. */ static -BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags) +BOOLEAN_T esendflags2sendflags(unsigned int eflags, int* flags) { unsigned int ef; int tmp = 0; for (ef = SOCKET_SEND_FLAG_LOW; ef <= SOCKET_SEND_FLAG_HIGH; ef++) { + switch (ef) { case SOCKET_SEND_FLAG_CONFIRM: - tmp |= MSG_CONFIRM; + if ((1 << SOCKET_SEND_FLAG_CONFIRM) & eflags) + tmp |= MSG_CONFIRM; break; case SOCKET_SEND_FLAG_DONTROUTE: - tmp |= MSG_DONTROUTE; + if ((1 << SOCKET_SEND_FLAG_DONTROUTE) & eflags) + tmp |= MSG_DONTROUTE; break; case SOCKET_SEND_FLAG_EOR: - tmp |= MSG_EOR; + if ((1 << SOCKET_SEND_FLAG_EOR) & eflags) + tmp |= MSG_EOR; break; case SOCKET_SEND_FLAG_MORE: - tmp |= MSG_MORE; + if ((1 << SOCKET_SEND_FLAG_MORE) & eflags) + tmp |= MSG_MORE; break; case SOCKET_SEND_FLAG_NOSIGNAL: - tmp |= MSG_NOSIGNAL; + if ((1 << SOCKET_SEND_FLAG_NOSIGNAL) & eflags) + tmp |= MSG_NOSIGNAL; break; case SOCKET_SEND_FLAG_OOB: - tmp |= MSG_OOB; + if ((1 << SOCKET_SEND_FLAG_OOB) & eflags) + tmp |= MSG_OOB; break; default: @@ -6556,7 +6880,7 @@ BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags) } - *sendflags = tmp; + *flags = tmp; return TRUE; } @@ -6567,31 +6891,53 @@ BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags) * send flags. */ static -BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags) +BOOLEAN_T erecvflags2recvflags(unsigned int eflags, int* flags) { unsigned int ef; int tmp = 0; + SGDBG( ("SOCKET", "erecvflags2recvflags -> entry with" + "\r\n eflags: %d" + "\r\n", eflags) ); + for (ef = SOCKET_RECV_FLAG_LOW; ef <= SOCKET_RECV_FLAG_HIGH; ef++) { + + SGDBG( ("SOCKET", "erecvflags2recvflags -> iteration" + "\r\n ef: %d" + "\r\n tmp: %d" + "\r\n", ef, tmp) ); + switch (ef) { case SOCKET_RECV_FLAG_CMSG_CLOEXEC: - tmp |= MSG_CMSG_CLOEXEC; + if ((1 << SOCKET_RECV_FLAG_CMSG_CLOEXEC) & eflags) + tmp |= MSG_CMSG_CLOEXEC; break; case SOCKET_RECV_FLAG_ERRQUEUE: - tmp |= MSG_ERRQUEUE; + if ((1 << SOCKET_RECV_FLAG_ERRQUEUE) & eflags) + tmp |= MSG_ERRQUEUE; break; case SOCKET_RECV_FLAG_OOB: - tmp |= MSG_OOB; + if ((1 << SOCKET_RECV_FLAG_OOB) & eflags) + tmp |= MSG_OOB; break; + /* + * <KOLLA> + * + * We need to handle this, because it may effect the read algorithm + * + * </KOLLA> + */ case SOCKET_RECV_FLAG_PEEK: - tmp |= MSG_PEEK; + if ((1 << SOCKET_RECV_FLAG_PEEK) & eflags) + tmp |= MSG_PEEK; break; case SOCKET_RECV_FLAG_TRUNC: - tmp |= MSG_TRUNC; + if ((1 << SOCKET_RECV_FLAG_TRUNC) & eflags) + tmp |= MSG_TRUNC; break; default: @@ -6600,7 +6946,7 @@ BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags) } - *recvflags = tmp; + *flags = tmp; return TRUE; } @@ -7117,19 +7463,33 @@ char* send_msg(ErlNifEnv* env, static BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc) { - BOOLEAN_T wrap; - uint32_t max = 0xFFFFFFFF; - uint32_t current = *cnt; + BOOLEAN_T wrap; + uint32_t max = 0xFFFFFFFF; + uint32_t current = *cnt; - if ((max - inc) >= current) { - *cnt += inc; - wrap = FALSE; - } else { - *cnt = inc - (max - current) - 1; - wrap = TRUE; - } + if ((max - inc) >= current) { + *cnt += inc; + wrap = FALSE; + } else { + *cnt = inc - (max - current) - 1; + wrap = TRUE; + } + + return (wrap); +} + + +static +void cnt_dec(uint32_t* cnt, uint32_t dec) +{ + uint32_t current = *cnt; + + if (dec > current) + *cnt = 0; // The counter cannot be < 0 so this is the best we can do... + else + *cnt -= dec; - return (wrap); + return; } @@ -7183,116 +7543,125 @@ void socket_dtor(ErlNifEnv* env, void* obj) static void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) { - SocketDescriptor* descP = (SocketDescriptor*) obj; - - MLOCK(descP->writeMtx); - MLOCK(descP->readMtx); - MLOCK(descP->accMtx); - MLOCK(descP->closeMtx); - - - descP->state = SOCKET_STATE_CLOSING; // Just in case...??? - descP->isReadable = FALSE; - descP->isWritable = FALSE; - - - /* We should check that we actually have a monitor. - * This *should* be done with a "NULL" monitor value, - * which there currently is none... - */ - DEMONP(env, descP, &descP->ctrlMon); - - if (descP->currentWriterP != NULL) { - /* We have a (current) writer and *may* therefor also have - * writers waiting. - */ - - ESOCK_ASSERT( (NULL == send_msg_nif_abort(env, - descP->currentWriter.ref, - atom_closed, - &descP->currentWriter.pid)) ); - - /* And also deal with the waiting writers (in the same way) */ - inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed); - } - - if (descP->currentReaderP != NULL) { - - /* We have a (current) reader and *may* therefor also have - * readers waiting. - */ - - ESOCK_ASSERT( (NULL == send_msg_nif_abort(env, - descP->currentReader.ref, - atom_closed, - &descP->currentReader.pid)) ); - - /* And also deal with the waiting readers (in the same way) */ - inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed); - } - - if (descP->currentAcceptorP != NULL) { - /* We have a (current) acceptor and *may* therefor also have - * acceptors waiting. - */ - - ESOCK_ASSERT( (NULL == send_msg_nif_abort(env, - descP->currentAcceptor.ref, - atom_closed, - &descP->currentAcceptor.pid)) ); - - /* And also deal with the waiting acceptors (in the same way) */ - inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, atom_closed); - } - - - if (descP->sock != INVALID_SOCKET) { - - /* - * <KOLLA> - * - * WE NEED TO CHECK IF THIS OPERATION IS TRIGGERED - * LOCALLY (VIA A CALL TO CLOSE) OR REMOTELLY - * (VIA I.E. ECONSRESET). - * - * </KOLLA> - */ - - if (descP->closeLocal) { - - /* +++ send close message to the waiting process +++ - * - * {close, CloseRef} - * - * <KOLLA> - * - * WHAT HAPPENS IF THE RECEIVER HAS DIED IN THE MEANTIME???? - * - * </KOLLA> - */ - - send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid); - - DEMONP(env, descP, &descP->closerMon); - - } else { - - /* - * <KOLLA> - * - * ABORT? - * - * </KOLLA> - */ - } - } - - - MUNLOCK(descP->closeMtx); - MUNLOCK(descP->accMtx); - MUNLOCK(descP->readMtx); - MUNLOCK(descP->writeMtx); + SocketDescriptor* descP = (SocketDescriptor*) obj; + + SSDBG( descP, + ("SOCKET", "socket_stop -> entry when" + "\r\n sock: %d (%d)" + "\r\n is_direct_call: %d" + "\r\n", descP->sock, fd, is_direct_call) ); + + MLOCK(descP->writeMtx); + MLOCK(descP->readMtx); + MLOCK(descP->accMtx); + MLOCK(descP->closeMtx); + + + descP->state = SOCKET_STATE_CLOSING; // Just in case...??? + descP->isReadable = FALSE; + descP->isWritable = FALSE; + + + /* We should check that we actually have a monitor. + * This *should* be done with a "NULL" monitor value, + * which there currently is none... + */ + DEMONP(env, descP, &descP->ctrlMon); + + if (descP->currentWriterP != NULL) { + /* We have a (current) writer and *may* therefor also have + * writers waiting. + */ + + ESOCK_ASSERT( (NULL == send_msg_nif_abort(env, + descP->currentWriter.ref, + atom_closed, + &descP->currentWriter.pid)) ); + + /* And also deal with the waiting writers (in the same way) */ + inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed); + } + + if (descP->currentReaderP != NULL) { + + /* We have a (current) reader and *may* therefor also have + * readers waiting. + */ + + ESOCK_ASSERT( (NULL == send_msg_nif_abort(env, + descP->currentReader.ref, + atom_closed, + &descP->currentReader.pid)) ); + + /* And also deal with the waiting readers (in the same way) */ + inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed); + } + + if (descP->currentAcceptorP != NULL) { + /* We have a (current) acceptor and *may* therefor also have + * acceptors waiting. + */ + + ESOCK_ASSERT( (NULL == send_msg_nif_abort(env, + descP->currentAcceptor.ref, + atom_closed, + &descP->currentAcceptor.pid)) ); + + /* And also deal with the waiting acceptors (in the same way) */ + inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, atom_closed); + } + + + if (descP->sock != INVALID_SOCKET) { + + /* + * <KOLLA> + * + * WE NEED TO CHECK IF THIS OPERATION IS TRIGGERED + * LOCALLY (VIA A CALL TO CLOSE) OR REMOTELLY + * (VIA I.E. ECONSRESET). + * + * </KOLLA> + */ + + if (descP->closeLocal) { + + /* +++ send close message to the waiting process +++ + * + * {close, CloseRef} + * + * <KOLLA> + * + * WHAT HAPPENS IF THE RECEIVER HAS DIED IN THE MEANTIME???? + * + * </KOLLA> + */ + + send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid); + + DEMONP(env, descP, &descP->closerMon); + + } else { + + /* + * <KOLLA> + * + * ABORT? + * + * </KOLLA> + */ + } + } + + + MUNLOCK(descP->closeMtx); + MUNLOCK(descP->accMtx); + MUNLOCK(descP->readMtx); + MUNLOCK(descP->writeMtx); + SSDBG( descP, + ("SOCKET", "socket_stop -> done (%d, %d)\r\n", descP->sock, fd) ); + } diff --git a/erts/emulator/nifs/common/socket_util.c b/erts/emulator/nifs/common/socket_util.c index 05fb40e286..397f69f58d 100644 --- a/erts/emulator/nifs/common/socket_util.c +++ b/erts/emulator/nifs/common/socket_util.c @@ -29,6 +29,11 @@ #include "socket_dbg.h" #include "sys.h" +#include <stdarg.h> +#include <string.h> +#include <stdio.h> +#include <ctype.h> +#include <time.h> /* We don't have a "debug flag" to check here, so we * should use the compile debug flag, whatever that is... @@ -46,6 +51,9 @@ extern char* erl_errno_id(int error); /* THIS IS JUST TEMPORARY??? */ +static int realtime(struct timespec* tsP); +static int timespec2str(char *buf, unsigned int len, struct timespec *ts); + static char* make_sockaddr_in4(ErlNifEnv* env, ERL_NIF_TERM port, ERL_NIF_TERM addr, @@ -1168,10 +1176,89 @@ void esock_abort(const char* expr, +/* *** esock_warning_msg *** + * + * Temporary function for issuing warning messages. + * + */ +extern +void esock_warning_msg( const char* format, ... ) +{ + va_list args; + char f[512 + sizeof(format)]; // This has to suffice... + char stamp[32]; + struct timespec ts; + int res; + + /* + * We should really include self in the printout, so we can se which process + * are executing the code. But then I must change the API.... + * ....something for later. + */ + + // 2018-06-29 12:13:21.232089 + // 29-Jun-2018::13:47:25.097097 + + if (!realtime(&ts)) { + if (timespec2str(stamp, sizeof(stamp), &ts) != 0) { + res = enif_snprintf(f, sizeof(f), "=WARNING MSG==== %s", format); + } else { + res = enif_snprintf(f, sizeof(f), + "=WARNING MSG==== %s ===\r\n%s" , stamp, format); + } + + if (res > 0) { + va_start (args, format); + enif_vfprintf (stdout, f, args); + va_end (args); + fflush(stdout); + } + } + + return; +} + + +static +int realtime(struct timespec* tsP) +{ + return clock_gettime(CLOCK_REALTIME, tsP); +} + + +/* + * Convert a timespec struct into a readable/printable string. + * + * "%F::%T" => 2018-06-29 12:13:21[.232089] + * "%d-%b-%Y::%T" => 29-Jun-2018::13:47:25.097097 + */ +static +int timespec2str(char *buf, unsigned int len, struct timespec *ts) +{ + int ret, buflen; + struct tm t; + + tzset(); + if (localtime_r(&(ts->tv_sec), &t) == NULL) + return 1; + + ret = strftime(buf, len, "%d-%B-%Y::%T", &t); + if (ret == 0) + return 2; + len -= ret - 1; + buflen = strlen(buf); + + ret = snprintf(&buf[buflen], len, ".%06ld", ts->tv_nsec/1000); + if (ret >= len) + return 3; + + return 0; +} + /* =================================================================== * * * - * Various utility functions * + * Various (internal) utility functions * * * * =================================================================== */ diff --git a/erts/emulator/nifs/common/socket_util.h b/erts/emulator/nifs/common/socket_util.h index dedeb8dd7d..add2c8f4be 100644 --- a/erts/emulator/nifs/common/socket_util.h +++ b/erts/emulator/nifs/common/socket_util.h @@ -158,5 +158,8 @@ ERL_NIF_TERM esock_make_error_str(ErlNifEnv* env, char* reason); extern ERL_NIF_TERM esock_make_error_errno(ErlNifEnv* env, int err); +extern +void esock_warning_msg(const char* format, ... ); + #endif // SOCKET_UTIL_H__ diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex f82db6e44e..4924c43a5c 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index ba3ff6bab9..bf94271073 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -41,7 +41,7 @@ %% sendmsg/4, %% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv) - recv/2, recv/3, recv/4, + recv/1, recv/2, recv/3, recv/4, recvfrom/1, recvfrom/2, recvfrom/3, recvfrom/4, %% recvmsg/4, %% readv/3, @@ -442,16 +442,17 @@ -define(SOCKET_RECV_FLAGS_DEFAULT, []). -define(SOCKET_RECV_TIMEOUT_DEFAULT, infinity). --define(SOCKET_OPT_LEVEL_OTP, 0). --define(SOCKET_OPT_LEVEL_SOCKET, 1). --define(SOCKET_OPT_LEVEL_IP, 2). --define(SOCKET_OPT_LEVEL_IPV6, 3). --define(SOCKET_OPT_LEVEL_TCP, 4). --define(SOCKET_OPT_LEVEL_UDP, 5). --define(SOCKET_OPT_LEVEL_SCTP, 6). +-define(SOCKET_OPT_LEVEL_OTP, 0). +-define(SOCKET_OPT_LEVEL_SOCKET, 1). +-define(SOCKET_OPT_LEVEL_IP, 2). +-define(SOCKET_OPT_LEVEL_IPV6, 3). +-define(SOCKET_OPT_LEVEL_TCP, 4). +-define(SOCKET_OPT_LEVEL_UDP, 5). +-define(SOCKET_OPT_LEVEL_SCTP, 6). --define(SOCKET_OPT_OTP_DEBUG, 0). --define(SOCKET_OPT_OTP_IOW, 1). +-define(SOCKET_OPT_OTP_DEBUG, 0). +-define(SOCKET_OPT_OTP_IOW, 1). +-define(SOCKET_OPT_OTP_CTRL_PROC, 2). -define(SOCKET_OPT_SOCK_BROADCAST, 4). -define(SOCKET_OPT_SOCK_DONTROUTE, 7). @@ -1097,6 +1098,9 @@ do_sendto(SockRef, Data, EFlags, Dest, Timeout) -> %% Flags - A list of "options" for the read. %% Timeout - Time-out in milliseconds. +recv(Socket) -> + recv(Socket, 0). + recv(Socket, Length) -> recv(Socket, Length, ?SOCKET_RECV_FLAGS_DEFAULT, @@ -1131,10 +1135,21 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) (is_integer(Timeout) andalso (Timeout > 0)) -> TS = timestamp(Timeout), RecvRef = make_ref(), + p("do_recv -> try read with" + "~n SockRef: ~p" + "~n RecvRef: ~p" + "~n Length: ~p" + "~n EFlags: ~p" + "~nwhen" + "~n Timeout: ~p (~p)", [SockRef, RecvRef, Length, EFlags, Timeout, TS]), case nif_recv(SockRef, RecvRef, Length, EFlags) of {ok, true = _Complete, Bin} when (size(Acc) =:= 0) -> + p("do_recv -> ok: complete (size(Acc) =:= 0)" + "~n size(Bin): ~p", [size(Bin)]), {ok, Bin}; {ok, true = _Complete, Bin} -> + p("do_recv -> ok: complete" + "~n size(Bin): ~p", [size(Bin)]), {ok, <<Acc/binary, Bin/binary>>}; %% It depends on the amount of bytes we tried to read: @@ -1143,12 +1158,16 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) %% > 0 - We got a part of the message and we will be notified %% when there is more to read (a select message) {ok, false = _Complete, Bin} when (Length =:= 0) -> + p("do_recv -> ok: not-complete (Length =:= 0)" + "~n size(Bin): ~p", [size(Bin)]), do_recv(SockRef, RecvRef, Length, EFlags, <<Acc/binary, Bin/binary>>, next_timeout(TS, Timeout)); {ok, false = _Completed, Bin} when (size(Acc) =:= 0) -> + p("do_recv -> ok: not-complete (size(Acc) =:= 0)" + "~n size(Bin): ~p", [size(Bin)]), %% We got the first chunk of it. %% We will be notified (select message) when there %% is more to read. @@ -1171,6 +1190,8 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) end; {ok, false = _Completed, Bin} -> + p("do_recv -> ok: not-complete" + "~n size(Bin): ~p", [size(Bin)]), %% We got a chunk of it! NewTimeout = next_timeout(TS, Timeout), receive @@ -1190,16 +1211,19 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {error, {timeout, Acc}} end; - %% We return with the accumulated binary regardless if its empty... - {error, eagain} when (Length =:= 0) -> + %% We return with the accumulated binary (if its non-empty) + {error, eagain} when (Length =:= 0) andalso (size(Acc) > 0) -> + p("do_recv -> eagain (Length =:= 0)", []), {ok, Acc}; {error, eagain} -> + p("do_recv -> eagain", []), %% There is nothing just now, but we will be notified when there %% is something to read (a select message). NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> + p("do_recv -> received select ready-input message", []), do_recv(SockRef, RecvRef, Length, EFlags, Acc, @@ -1214,6 +1238,15 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {error, timeout} end; + {error, closed = Reason} -> + do_close(SockRef), + if + (size(Acc) =:= 0) -> + {error, Reason}; + true -> + {error, {Reason, Acc}} + end; + {error, _} = ERROR when (size(Acc) =:= 0) -> ERROR; @@ -1342,6 +1375,9 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> Reason :: term(). close(#socket{ref = SockRef}) -> + do_close(SockRef). + +do_close(SockRef) -> case nif_close(SockRef) of ok -> nif_finalize_close(SockRef); @@ -1659,6 +1695,8 @@ enc_setopt_value(otp, debug, V, _, _, _) when is_boolean(V) -> V; enc_setopt_value(otp, iow, V, _, _, _) when is_boolean(V) -> V; +enc_setopt_value(otp, controlling_process, V, _, _, _) when is_pid(V) -> + V; enc_setopt_value(otp = L, Opt, V, _D, _T, _P) -> not_supported({L, Opt, V}); @@ -1859,27 +1897,31 @@ enc_sockopt_key(otp, debug, _, _, _, _) -> ?SOCKET_OPT_OTP_DEBUG; enc_sockopt_key(otp, iow, _, _, _, _) -> ?SOCKET_OPT_OTP_IOW; +enc_sockopt_key(otp, controlling_process, _, _, _, _) -> + ?SOCKET_OPT_OTP_CTRL_PROC; +enc_sockopt_key(otp = L, Opt, _, _, _, _) -> + not_supported({L, Opt}); %% +++ SOCKET socket options +++ -enc_sockopt_key(socket, acceptconn = Opt, get = _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(socket = L, acceptconn = Opt, get = _Dir, _D, _T, _P) -> + not_supported({L, Opt}); enc_sockopt_key(socket, acceptfilter = Opt, _Dir, _D, _T, _P) -> not_supported(Opt); %% Before linux 3.8, this socket option could be set. %% Size of buffer for name: IFNAMSZ %% So, we let the implementation decide. -enc_sockopt_key(socket, bindtodevide = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(socket = L, bindtodevide = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); enc_sockopt_key(socket, broadcast = _Opt, _Dir, _D, dgram = _T, _P) -> ?SOCKET_OPT_SOCK_BROADCAST; -enc_sockopt_key(socket, busy_poll = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(socket, debug = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(socket = L, busy_poll = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(socket = L, debug = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); enc_sockopt_key(socket, dontroute = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_SOCK_DONTROUTE; -enc_sockopt_key(socket, error = Opt, get = _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(socket = L, error = Opt, get = _Dir, _D, _T, _P) -> + not_supported({L, Opt}); %% This is only for connection-oriented sockets, but who are those? %% Type = stream or Protocol = tcp? %% For now, we just let is pass and it will fail later if not ok... @@ -1887,111 +1929,111 @@ enc_sockopt_key(socket, keepalive = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_SOCK_KEEPALIVE; enc_sockopt_key(socket, linger = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_SOCK_LINGER; -enc_sockopt_key(socket, mark = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(socket = L, mark = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); enc_sockopt_key(socket, oobinline = Opt, _Dir, _D, _T, _P) -> not_supported(Opt); -enc_sockopt_key(socket, passcred = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(socket, peek_off = Opt, _Dir, local = _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(socket, peek_cred = Opt, get = _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(socket = L, passcred = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(socket = L, peek_off = Opt, _Dir, local = _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(socket = L, peek_cred = Opt, get = _Dir, _D, _T, _P) -> + not_supported({L, Opt}); enc_sockopt_key(socket, priority = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_SOCK_PRIORITY; enc_sockopt_key(socket, rcvbuf = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_SOCK_RCVBUF; -enc_sockopt_key(socket, rcvbufforce = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(socket = L, rcvbufforce = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); %% May not work on linux. -enc_sockopt_key(socket, rcvlowat = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(socket = L, rcvlowat = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); enc_sockopt_key(socket, rcvtimeo = Opt, _Dir, _D, _T, _P) -> not_supported(Opt); enc_sockopt_key(socket, reuseaddr = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_SOCK_REUSEADDR; -enc_sockopt_key(socket, reuseport = Opt, _Dir, D, _T, _P) +enc_sockopt_key(socket = L, reuseport = Opt, _Dir, D, _T, _P) when ((D =:= inet) orelse (D =:= inet6)) -> - not_supported(Opt); -enc_sockopt_key(socket, rxq_ovfl = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(socket, setfib = Opt, set = _Dir, _D, _T, _P) -> - not_supported(Opt); + not_supported({L, Opt}); +enc_sockopt_key(socket = L, rxq_ovfl = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(socket = L, setfib = Opt, set = _Dir, _D, _T, _P) -> + not_supported({L, Opt}); enc_sockopt_key(socket, sndbuf = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_SOCK_SNDBUF; -enc_sockopt_key(socket, sndbufforce = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(socket = L, sndbufforce = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); %% Not changeable on linux. -enc_sockopt_key(socket, sndlowat = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(socket, sndtimeo = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(socket, timestamp = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(socket, UnknownOpt, _Dir, _D, _T, _P) -> - unknown(UnknownOpt); +enc_sockopt_key(socket = L, sndlowat = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(socket = L, sndtimeo = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(socket = L, timestamp = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(socket = L, UnknownOpt, _Dir, _D, _T, _P) -> + unknown({L, UnknownOpt}); %% +++ IP socket options +++ -enc_sockopt_key(ip, add_membership = Opt, set = _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, add_source_membership = Opt, set = _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, block_source = Opt, set = _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(ip = L, add_membership = Opt, set = _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, add_source_membership = Opt, set = _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, block_source = Opt, set = _Dir, _D, _T, _P) -> + not_supported({L, Opt}); %% FreeBSD only? %% Only respected on udp and raw ip (unless the hdrincl option has been set). -enc_sockopt_key(ip, dontfrag = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, drop_membership = Opt, set = _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, drop_source_membership = Opt, set = _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(ip = L, dontfrag = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, drop_membership = Opt, set = _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, drop_source_membership = Opt, set = _Dir, _D, _T, _P) -> + not_supported({L, Opt}); %% Linux only? -enc_sockopt_key(ip, free_bind = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, hdrincl = Opt, _Dir, _D, raw = _T, _P) -> - not_supported(Opt); +enc_sockopt_key(ip = L, free_bind = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, hdrincl = Opt, _Dir, _D, raw = _T, _P) -> + not_supported({L, Opt}); %% FreeBSD only? -enc_sockopt_key(ip, minttl = Opt, _Dir, _D, raw = _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, msfilter = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, mtu = Opt, get = _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, mtu_discover = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, multicast_all = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, multicast_if = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, multicast_loop = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, multicast_ttl = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, nodefrag = Opt, _Dir, _D, raw = _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, options = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, pktinfo = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(ip = L, minttl = Opt, _Dir, _D, raw = _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, msfilter = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, mtu = Opt, get = _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, mtu_discover = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, multicast_all = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, multicast_if = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, multicast_loop = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, multicast_ttl = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, nodefrag = Opt, _Dir, _D, raw = _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, options = Opt, _Dir, _D, _T, _P) -> + not_supported({Opt, L}); +enc_sockopt_key(ip = L, pktinfo = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); %% This require special code for accessing the errors. %% via calling the recvmsg with the MSG_ERRQUEUE flag set, -enc_sockopt_key(ip, recverr = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, recvif = Opt, _Dir, _D, dgram = _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, recvdstaddr = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, recvopts = Opt, _Dir, _D, T, _P) when (T =/= stream) -> - not_supported(Opt); -enc_sockopt_key(ip, recvorigdstaddr = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(ip = L, recverr = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, recvif = Opt, _Dir, _D, dgram = _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, recvdstaddr = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, recvopts = Opt, _Dir, _D, T, _P) when (T =/= stream) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, recvorigdstaddr = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); enc_sockopt_key(ip, recvtos = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_IP_RECVTOS; -enc_sockopt_key(ip, recvttl = Opt, _Dir, _D, T, _P) when (T =/= stream) -> - not_supported(Opt); -enc_sockopt_key(ip, retopts = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(ip = L, recvttl = Opt, _Dir, _D, T, _P) when (T =/= stream) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, retopts = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); enc_sockopt_key(ip, router_alert = _Opt, _Dir, _D, raw = _T, _P) -> ?SOCKET_OPT_IP_ROUTER_ALERT; %% On FreeBSD it specifies that this option is only valid @@ -1999,49 +2041,49 @@ enc_sockopt_key(ip, router_alert = _Opt, _Dir, _D, raw = _T, _P) -> %% No such condition on linux (in the man page)... enc_sockopt_key(ip, tos = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_IP_TOS; -enc_sockopt_key(ip, transparent = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(ip = L, transparent = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); enc_sockopt_key(ip, ttl = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_IP_TTL; -enc_sockopt_key(ip, unblock_source = Opt, set = _Dir, _D, _T, _P) -> - not_supported(Opt); -enc_sockopt_key(ip, UnknownOpt, _Dir, _D, _T, _P) -> - unknown(UnknownOpt); +enc_sockopt_key(ip = L, unblock_source = Opt, set = _Dir, _D, _T, _P) -> + not_supported({L, Opt}); +enc_sockopt_key(ip = L, UnknownOpt, _Dir, _D, _T, _P) -> + unknown({L, UnknownOpt}); %% IPv6 socket options enc_sockopt_key(ipv6, hoplimit = _Opt, _Dir, _D, T, _P) when (T =:= dgram) orelse (T =:= raw) -> ?SOCKET_OPT_IPV6_HOPLIMIT; -enc_sockopt_key(ipv6, UnknownOpt, _Dir, _D, _T, _P) -> - unknown(UnknownOpt); +enc_sockopt_key(ipv6 = L, UnknownOpt, _Dir, _D, _T, _P) -> + unknown({L, UnknownOpt}); %% TCP socket options %% There are other options that would be useful; info, %% but they are difficult to get portable... enc_sockopt_key(tcp, congestion = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_TCP_CONGESTION; -enc_sockopt_key(tcp, cork = Opt, _Dir, _D, _T, _P) -> - not_supported(Opt); +enc_sockopt_key(tcp = L, cork = Opt, _Dir, _D, _T, _P) -> + not_supported({L, Opt}); enc_sockopt_key(tcp, maxseg = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_TCP_MAXSEG; enc_sockopt_key(tcp, nodelay = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_TCP_NODELAY; -enc_sockopt_key(tcp, UnknownOpt, _Dir, _D, _T, _P) -> - unknown(UnknownOpt); +enc_sockopt_key(tcp = L, UnknownOpt, _Dir, _D, _T, _P) -> + unknown({L, UnknownOpt}); %% UDP socket options enc_sockopt_key(udp, cork = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_UDP_CORK; -enc_sockopt_key(udp, UnknownOpt, _Dir, _D, _T, _P) -> - unknown(UnknownOpt); +enc_sockopt_key(udp = L, UnknownOpt, _Dir, _D, _T, _P) -> + unknown({L, UnknownOpt}); %% SCTP socket options enc_sockopt_key(sctp, autoclose = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_SCTP_AUTOCLOSE; enc_sockopt_key(sctp, nodelay = _Opt, _Dir, _D, _T, _P) -> ?SOCKET_OPT_SCTP_NODELAY; -enc_sockopt_key(sctp, UnknownOpt, _Dir, _D, _T, _P) -> - unknown(UnknownOpt); +enc_sockopt_key(sctp = L, UnknownOpt, _Dir, _D, _T, _P) -> + unknown({L, UnknownOpt}); %% +++ "Native" socket options +++ enc_sockopt_key(Level, Opt, set = _Dir, _D, _T, _P) @@ -2158,6 +2200,16 @@ tdiff(T1, T2) -> +p(F, A) -> + p(get(sname), F, A). + +p(undefined, F, A) -> + p("***", F, A); +p(SName, F, A) -> + io:format("[~s,~p] " ++ F ++ "~n", [SName, self()|A]). + + + %% =========================================================================== %% %% Error functions diff --git a/lib/kernel/test/socket_client.erl b/lib/kernel/test/socket_client.erl index 13e87f4109..a284777046 100644 --- a/lib/kernel/test/socket_client.erl +++ b/lib/kernel/test/socket_client.erl @@ -10,6 +10,9 @@ -export([start/1]). +-define(REQ, 0). +-define(REP, 1). + start(Port) -> start_tcp(Port). @@ -19,12 +22,16 @@ start_tcp(Port) -> start(Domain, Type, Proto, Port) -> try do_init(Domain, Type, Proto) of Sock -> - connect(Sock, Domain, Port) + connect(Sock, Domain, Port), + %% Give the server some time... + p("wait some", []), + %% sleep(5000), + %% ok = socket:close(Sock), + send_loop(Sock) catch - throw:E:P -> + throw:E -> e("Failed initiate: " - "~n Error: ~p" - "~n Path: ~p", [E, P]) + "~n Error: ~p", [E]) end. do_init(Domain, Type, Proto) -> @@ -58,11 +65,11 @@ connect(Sock, Domain, Port) -> SA = #{family => Domain, addr => Addr, port => Port}, - i("try (socket) connect to ~p", [SA]), + i("try (socket) connect to:" + "~n ~p", [SA]), case socket:connect(Sock, SA) of ok -> i("connected"), - send_loop(Sock), ok; {error, Reason} -> e("connect failure: " @@ -74,22 +81,37 @@ connect(Sock, Domain, Port) -> send_loop(Sock) -> send_loop(Sock, 1). -send_loop(Sock, N) -> - case socket:send(Sock, <<0:32, N:32, "hejsan">>) of +send_loop(Sock, N) when (N =< 10) -> + i("try send request ~w", [N]), + Req = enc_req_msg(N, "hejsan"), + case socket:send(Sock, Req) of ok -> - case send:recv(Sock, 0) of - {ok, <<1:32, N:32, "hejsan">>} -> - send_loop(Sock, N+1); + i("request ~w sent - now try read answer", [N]), + case socket:recv(Sock, 0) of + {ok, Msg} -> + i("received ~w bytes of data", [size(Msg)]), + case dec_msg(Msg) of + {reply, N, Reply} -> + i("received reply ~w: ~p", [N, Reply]), + send_loop(Sock, N+1) + end; {error, RReason} -> e("Failed recv response for request ~w: " - "~n ~p", [RReason]), + "~n ~p", [N, RReason]), exit({failed_recv, RReason}) end; {error, SReason} -> e("Failed send request ~w: " "~n ~p", [SReason]), exit({failed_send, SReason}) - end. + end; +send_loop(Sock, _N) -> + i("we are done - close the socket when: " + "~n ~p", [socket:info()]), + ok = socket:close(Sock), + i("we are done - socket closed when: " + "~n ~p", [socket:info()]). + which_addr(_Domain, []) -> throw(no_address); @@ -106,6 +128,61 @@ which_addr2(Domain, [_|IFO]) -> which_addr2(Domain, IFO). +%% --- + +enc_req_msg(N, Data) -> + enc_msg(?REQ, N, Data). + +enc_rep_msg(N, Data) -> + enc_msg(?REP, N, Data). + +enc_msg(Type, N, Data) when is_list(Data) -> + enc_msg(Type, N, list_to_binary(Data)); +enc_msg(Type, N, Data) + when is_integer(Type) andalso is_integer(N) andalso is_binary(Data) -> + <<Type:32/integer, N:32/integer, Data/binary>>. + +dec_msg(<<?REQ:32/integer, N:32/integer, Data/binary>>) -> + {request, N, Data}; +dec_msg(<<?REP:32/integer, N:32/integer, Data/binary>>) -> + {reply, N, Data}. + + +%% --- + +sleep(T) -> + receive after T -> ok end. + + +%% --- + +formated_timestamp() -> + format_timestamp(os:timestamp()). + +format_timestamp(Now) -> + N2T = fun(N) -> calendar:now_to_local_time(N) end, + format_timestamp(Now, N2T, true). + +format_timestamp({_N1, _N2, N3} = N, N2T, true) -> + FormatExtra = ".~.2.0w", + ArgsExtra = [N3 div 10000], + format_timestamp(N, N2T, FormatExtra, ArgsExtra); +format_timestamp({_N1, _N2, _N3} = N, N2T, false) -> + FormatExtra = "", + ArgsExtra = [], + format_timestamp(N, N2T, FormatExtra, ArgsExtra). + +format_timestamp(N, N2T, FormatExtra, ArgsExtra) -> + {Date, Time} = N2T(N), + {YYYY,MM,DD} = Date, + {Hour,Min,Sec} = Time, + FormatDate = + io_lib:format("~.4w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w" ++ FormatExtra, + [YYYY, MM, DD, Hour, Min, Sec] ++ ArgsExtra), + lists:flatten(FormatDate). + + +%% --- e(F, A) -> p("<ERROR> " ++ F, A). @@ -116,5 +193,5 @@ i(F, A) -> p("*** " ++ F, A). p(F, A) -> - io:format("[client] " ++ F ++ "~n", A). + io:format("[client,~p][~s] " ++ F ++ "~n", [self(),formated_timestamp()|A]). diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl index 0effc7c0ff..64bd6396e4 100644 --- a/lib/kernel/test/socket_server.erl +++ b/lib/kernel/test/socket_server.erl @@ -10,6 +10,9 @@ -export([start/0]). +-define(REQ, 0). +-define(REP, 1). + -record(handler, {socket, parent}). start() -> @@ -82,14 +85,17 @@ which_addr2(Domain, [_|IFO]) -> accept_loop(LSock) -> - put(sname, "accept-loop"), + put(sname, "acceptor"), accept_loop(LSock, []). accept_loop(LSock, Handlers) -> i("try accept"), case socket:accept(LSock, infinity) of {ok, Sock} -> - i("accepted: ~p", [Sock]), + i("accepted: " + "~n ~p" + "~nwhen" + "~n ~p", [Sock, socket:info()]), case handle_accept_success(Sock) of {ok, Handler} -> accept_loop(LSock, [Handler|Handlers]); @@ -127,6 +133,7 @@ handler_init(Parent, Socket) -> put(sname, "handler"), receive {handler, Parent, continue} -> + socket:setopt(Socket, otp, debug, true), handler_loop(#handler{parent = Parent, socket = Socket}) end. @@ -135,27 +142,90 @@ handler_continue(Handler) -> Handler ! {handler, self(), continue}. handler_loop(#handler{socket = Socket} = H) -> - case socket:read(Socket, 0) of - {ok, <<0:32, N:32, ReqData/binary>>} -> - i("received request ~w: " - "~n ~p", [N, ReqData]), - Reply = <<1:32, N:32, ReqData/binary>>, - case socket:send(Socket, Reply) of - ok -> - i("successfully sent reply ~w", [N]), - handler_loop(H); - {error, SReason} -> - e("failed sending reply ~w:" - "~n ~p", [N, SReason]), - exit({failed_sending_reply, SReason}) + case socket:recv(Socket, 0) of + {ok, Msg} when (size(Msg) =:= 0) -> + i("received empty msg - hickup? - try again", []), + handler_loop(H); + {ok, Msg} -> + i("received ~w bytes of data", [size(Msg)]), + case dec_msg(Msg) of + {request, N, Req} -> + i("received request ~w: " + "~n ~p", [N, Req]), + Reply = enc_rep_msg(N, "hoppsan"), + case socket:send(Socket, Reply) of + ok -> + i("successfully sent reply ~w", [N]), + handler_loop(H); + {error, SReason} -> + e("failed sending reply ~w:" + "~n ~p", [N, SReason]), + exit({failed_sending_reply, SReason}) + end end; + + {error, closed} -> + i("closed when" + "~n ~p", [socket:info()]), + exit(normal); + {error, RReason} -> e("failed reading request: " "~n ~p", [RReason]), - exit({failed_sending_reply, RReason}) + exit({failed_reading_request, RReason}) end. +%% --- + +enc_req_msg(N, Data) -> + enc_msg(?REQ, N, Data). + +enc_rep_msg(N, Data) -> + enc_msg(?REP, N, Data). + +enc_msg(Type, N, Data) when is_list(Data) -> + enc_msg(Type, N, list_to_binary(Data)); +enc_msg(Type, N, Data) + when is_integer(Type) andalso is_integer(N) andalso is_binary(Data) -> + <<Type:32/integer, N:32/integer, Data/binary>>. + +dec_msg(<<?REQ:32/integer, N:32/integer, Data/binary>>) -> + {request, N, Data}; +dec_msg(<<?REP:32/integer, N:32/integer, Data/binary>>) -> + {reply, N, Data}. + + +%% --- + +formated_timestamp() -> + format_timestamp(os:timestamp()). + +format_timestamp(Now) -> + N2T = fun(N) -> calendar:now_to_local_time(N) end, + format_timestamp(Now, N2T, true). + +format_timestamp({_N1, _N2, N3} = N, N2T, true) -> + FormatExtra = ".~.2.0w", + ArgsExtra = [N3 div 10000], + format_timestamp(N, N2T, FormatExtra, ArgsExtra); +format_timestamp({_N1, _N2, _N3} = N, N2T, false) -> + FormatExtra = "", + ArgsExtra = [], + format_timestamp(N, N2T, FormatExtra, ArgsExtra). + +format_timestamp(N, N2T, FormatExtra, ArgsExtra) -> + {Date, Time} = N2T(N), + {YYYY,MM,DD} = Date, + {Hour,Min,Sec} = Time, + FormatDate = + io_lib:format("~.4w-~.2.0w-~.2.0w ~.2.0w:~.2.0w:~.2.0w" ++ FormatExtra, + [YYYY, MM, DD, Hour, Min, Sec] ++ ArgsExtra), + lists:flatten(FormatDate). + + +%% --- + e(F, A) -> p("<ERROR> " ++ F, A). @@ -168,5 +238,6 @@ p(F, A) -> p(get(sname), F, A). p(SName, F, A) -> - io:format("[server,~s] " ++ F ++ "~n", [SName|A]). + io:format("[server:~s,~p][~s] " ++ F ++ "~n", + [SName,self(),formated_timestamp()|A]). |