diff options
author | Micael Karlberg <[email protected]> | 2018-07-30 18:22:46 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-09-18 14:50:18 +0200 |
commit | 165666d8b8b1b21ffaf43ac436cfc1657ba83649 (patch) | |
tree | 82a21a93a7dadaa7eab78ccd1a82d6916d0edc11 /erts/emulator/nifs/common/socket_nif.c | |
parent | 6b01561dc13a0152f56da0a2c61ad88236f87de7 (diff) | |
download | otp-165666d8b8b1b21ffaf43ac436cfc1657ba83649.tar.gz otp-165666d8b8b1b21ffaf43ac436cfc1657ba83649.tar.bz2 otp-165666d8b8b1b21ffaf43ac436cfc1657ba83649.zip |
[socket-nif] Add support for recvmsg
Added preliminary support for function recvmsg. At the moment
this only works on *nix (Windows has another function, WSARecvMsg,
which has a slightly different API).
Also we have "no" cmsg decode at the moment (just level and type).
OTP-14831
Diffstat (limited to 'erts/emulator/nifs/common/socket_nif.c')
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 343 |
1 files changed, 337 insertions, 6 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 13250349db..6b6ddb29ca 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -417,7 +417,8 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL; #define SOCKET_RECV_FLAG_LOW SOCKET_RECV_FLAG_CMSG_CLOEXEC #define SOCKET_RECV_FLAG_HIGH SOCKET_RECV_FLAG_TRUNC -#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048 +#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048 +#define SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT 1024 #define SOCKET_OPT_VALUE_TYPE_UNSPEC 0 #define SOCKET_OPT_VALUE_TYPE_INT 1 @@ -582,6 +583,10 @@ typedef union { #define SOCKET_OPT_SCTP_NODELAY 23 #define SOCKET_OPT_SCTP_RTOINFO 29 +/* We should *eventually* use this instead of hard-coding the size (to 1) */ +#define ESOCK_RECVMSG_IOVEC_SZ 1 + + /* =================================================================== * * * @@ -663,6 +668,7 @@ static unsigned long one_value = 1; #define sock_recv(s,buf,len,flag) recv((s),(buf),(len),(flag)) #define sock_recvfrom(s,buf,blen,flag,addr,alen) \ recvfrom((s),(buf),(blen),(flag),(addr),(alen)) +#define sock_recvmsg(s,msghdr,flag) recvmsg((s),(msghdr),(flag)) #define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag)) #define sock_sendto(s,buf,blen,flag,addr,alen) \ sendto((s),(buf),(blen),(flag),(addr),(alen)) @@ -759,8 +765,9 @@ typedef struct { SocketRequestQueue acceptorsQ; /* +++ Config & Misc stuff +++ */ - size_t rBufSz; // Read buffer size (when data length = 0 is specified) - BOOLEAN_T iow; // Inform On Wrap + size_t rBufSz; // Read buffer size (when data length = 0 is specified) + size_t rCtrlSz; // Read control buffer size + BOOLEAN_T iow; // Inform On Wrap BOOLEAN_T dbg; /* +++ Close stuff +++ */ @@ -871,6 +878,9 @@ static ERL_NIF_TERM nif_recv(ErlNifEnv* env, static ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]); static ERL_NIF_TERM nif_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); @@ -947,6 +957,12 @@ static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, ERL_NIF_TERM recvRef, uint16_t bufSz, int flags); +static ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM recvRef, + uint16_t bufLen, + uint16_t ctrlLen, + int flags); static ERL_NIF_TERM nclose(ErlNifEnv* env, SocketDescriptor* descP); static ERL_NIF_TERM nshutdown(ErlNifEnv* env, @@ -1854,6 +1870,13 @@ static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, SocketAddress* fromAddrP, unsigned int fromAddrLen, ERL_NIF_TERM recvRef); +static ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, + SocketDescriptor* descP, + int read, + int saveErrno, + struct msghdr* msgHdrP, + ErlNifBinary* ctrlBufP, + ERL_NIF_TERM recvRef); static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, SocketDescriptor* descP); @@ -2211,19 +2234,28 @@ static char str_exsend[] = "exsend"; // failed send /* *** "Global" Atoms *** */ ERL_NIF_TERM esock_atom_addr; ERL_NIF_TERM esock_atom_any; +ERL_NIF_TERM esock_atom_ctrl; +ERL_NIF_TERM esock_atom_ctrunc; +ERL_NIF_TERM esock_atom_data; ERL_NIF_TERM esock_atom_dgram; ERL_NIF_TERM esock_atom_debug; +ERL_NIF_TERM esock_atom_eor; ERL_NIF_TERM esock_atom_error; +ERL_NIF_TERM esock_atom_errqueue; ERL_NIF_TERM esock_atom_false; ERL_NIF_TERM esock_atom_family; +ERL_NIF_TERM esock_atom_flags; ERL_NIF_TERM esock_atom_flowinfo; ERL_NIF_TERM esock_atom_inet; ERL_NIF_TERM esock_atom_inet6; +ERL_NIF_TERM esock_atom_iov; ERL_NIF_TERM esock_atom_ip; ERL_NIF_TERM esock_atom_ipv6; +ERL_NIF_TERM esock_atom_level; ERL_NIF_TERM esock_atom_local; ERL_NIF_TERM esock_atom_loopback; ERL_NIF_TERM esock_atom_ok; +ERL_NIF_TERM esock_atom_oob; ERL_NIF_TERM esock_atom_path; ERL_NIF_TERM esock_atom_protocol; ERL_NIF_TERM esock_atom_port; @@ -2235,6 +2267,7 @@ ERL_NIF_TERM esock_atom_seqpacket; ERL_NIF_TERM esock_atom_stream; ERL_NIF_TERM esock_atom_tcp; ERL_NIF_TERM esock_atom_true; +ERL_NIF_TERM esock_atom_trunc; ERL_NIF_TERM esock_atom_type; ERL_NIF_TERM esock_atom_udp; ERL_NIF_TERM esock_atom_undefined; @@ -2355,7 +2388,8 @@ static SocketData data; * nif_send(Sock, SendRef, Data, Flags) * nif_sendto(Sock, SendRef, Data, Dest, Flags) * nif_recv(Sock, RecvRef, Length, Flags) - * nif_recvfrom(Sock, Flags) + * nif_recvfrom(Sock, RecvRef, BufSz, Flags) + * nif_recvmsg(Sock, RecvRef, BufSz, CtrlSz, Flags) * nif_close(Sock) * nif_shutdown(Sock, How) * nif_sockname(Sock) @@ -4024,6 +4058,189 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, /* ---------------------------------------------------------------------- + * nif_recvmsg + * + * Description: + * Receive a message on a socket. + * Normally used only on a (un-) connected socket! + * If a buffer size = 0 is specified, then we will use the default + * buffer size for this socket (whatever has been configured). + * If ctrl (buffer) size = 0 is specified, then the default ctrl + * (buffer) size is used (1024). + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * RecvRef - A unique id for this (send) request. + * BufSz - Size of the buffer into which we put the received message. + * CtrlSz - Size of the ctrl (buffer) into which we put the received + * ancillary data. + * Flags - Receive flags. + * + * <KOLLA> + * + * How do we handle if the peek flag is set? We need to basically keep + * track of if we expect any data from the read. Regardless of the + * number of bytes we try to read. + * + * </KOLLA> + */ + +static +ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + SocketDescriptor* descP; + ERL_NIF_TERM recvRef; + unsigned int bufSz; + unsigned int ctrlSz; + unsigned int eflags; + int flags; + ERL_NIF_TERM res; + + SGDBG( ("SOCKET", "nif_recvmsg -> entry with argc: %d\r\n", argc) ); + + /* Extract arguments and perform preliminary validation */ + + if ((argc != 4) || + !enif_get_resource(env, argv[0], sockets, (void**) &descP) || + !GET_UINT(env, argv[2], &bufSz) || + !GET_UINT(env, argv[3], &ctrlSz) || + !GET_UINT(env, argv[4], &eflags)) { + return enif_make_badarg(env); + } + recvRef = argv[1]; + + SSDBG( descP, + ("SOCKET", "nif_recvmsg -> args when sock = %d:" + "\r\n Socket: %T" + "\r\n recvRef: %T" + "\r\n bufSz: %d" + "\r\n ctrlSz: %d" + "\r\n eflags: %d" + "\r\n", descP->sock, argv[0], recvRef, bufSz, ctrlSz, eflags) ); + + /* if (IS_OPEN(descP)) */ + /* return esock_make_error(env, atom_enotconn); */ + + if (!erecvflags2recvflags(eflags, &flags)) + return enif_make_badarg(env); + + MLOCK(descP->readMtx); + + /* <KOLLA> + * + * We need to handle the case when another process tries + * to receive at the same time. + * If the current recv could not read its entire package + * this time (resulting in an select). The read of the + * other process must be made to wait until current + * is done! + * Basically, we need a read queue! + * + * A 'reading' field (boolean), which is set if we did + * not manage to read the entire message and reset every + * time we do. + * + * </KOLLA> + */ + + res = nrecvmsg(env, descP, recvRef, bufSz, ctrlSz, flags); + + MUNLOCK(descP->readMtx); + + return res; + +} + + +/* The (read) buffer handling *must* be optimized! + * But for now we make it easy for ourselves by + * allocating a binary (of the specified or default + * size) and then throwing it away... + */ +static +ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM recvRef, + uint16_t bufLen, + uint16_t ctrlLen, + int flags) +{ + unsigned int addrLen; + ssize_t read; + int save_errno; + ErlNifBinary buf, ctrl; + int bufSz = (bufLen ? bufLen : descP->rBufSz); + int ctrlSz = (ctrlLen ? ctrlLen : descP->rCtrlSz); + struct msghdr msgHdr; + struct iovec iov[1]; // Shall we always use 1? + SocketAddress addr; + + SSDBG( descP, ("SOCKET", "nrecvfrom -> entry with" + "\r\n bufSz: %d (%d)" + "\r\n ctrlSz: %d (%d)" + "\r\n flags: %d" + "\r\n", bufSz, bufLen, ctrlSz, ctrlLen, flags) ); + + if (!descP->isReadable) + return enif_make_badarg(env); + + /* + for (i = 0; i < sizeof(buf); i++) { + if (!ALLOC_BIN(bifSz, &buf[i])) + return esock_make_error(env, atom_exalloc); + iov[i].iov_base = buf[i].data; + iov[i].iov_len = buf[i].size; + } + */ + + /* Allocate the (msg) data buffer: + */ + if (!ALLOC_BIN(bufSz, &buf)) + return esock_make_error(env, atom_exalloc); + + /* Allocate the ctrl (buffer): + */ + if (!ALLOC_BIN(ctrlSz, &ctrl)) + return esock_make_error(env, atom_exalloc); + + /* We ignore the wrap for the moment. + * Maybe we should issue a wrap-message to controlling process... + */ + cnt_inc(&descP->readTries, 1); + + addrLen = sizeof(addr); + sys_memzero((char*) &addr, addrLen); + sys_memzero((char*) &msgHdr, sizeof(msgHdr)); + + iov[0].iov_base = buf.data; + iov[0].iov_len = buf.size; + + msgHdr.msg_name = &addr; + msgHdr.msg_namelen = addrLen; + msgHdr.msg_iov = iov; + msgHdr.msg_iovlen = 1; // Should use a constant or calculate... + msgHdr.msg_control = ctrl.data; + msgHdr.msg_controllen = ctrl.size; + + read = sock_recvmsg(descP->sock, &msgHdr, flags); + if (IS_SOCKET_ERROR(read)) + save_errno = sock_errno(); + else + save_errno = -1; // The value does not actually matter in this case + + return recvmsg_check_result(env, descP, + read, + save_errno, + &msgHdr, + &ctrl, // Needed for ctrl header decode + recvRef); +} + + + +/* ---------------------------------------------------------------------- * nif_close * * Description: @@ -9800,7 +10017,7 @@ ERL_NIF_TERM ngetopt_timeval_opt(ErlNifEnv* env, ESOCK_ASSERT( (numKeys == numVals) ); if (!MKMA(env, keys, vals, numKeys, &eTimeVal)) - return esock_make_error(env, esock_atom_einval);; + return esock_make_error(env, esock_atom_einval); result = esock_make_ok2(env, eTimeVal); } @@ -10370,6 +10587,110 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, +/* The recvmsg function delivers one (1) message. If our buffer + * is to small, the message will be truncated. So, regardless + * if we filled the buffer or not, we have got what we are going + * to get regarding this message. + */ +static +ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, + SocketDescriptor* descP, + int read, + int saveErrno, + struct msghdr* msgHdrP, + ErlNifBinary* ctrlBufP, + ERL_NIF_TERM recvRef) +{ + + SSDBG( descP, + ("SOCKET", "recvmsg_check_result -> entry with" + "\r\n read: %d" + "\r\n saveErrno: %d" + "\r\n recvRef: %T" + "\r\n", read, saveErrno, recvRef) ); + + + /* There is a special case: If the provided 'to read' value is + * zero (0). That means that we reads as much as we can, using + * the default read buffer size. + */ + + if (read < 0) { + + /* +++ Error handling +++ */ + + if (saveErrno == ECONNRESET) { + + /* +++ Oups - closed +++ */ + + SSDBG( descP, ("SOCKET", "recvfrom_check_result -> closed\r\n") ); + + /* <KOLLA> + * IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING + * PROCESS, WE NEED TO INFORM IT!!! + * + * ALL WAITING PROCESSES MUST ALSO GET THE ERROR!! + * + * </KOLLA> + */ + + descP->closeLocal = FALSE; + descP->state = SOCKET_STATE_CLOSING; + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_STOP), + descP, NULL, recvRef); + + return esock_make_error(env, atom_closed); + + } else if ((saveErrno == ERRNO_BLOCK) || + (saveErrno == EAGAIN)) { + + SSDBG( descP, ("SOCKET", "recvfrom_check_result -> eagain\r\n") ); + + SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), + descP, NULL, recvRef); + + return esock_make_error(env, esock_atom_eagain); + } else { + + SSDBG( descP, + ("SOCKET", + "recvfrom_check_result -> errno: %d\r\n", saveErrno) ); + + return esock_make_error_errno(env, saveErrno); + } + + } else { + + /* +++ We sucessfully got a message - time to encode it +++ */ + + ERL_NIF_TERM eMsgHdr; + char* xres; + + /* + * <KOLLA> + * + * The return value of recvmsg is the *total* number of bytes + * that where successfully read. This data has been put into + * the *IO vector*. + * + * </KOLLA> + */ + + if ((xres = esock_encode_msghdr(env, read, + msgHdrP, ctrlBufP, + &eMsgHdr)) != NULL) + return esock_make_error_str(env, xres); + else + return esock_make_ok2(env, eMsgHdr); + + } +} + + + /* +++ decode the linger value +++ * The (socket) linger option is provided as a two tuple: * @@ -10831,6 +11152,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->closeMtx = MCREATE(buf); descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT; + descP->rCtrlSz = SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT; descP->iow = FALSE; descP->dbg = SOCKET_DEBUG_DEFAULT; @@ -11941,6 +12263,7 @@ ErlNifFunc socket_funcs[] = {"nif_sendto", 5, nif_sendto, 0}, {"nif_recv", 4, nif_recv, 0}, {"nif_recvfrom", 4, nif_recvfrom, 0}, + {"nif_recvmsg", 5, nif_recvmsg, 0}, {"nif_close", 1, nif_close, 0}, {"nif_shutdown", 2, nif_shutdown, 0}, {"nif_setopt", 5, nif_setopt, 0}, @@ -12077,16 +12400,23 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) /* Global atom(s) */ esock_atom_addr = MKA(env, "addr"); esock_atom_any = MKA(env, "any"); + esock_atom_ctrl = MKA(env, "ctrl"); + esock_atom_ctrunc = MKA(env, "ctrunc"); + esock_atom_data = MKA(env, "data"); esock_atom_debug = MKA(env, "debug"); esock_atom_dgram = MKA(env, "dgram"); + esock_atom_eor = MKA(env, "eor"); esock_atom_error = MKA(env, "error"); + esock_atom_errqueue = MKA(env, "errqueue"); esock_atom_false = MKA(env, "false"); esock_atom_family = MKA(env, "family"); + esock_atom_flags = MKA(env, "flags"); esock_atom_flowinfo = MKA(env, "flowinfo"); esock_atom_inet = MKA(env, "inet"); esock_atom_inet6 = MKA(env, "inet6"); - esock_atom_ip = MKA(env, "ip"); + esock_atom_ip = MKA(env, "iov"); esock_atom_ipv6 = MKA(env, "ipvp"); + esock_atom_level = MKA(env, "level"); esock_atom_local = MKA(env, "local"); esock_atom_loopback = MKA(env, "loopback"); esock_atom_ok = MKA(env, "ok"); @@ -12101,6 +12431,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_stream = MKA(env, "stream"); esock_atom_tcp = MKA(env, "tcp"); esock_atom_true = MKA(env, "true"); + esock_atom_trunc = MKA(env, "trunc"); esock_atom_type = MKA(env, "type"); esock_atom_udp = MKA(env, "udp"); esock_atom_undefined = MKA(env, "undefined"); |