diff options
author | Micael Karlberg <[email protected]> | 2018-07-30 18:22:46 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-09-18 14:50:18 +0200 |
commit | 165666d8b8b1b21ffaf43ac436cfc1657ba83649 (patch) | |
tree | 82a21a93a7dadaa7eab78ccd1a82d6916d0edc11 | |
parent | 6b01561dc13a0152f56da0a2c61ad88236f87de7 (diff) | |
download | otp-165666d8b8b1b21ffaf43ac436cfc1657ba83649.tar.gz otp-165666d8b8b1b21ffaf43ac436cfc1657ba83649.tar.bz2 otp-165666d8b8b1b21ffaf43ac436cfc1657ba83649.zip |
[socket-nif] Add support for recvmsg
Added preliminary support for function recvmsg. At the moment
this only works on *nix (Windows has another function, WSARecvMsg,
which has a slightly different API).
Also we have "no" cmsg decode at the moment (just level and type).
OTP-14831
-rw-r--r-- | erts/emulator/Makefile.in | 3 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_int.h | 11 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 343 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_tarray.c | 139 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_tarray.h | 47 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_util.c | 267 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_util.h | 25 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 59768 -> 61376 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 80 |
9 files changed, 891 insertions, 24 deletions
diff --git a/erts/emulator/Makefile.in b/erts/emulator/Makefile.in index 60f9b36491..1964d27134 100644 --- a/erts/emulator/Makefile.in +++ b/erts/emulator/Makefile.in @@ -878,7 +878,8 @@ RUN_OBJS += \ $(OBJDIR)/erl_ptab.o $(OBJDIR)/erl_map.o \ $(OBJDIR)/erl_msacc.o $(OBJDIR)/erl_lock_flags.o \ $(OBJDIR)/erl_io_queue.o \ - $(OBJDIR)/socket_dbg.o $(OBJDIR)/socket_util.o + $(OBJDIR)/socket_dbg.o $(OBJDIR)/socket_tarray.o \ + $(OBJDIR)/socket_util.o LTTNG_OBJS = $(OBJDIR)/erlang_lttng.o NIF_OBJS = \ diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h index 2d5049a9eb..18e94e80ef 100644 --- a/erts/emulator/nifs/common/socket_int.h +++ b/erts/emulator/nifs/common/socket_int.h @@ -105,19 +105,28 @@ typedef unsigned int BOOLEAN_T; */ extern ERL_NIF_TERM esock_atom_addr; extern ERL_NIF_TERM esock_atom_any; +extern ERL_NIF_TERM esock_atom_ctrl; +extern ERL_NIF_TERM esock_atom_ctrunc; +extern ERL_NIF_TERM esock_atom_data; extern ERL_NIF_TERM esock_atom_debug; extern ERL_NIF_TERM esock_atom_dgram; +extern ERL_NIF_TERM esock_atom_eor; extern ERL_NIF_TERM esock_atom_error; +extern ERL_NIF_TERM esock_atom_errqueue; extern ERL_NIF_TERM esock_atom_false; extern ERL_NIF_TERM esock_atom_family; +extern ERL_NIF_TERM esock_atom_flags; extern ERL_NIF_TERM esock_atom_flowinfo; extern ERL_NIF_TERM esock_atom_inet; extern ERL_NIF_TERM esock_atom_inet6; +extern ERL_NIF_TERM esock_atom_iov; extern ERL_NIF_TERM esock_atom_ip; extern ERL_NIF_TERM esock_atom_ipv6; +extern ERL_NIF_TERM esock_atom_level; extern ERL_NIF_TERM esock_atom_local; extern ERL_NIF_TERM esock_atom_loopback; extern ERL_NIF_TERM esock_atom_ok; +extern ERL_NIF_TERM esock_atom_oob; extern ERL_NIF_TERM esock_atom_path; extern ERL_NIF_TERM esock_atom_port; extern ERL_NIF_TERM esock_atom_protocol; @@ -129,6 +138,7 @@ extern ERL_NIF_TERM esock_atom_seqpacket; extern ERL_NIF_TERM esock_atom_stream; extern ERL_NIF_TERM esock_atom_tcp; extern ERL_NIF_TERM esock_atom_true; +extern ERL_NIF_TERM esock_atom_trunc; extern ERL_NIF_TERM esock_atom_type; extern ERL_NIF_TERM esock_atom_udp; extern ERL_NIF_TERM esock_atom_undefined; @@ -147,6 +157,7 @@ extern ERL_NIF_TERM esock_atom_einval; * Various wrapper macros for enif functions */ #define MALLOC(SZ) enif_alloc((SZ)) +#define REALLOC(P, SZ) enif_realloc((P), (SZ)) #define FREE(P) enif_free((P)) #define MKA(E,S) enif_make_atom((E), (S)) diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 13250349db..6b6ddb29ca 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -417,7 +417,8 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL; #define SOCKET_RECV_FLAG_LOW SOCKET_RECV_FLAG_CMSG_CLOEXEC #define SOCKET_RECV_FLAG_HIGH SOCKET_RECV_FLAG_TRUNC -#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048 +#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048 +#define SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT 1024 #define SOCKET_OPT_VALUE_TYPE_UNSPEC 0 #define SOCKET_OPT_VALUE_TYPE_INT 1 @@ -582,6 +583,10 @@ typedef union { #define SOCKET_OPT_SCTP_NODELAY 23 #define SOCKET_OPT_SCTP_RTOINFO 29 +/* We should *eventually* use this instead of hard-coding the size (to 1) */ +#define ESOCK_RECVMSG_IOVEC_SZ 1 + + /* =================================================================== * * * @@ -663,6 +668,7 @@ static unsigned long one_value = 1; #define sock_recv(s,buf,len,flag) recv((s),(buf),(len),(flag)) #define sock_recvfrom(s,buf,blen,flag,addr,alen) \ recvfrom((s),(buf),(blen),(flag),(addr),(alen)) +#define sock_recvmsg(s,msghdr,flag) recvmsg((s),(msghdr),(flag)) #define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag)) #define sock_sendto(s,buf,blen,flag,addr,alen) \ sendto((s),(buf),(blen),(flag),(addr),(alen)) @@ -759,8 +765,9 @@ typedef struct { SocketRequestQueue acceptorsQ; /* +++ Config & Misc stuff +++ */ - size_t rBufSz; // Read buffer size (when data length = 0 is specified) - BOOLEAN_T iow; // Inform On Wrap + size_t rBufSz; // Read buffer size (when data length = 0 is specified) + size_t rCtrlSz; // Read control buffer size + BOOLEAN_T iow; // Inform On Wrap BOOLEAN_T dbg; /* +++ Close stuff +++ */ @@ -871,6 +878,9 @@ static ERL_NIF_TERM nif_recv(ErlNifEnv* env, static ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]); static ERL_NIF_TERM nif_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); @@ -947,6 +957,12 @@ static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, ERL_NIF_TERM recvRef, uint16_t bufSz, int flags); +static ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM recvRef, + uint16_t bufLen, + uint16_t ctrlLen, + int flags); static ERL_NIF_TERM nclose(ErlNifEnv* env, SocketDescriptor* descP); static ERL_NIF_TERM nshutdown(ErlNifEnv* env, @@ -1854,6 +1870,13 @@ static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, SocketAddress* fromAddrP, unsigned int fromAddrLen, ERL_NIF_TERM recvRef); +static ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, + SocketDescriptor* descP, + int read, + int saveErrno, + struct msghdr* msgHdrP, + ErlNifBinary* ctrlBufP, + ERL_NIF_TERM recvRef); static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, SocketDescriptor* descP); @@ -2211,19 +2234,28 @@ static char str_exsend[] = "exsend"; // failed send /* *** "Global" Atoms *** */ ERL_NIF_TERM esock_atom_addr; ERL_NIF_TERM esock_atom_any; +ERL_NIF_TERM esock_atom_ctrl; +ERL_NIF_TERM esock_atom_ctrunc; +ERL_NIF_TERM esock_atom_data; ERL_NIF_TERM esock_atom_dgram; ERL_NIF_TERM esock_atom_debug; +ERL_NIF_TERM esock_atom_eor; ERL_NIF_TERM esock_atom_error; +ERL_NIF_TERM esock_atom_errqueue; ERL_NIF_TERM esock_atom_false; ERL_NIF_TERM esock_atom_family; +ERL_NIF_TERM esock_atom_flags; ERL_NIF_TERM esock_atom_flowinfo; ERL_NIF_TERM esock_atom_inet; ERL_NIF_TERM esock_atom_inet6; +ERL_NIF_TERM esock_atom_iov; ERL_NIF_TERM esock_atom_ip; ERL_NIF_TERM esock_atom_ipv6; +ERL_NIF_TERM esock_atom_level; ERL_NIF_TERM esock_atom_local; ERL_NIF_TERM esock_atom_loopback; ERL_NIF_TERM esock_atom_ok; +ERL_NIF_TERM esock_atom_oob; ERL_NIF_TERM esock_atom_path; ERL_NIF_TERM esock_atom_protocol; ERL_NIF_TERM esock_atom_port; @@ -2235,6 +2267,7 @@ ERL_NIF_TERM esock_atom_seqpacket; ERL_NIF_TERM esock_atom_stream; ERL_NIF_TERM esock_atom_tcp; ERL_NIF_TERM esock_atom_true; +ERL_NIF_TERM esock_atom_trunc; ERL_NIF_TERM esock_atom_type; ERL_NIF_TERM esock_atom_udp; ERL_NIF_TERM esock_atom_undefined; @@ -2355,7 +2388,8 @@ static SocketData data; * nif_send(Sock, SendRef, Data, Flags) * nif_sendto(Sock, SendRef, Data, Dest, Flags) * nif_recv(Sock, RecvRef, Length, Flags) - * nif_recvfrom(Sock, Flags) + * nif_recvfrom(Sock, RecvRef, BufSz, Flags) + * nif_recvmsg(Sock, RecvRef, BufSz, CtrlSz, Flags) * nif_close(Sock) * nif_shutdown(Sock, How) * nif_sockname(Sock) @@ -4024,6 +4058,189 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, /* ---------------------------------------------------------------------- + * nif_recvmsg + * + * Description: + * Receive a message on a socket. + * Normally used only on a (un-) connected socket! + * If a buffer size = 0 is specified, then we will use the default + * buffer size for this socket (whatever has been configured). + * If ctrl (buffer) size = 0 is specified, then the default ctrl + * (buffer) size is used (1024). + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * RecvRef - A unique id for this (send) request. + * BufSz - Size of the buffer into which we put the received message. + * CtrlSz - Size of the ctrl (buffer) into which we put the received + * ancillary data. + * Flags - Receive flags. + * + * <KOLLA> + * + * How do we handle if the peek flag is set? We need to basically keep + * track of if we expect any data from the read. Regardless of the + * number of bytes we try to read. + * + * </KOLLA> + */ + +static +ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + SocketDescriptor* descP; + ERL_NIF_TERM recvRef; + unsigned int bufSz; + unsigned int ctrlSz; + unsigned int eflags; + int flags; + ERL_NIF_TERM res; + + SGDBG( ("SOCKET", "nif_recvmsg -> entry with argc: %d\r\n", argc) ); + + /* Extract arguments and perform preliminary validation */ + + if ((argc != 4) || + !enif_get_resource(env, argv[0], sockets, (void**) &descP) || + !GET_UINT(env, argv[2], &bufSz) || + !GET_UINT(env, argv[3], &ctrlSz) || + !GET_UINT(env, argv[4], &eflags)) { + return enif_make_badarg(env); + } + recvRef = argv[1]; + + SSDBG( descP, + ("SOCKET", "nif_recvmsg -> args when sock = %d:" + "\r\n Socket: %T" + "\r\n recvRef: %T" + "\r\n bufSz: %d" + "\r\n ctrlSz: %d" + "\r\n eflags: %d" + "\r\n", descP->sock, argv[0], recvRef, bufSz, ctrlSz, eflags) ); + + /* if (IS_OPEN(descP)) */ + /* return esock_make_error(env, atom_enotconn); */ + + if (!erecvflags2recvflags(eflags, &flags)) + return enif_make_badarg(env); + + MLOCK(descP->readMtx); + + /* <KOLLA> + * + * We need to handle the case when another process tries + * to receive at the same time. + * If the current recv could not read its entire package + * this time (resulting in an select). The read of the + * other process must be made to wait until current + * is done! + * Basically, we need a read queue! + * + * A 'reading' field (boolean), which is set if we did + * not manage to read the entire message and reset every + * time we do. + * + * </KOLLA> + */ + + res = nrecvmsg(env, descP, recvRef, bufSz, ctrlSz, flags); + + MUNLOCK(descP->readMtx); + + return res; + +} + + +/* The (read) buffer handling *must* be optimized! + * But for now we make it easy for ourselves by + * allocating a binary (of the specified or default + * size) and then throwing it away... + */ +static +ERL_NIF_TERM nrecvmsg(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM recvRef, + uint16_t bufLen, + uint16_t ctrlLen, + int flags) +{ + unsigned int addrLen; + ssize_t read; + int save_errno; + ErlNifBinary buf, ctrl; + int bufSz = (bufLen ? bufLen : descP->rBufSz); + int ctrlSz = (ctrlLen ? ctrlLen : descP->rCtrlSz); + struct msghdr msgHdr; + struct iovec iov[1]; // Shall we always use 1? + SocketAddress addr; + + SSDBG( descP, ("SOCKET", "nrecvfrom -> entry with" + "\r\n bufSz: %d (%d)" + "\r\n ctrlSz: %d (%d)" + "\r\n flags: %d" + "\r\n", bufSz, bufLen, ctrlSz, ctrlLen, flags) ); + + if (!descP->isReadable) + return enif_make_badarg(env); + + /* + for (i = 0; i < sizeof(buf); i++) { + if (!ALLOC_BIN(bifSz, &buf[i])) + return esock_make_error(env, atom_exalloc); + iov[i].iov_base = buf[i].data; + iov[i].iov_len = buf[i].size; + } + */ + + /* Allocate the (msg) data buffer: + */ + if (!ALLOC_BIN(bufSz, &buf)) + return esock_make_error(env, atom_exalloc); + + /* Allocate the ctrl (buffer): + */ + if (!ALLOC_BIN(ctrlSz, &ctrl)) + return esock_make_error(env, atom_exalloc); + + /* We ignore the wrap for the moment. + * Maybe we should issue a wrap-message to controlling process... + */ + cnt_inc(&descP->readTries, 1); + + addrLen = sizeof(addr); + sys_memzero((char*) &addr, addrLen); + sys_memzero((char*) &msgHdr, sizeof(msgHdr)); + + iov[0].iov_base = buf.data; + iov[0].iov_len = buf.size; + + msgHdr.msg_name = &addr; + msgHdr.msg_namelen = addrLen; + msgHdr.msg_iov = iov; + msgHdr.msg_iovlen = 1; // Should use a constant or calculate... + msgHdr.msg_control = ctrl.data; + msgHdr.msg_controllen = ctrl.size; + + read = sock_recvmsg(descP->sock, &msgHdr, flags); + if (IS_SOCKET_ERROR(read)) + save_errno = sock_errno(); + else + save_errno = -1; // The value does not actually matter in this case + + return recvmsg_check_result(env, descP, + read, + save_errno, + &msgHdr, + &ctrl, // Needed for ctrl header decode + recvRef); +} + + + +/* ---------------------------------------------------------------------- * nif_close * * Description: @@ -9800,7 +10017,7 @@ ERL_NIF_TERM ngetopt_timeval_opt(ErlNifEnv* env, ESOCK_ASSERT( (numKeys == numVals) ); if (!MKMA(env, keys, vals, numKeys, &eTimeVal)) - return esock_make_error(env, esock_atom_einval);; + return esock_make_error(env, esock_atom_einval); result = esock_make_ok2(env, eTimeVal); } @@ -10370,6 +10587,110 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, +/* The recvmsg function delivers one (1) message. If our buffer + * is to small, the message will be truncated. So, regardless + * if we filled the buffer or not, we have got what we are going + * to get regarding this message. + */ +static +ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env, + SocketDescriptor* descP, + int read, + int saveErrno, + struct msghdr* msgHdrP, + ErlNifBinary* ctrlBufP, + ERL_NIF_TERM recvRef) +{ + + SSDBG( descP, + ("SOCKET", "recvmsg_check_result -> entry with" + "\r\n read: %d" + "\r\n saveErrno: %d" + "\r\n recvRef: %T" + "\r\n", read, saveErrno, recvRef) ); + + + /* There is a special case: If the provided 'to read' value is + * zero (0). That means that we reads as much as we can, using + * the default read buffer size. + */ + + if (read < 0) { + + /* +++ Error handling +++ */ + + if (saveErrno == ECONNRESET) { + + /* +++ Oups - closed +++ */ + + SSDBG( descP, ("SOCKET", "recvfrom_check_result -> closed\r\n") ); + + /* <KOLLA> + * IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING + * PROCESS, WE NEED TO INFORM IT!!! + * + * ALL WAITING PROCESSES MUST ALSO GET THE ERROR!! + * + * </KOLLA> + */ + + descP->closeLocal = FALSE; + descP->state = SOCKET_STATE_CLOSING; + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_STOP), + descP, NULL, recvRef); + + return esock_make_error(env, atom_closed); + + } else if ((saveErrno == ERRNO_BLOCK) || + (saveErrno == EAGAIN)) { + + SSDBG( descP, ("SOCKET", "recvfrom_check_result -> eagain\r\n") ); + + SELECT(env, descP->sock, (ERL_NIF_SELECT_READ), + descP, NULL, recvRef); + + return esock_make_error(env, esock_atom_eagain); + } else { + + SSDBG( descP, + ("SOCKET", + "recvfrom_check_result -> errno: %d\r\n", saveErrno) ); + + return esock_make_error_errno(env, saveErrno); + } + + } else { + + /* +++ We sucessfully got a message - time to encode it +++ */ + + ERL_NIF_TERM eMsgHdr; + char* xres; + + /* + * <KOLLA> + * + * The return value of recvmsg is the *total* number of bytes + * that where successfully read. This data has been put into + * the *IO vector*. + * + * </KOLLA> + */ + + if ((xres = esock_encode_msghdr(env, read, + msgHdrP, ctrlBufP, + &eMsgHdr)) != NULL) + return esock_make_error_str(env, xres); + else + return esock_make_ok2(env, eMsgHdr); + + } +} + + + /* +++ decode the linger value +++ * The (socket) linger option is provided as a two tuple: * @@ -10831,6 +11152,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->closeMtx = MCREATE(buf); descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT; + descP->rCtrlSz = SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT; descP->iow = FALSE; descP->dbg = SOCKET_DEBUG_DEFAULT; @@ -11941,6 +12263,7 @@ ErlNifFunc socket_funcs[] = {"nif_sendto", 5, nif_sendto, 0}, {"nif_recv", 4, nif_recv, 0}, {"nif_recvfrom", 4, nif_recvfrom, 0}, + {"nif_recvmsg", 5, nif_recvmsg, 0}, {"nif_close", 1, nif_close, 0}, {"nif_shutdown", 2, nif_shutdown, 0}, {"nif_setopt", 5, nif_setopt, 0}, @@ -12077,16 +12400,23 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) /* Global atom(s) */ esock_atom_addr = MKA(env, "addr"); esock_atom_any = MKA(env, "any"); + esock_atom_ctrl = MKA(env, "ctrl"); + esock_atom_ctrunc = MKA(env, "ctrunc"); + esock_atom_data = MKA(env, "data"); esock_atom_debug = MKA(env, "debug"); esock_atom_dgram = MKA(env, "dgram"); + esock_atom_eor = MKA(env, "eor"); esock_atom_error = MKA(env, "error"); + esock_atom_errqueue = MKA(env, "errqueue"); esock_atom_false = MKA(env, "false"); esock_atom_family = MKA(env, "family"); + esock_atom_flags = MKA(env, "flags"); esock_atom_flowinfo = MKA(env, "flowinfo"); esock_atom_inet = MKA(env, "inet"); esock_atom_inet6 = MKA(env, "inet6"); - esock_atom_ip = MKA(env, "ip"); + esock_atom_ip = MKA(env, "iov"); esock_atom_ipv6 = MKA(env, "ipvp"); + esock_atom_level = MKA(env, "level"); esock_atom_local = MKA(env, "local"); esock_atom_loopback = MKA(env, "loopback"); esock_atom_ok = MKA(env, "ok"); @@ -12101,6 +12431,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_stream = MKA(env, "stream"); esock_atom_tcp = MKA(env, "tcp"); esock_atom_true = MKA(env, "true"); + esock_atom_trunc = MKA(env, "trunc"); esock_atom_type = MKA(env, "type"); esock_atom_udp = MKA(env, "udp"); esock_atom_undefined = MKA(env, "undefined"); diff --git a/erts/emulator/nifs/common/socket_tarray.c b/erts/emulator/nifs/common/socket_tarray.c new file mode 100644 index 0000000000..bf37e5bc0e --- /dev/null +++ b/erts/emulator/nifs/common/socket_tarray.c @@ -0,0 +1,139 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2018-2018. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * %CopyrightEnd% + * + * ---------------------------------------------------------------------- + * Purpose : Build and "maintain" a (erlang) term array of + * variable length. + * ---------------------------------------------------------------------- + * + */ + +#include <arpa/inet.h> +#include <stdio.h> + +#include <erl_nif.h> + +#include "socket_int.h" +#include "socket_util.h" +#include "socket_tarray.h" + + + +/* ---------------------------------------------------------------------- + * Types + */ + +typedef struct { + uint32_t sz; + uint32_t idx; + ERL_NIF_TERM* array; +} SocketTArrayInt; + + +/* ---------------------------------------------------------------------- + * Forward for internal functions + */ + +static void esock_tarray_add1(SocketTArrayInt* taP, ERL_NIF_TERM t); +static void esock_tarray_ensure_fits(SocketTArrayInt* taP, uint32_t needs); + + +/* ---------------------------------------------------------------------- + * API + */ + +extern +void* esock_tarray_create(uint32_t sz) +{ + SocketTArrayInt* tarrayP; + + ESOCK_ASSERT( (sz == 0) ); + + tarrayP = MALLOC(sizeof(SocketTArrayInt)); + ESOCK_ASSERT( (tarrayP == NULL) ); + + tarrayP->array = MALLOC(sz * sizeof(ERL_NIF_TERM)); + ESOCK_ASSERT( (tarrayP->array == NULL) ); + tarrayP->sz = sz; + tarrayP->idx = 0; + + return ((SocketTArray) tarrayP); +} + +extern +void esock_tarray_delete(SocketTArray ta) +{ + SocketTArrayInt* taP = (SocketTArrayInt*) ta; + + FREE(taP->array); + FREE(taP); +} + + +extern +uint32_t esock_tarray_sz(SocketTArray a) +{ + return ( ((SocketTArrayInt*) a)->idx ); +} + +extern +void esock_tarray_add(SocketTArray ta, ERL_NIF_TERM t) +{ + esock_tarray_add1((SocketTArrayInt*) ta, t); +} + +extern +void esock_tarray_tolist(SocketTArray ta, + ErlNifEnv* env, + ERL_NIF_TERM* list) +{ + SocketTArrayInt* taP = (SocketTArrayInt*) ta; + + *list = MKLA(env, taP->array, taP->idx); + + esock_tarray_delete(taP); +} + + + +/* ---------------------------------------------------------------------- + * "Internal" functions + */ + +static +void esock_tarray_add1(SocketTArrayInt* taP, ERL_NIF_TERM t) +{ + esock_tarray_ensure_fits(taP, 1); + + taP->array[taP->idx++] = t; +} + +static +void esock_tarray_ensure_fits(SocketTArrayInt* taP, uint32_t needs) +{ + if (taP->sz < (taP->idx + needs)) { + uint32_t newSz = (needs < taP->sz) ? 2*taP->sz : 2*needs; + void* mem = REALLOC(taP->array, newSz * sizeof(ERL_NIF_TERM)); + + ESOCK_ASSERT( (mem == NULL) ); + + taP->sz = newSz; + taP->array = (ERL_NIF_TERM*) mem; + } +} diff --git a/erts/emulator/nifs/common/socket_tarray.h b/erts/emulator/nifs/common/socket_tarray.h new file mode 100644 index 0000000000..2e9506d288 --- /dev/null +++ b/erts/emulator/nifs/common/socket_tarray.h @@ -0,0 +1,47 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2018-2018. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * %CopyrightEnd% + * + * ---------------------------------------------------------------------- + * Purpose : Build and "maintain" a (erlang) term array of + * variable length. + * ---------------------------------------------------------------------- + * + */ + +#ifndef SOCKET_TARRAY_H__ +#define SOCKET_TARRAY_H__ + +typedef void* SocketTArray; + +extern SocketTArray esock_tarray_create(uint32_t sz); +extern void esock_tarray_delete(SocketTArray ta); +extern uint32_t esock_tarray_sz(SocketTArray ta); +extern void esock_tarray_add(SocketTArray ta, ERL_NIF_TERM t); +extern void esock_tarray_tolist(SocketTArray ta, + ErlNifEnv* env, + ERL_NIF_TERM* list); + +#define TARRAY_CREATE(SZ) esock_tarray_create((SZ)) +#define TARRAY_DELETE(TA) esock_tarray_delete((TA)) +#define TARRAY_SZ(TA) esock_tarray_sz((TA)) +#define TARRAY_ADD(TA, T) esock_tarray_add((TA), (T)) +#define TARRAY_TOLIST(TA, E, L) esock_tarray_tolist((TA), (E), (L)) + + +#endif // SOCKET_TARRAY_H__ diff --git a/erts/emulator/nifs/common/socket_util.c b/erts/emulator/nifs/common/socket_util.c index e6eb21adcf..5998ff35a4 100644 --- a/erts/emulator/nifs/common/socket_util.c +++ b/erts/emulator/nifs/common/socket_util.c @@ -23,17 +23,18 @@ * */ -#include <stddef.h> -#include "socket_int.h" -#include "socket_util.h" -#include "socket_dbg.h" -#include "sys.h" - #include <stdarg.h> #include <string.h> #include <stdio.h> #include <ctype.h> #include <time.h> +#include <stddef.h> + +#include "socket_int.h" +#include "socket_tarray.h" +#include "socket_util.h" +#include "socket_dbg.h" +#include "sys.h" /* We don't have a "debug flag" to check here, so we * should use the compile debug flag, whatever that is... @@ -69,6 +70,260 @@ static char* make_sockaddr_un(ErlNifEnv* env, ERL_NIF_TERM* sa); +/* +++ esock_encode_msghdr +++ + * + * Encode a msghdr (recvmsg). In erlang its represented as + * a map, which has a specific set of attributes: + * + * addr (source address) - sockaddr() + * iov - [binary()] + * ctrl - [cmsghdr()] + * flags - msghdr_flags() + */ + +extern +char* esock_encode_msghdr(ErlNifEnv* env, + int read, + struct msghdr* msgHdrP, + ErlNifBinary* ctrlBufP, + ERL_NIF_TERM* eSockAddr) +{ + char* xres; + ERL_NIF_TERM addr, iov, ctrl, flags; + + if ((xres = esock_encode_sockaddr(env, + (SocketAddress*) msgHdrP->msg_name, + msgHdrP->msg_namelen, + &addr)) != NULL) + return xres; + + if ((xres = esock_encode_iov(env, + read, + msgHdrP->msg_iov, + msgHdrP->msg_iovlen, + &iov)) != NULL) + return xres; + + if ((xres = esock_encode_cmsghdrs(env, + ctrlBufP, + msgHdrP, + &ctrl)) != NULL) + return xres; + + if ((xres = esock_encode_mshghdr_flags(env, + msgHdrP->msg_flags, + &flags)) != NULL) + return xres; + + { + ERL_NIF_TERM keys[] = {esock_atom_addr, + esock_atom_iov, + esock_atom_ctrl, + esock_atom_flags}; + ERL_NIF_TERM vals[] = {addr, iov, ctrl, flags}; + + unsigned int numKeys = sizeof(keys) / sizeof(ERL_NIF_TERM); + unsigned int numVals = sizeof(vals) / sizeof(ERL_NIF_TERM); + + ESOCK_ASSERT( (numKeys == numVals) ); + + if (!MKMA(env, keys, vals, numKeys, eSockAddr)) + return ESOCK_STR_EINVAL; + + } + + return NULL; +} + + + +/* +++ esock_encode_iov +++ + * + * Encode a IO Vector. In erlang we represented this as a list of binaries. + * + * We iterate through the IO vector, and as long as the remaining (rem) + * number of bytes is greater than the size of the current buffer, we + * contunue. When we have a buffer that is greater than rem, we have found + * the last buffer (it may be empty, and then the previous was last). + * We may need to split this (if 0 < rem < bufferSz). + */ + +extern +char* esock_encode_iov(ErlNifEnv* env, + int read, + struct iovec* iov, + size_t len, + ERL_NIF_TERM* eIOV) +{ + int rem = read; + uint16_t i; + BOOLEAN_T done = FALSE; + ERL_NIF_TERM a[len]; // At most this length + + if (len == 0) { + *eIOV = MKEL(env); + return NULL; + } + + for (i = 0; (!done) && (i < len); i++) { + if (iov[i].iov_len == rem) { + /* We have the exact amount - we are done */ + a[i] = MKBIN(env, iov[i].iov_base); + done = TRUE; + } else if (iov[i].iov_len < rem) { + /* Filled another buffer - continue */ + a[i] = MKBIN(env, iov[i].iov_base); + } else if (iov[i].iov_len > rem) { + /* Partly filled buffer (=> split) - we are done */ + a[i] = MKBIN(env, iov[i].iov_base); + a[i] = MKSBIN(env, a[i], 0, rem); + done = TRUE; + } + } + + *eIOV = MKLA(env, a, i+1); + + return NULL; +} + + + +/* +++ esock_encode_cmsghdrs +++ + * + * Encode a list of cmsghdr(). The X can 0 or more cmsghdr blocks. + * + * Our problem is that we have no idea how many control messages + * we have. + * + * The cmsgHdrP arguments points to the start of the control data buffer, + * an actual binary. Its the only way to create sub-binaries. So, what we + * need to continue processing this is to tern that into an binary erlang + * term (which can then in turn be turned into sub-binaries). + * + * We need the cmsgBufP (even though cmsgHdrP points to it) to be able + * to create sub-binaries (one for each HDR). + * + * The TArray is created with the size of 128, which should be enough. + * But if its not, then it will be automatically realloc'ed during add. + * Once we are done adding hdr's to it, we convert it to a list. + */ + +extern +char* esock_encode_cmsghdrs(ErlNifEnv* env, + ErlNifBinary* cmsgBinP, + struct msghdr* msgHdrP, + ERL_NIF_TERM* eCMsgHdr) +{ + ERL_NIF_TERM ctrlBuf = MKBIN(env, cmsgBinP); // The *entire* binary + SocketTArray cmsghdrs = TARRAY_CREATE(128); + struct cmsghdr* firstP = CMSG_FIRSTHDR(msgHdrP); + struct cmsghdr* currentP; + + for (currentP = firstP; + currentP != NULL; + currentP = CMSG_NXTHDR(msgHdrP, currentP)) { + + /* MUST check this since on Linux the returned "cmsg" may actually + * go too far! + */ + if (((CHARP(currentP) + currentP->cmsg_len) - CHARP(firstP)) > + msgHdrP->msg_controllen) { + /* Ouch, fatal error - give up + * We assume we cannot trust any data if this is wrong. + */ + TARRAY_DELETE(cmsghdrs); + return ESOCK_STR_EINVAL; + } else { + ERL_NIF_TERM level; + ERL_NIF_TERM type = MKI(env, currentP->cmsg_type); + unsigned char* dataP = (unsigned char*) CMSG_DATA(currentP); + size_t dataPos = dataP - cmsgBinP->data; + size_t dataLen = currentP->cmsg_len - (CHARP(currentP)-CHARP(dataP)); + ERL_NIF_TERM dataBin = MKSBIN(env, ctrlBuf, dataPos, dataLen); + + /* We can't give up just because its an unknown protocol, + * so if its a protocol we don't know, we return its integer + * value and leave it to the user. + */ + if (esock_encode_protocol(env, currentP->cmsg_level, &level) != NULL) + level = MKI(env, currentP->cmsg_level); + + /* And finally create the 'cmsghdr' map - + * and if successfull add it to the tarray. + */ + { + ERL_NIF_TERM keys[] = {esock_atom_level, + esock_atom_type, + esock_atom_data}; + ERL_NIF_TERM vals[] = {level, type, dataBin}; + unsigned int numKeys = sizeof(keys) / sizeof(ERL_NIF_TERM); + unsigned int numVals = sizeof(vals) / sizeof(ERL_NIF_TERM); + ERL_NIF_TERM cmsgHdr; + + /* Guard agains cut-and-paste errors */ + ESOCK_ASSERT( (numKeys == numVals) ); + + if (!MKMA(env, keys, vals, numKeys, &cmsgHdr)) { + TARRAY_DELETE(cmsghdrs); + return ESOCK_STR_EINVAL; + } + + /* And finally add it to the list... */ + TARRAY_ADD(cmsghdrs, cmsgHdr); + } + } + } + + /* The tarray is populated - convert it to a list */ + TARRAY_TOLIST(cmsghdrs, env, eCMsgHdr); + + return NULL; +} + + + +/* +++ esock_encode_mshghdr_flags +++ + * + * Encode a list of msghdr_flag(). + * + * The following flags are handled: eor | trunc | ctrunc | oob | errqueue. + */ + +extern +char* esock_encode_mshghdr_flags(ErlNifEnv* env, + int msgFlags, + ERL_NIF_TERM* flags) +{ + if (msgFlags == 0) { + *flags = MKEL(env); + return NULL; + } else { + SocketTArray ta = TARRAY_CREATE(10); // Just to be on the safe side + + if ((msgFlags & MSG_EOR) == MSG_EOR) + TARRAY_ADD(ta, esock_atom_eor); + + if ((msgFlags & MSG_TRUNC) == MSG_TRUNC) + TARRAY_ADD(ta, esock_atom_trunc); + + if ((msgFlags & MSG_CTRUNC) == MSG_CTRUNC) + TARRAY_ADD(ta, esock_atom_ctrunc); + + if ((msgFlags & MSG_OOB) == MSG_OOB) + TARRAY_ADD(ta, esock_atom_oob); + + if ((msgFlags & MSG_ERRQUEUE) == MSG_ERRQUEUE) + TARRAY_ADD(ta, esock_atom_errqueue); + + TARRAY_TOLIST(ta, env, flags); + + return NULL; + } +} + + + + /* +++ esock_decode_sockaddr +++ * * Decode a socket address - sockaddr. In erlang its represented as diff --git a/erts/emulator/nifs/common/socket_util.h b/erts/emulator/nifs/common/socket_util.h index 686ce0bac6..af0bf70d8f 100644 --- a/erts/emulator/nifs/common/socket_util.h +++ b/erts/emulator/nifs/common/socket_util.h @@ -29,10 +29,35 @@ #include <erl_nif.h> #include "socket_int.h" +#define VOIDP(P) ((void*)P) +#define CHARP(P) ((char*)P) + #define ESOCK_ABORT(E) esock_abort(E, __func__, __FILE__, __LINE__) #define ESOCK_ASSERT(e) ((void) ((e) ? 1 : (ESOCK_ABORT(#e), 0))) extern +char* esock_encode_msghdr(ErlNifEnv* env, + int read, + struct msghdr* msgHdrP, + ErlNifBinary* ctrlBufP, + ERL_NIF_TERM* eSockAddr); +extern +char* esock_encode_iov(ErlNifEnv* env, + int read, + struct iovec* iov, + size_t len, + ERL_NIF_TERM* eIOV); +extern +char* esock_encode_cmsghdrs(ErlNifEnv* env, + ErlNifBinary* cmsgBinP, + struct msghdr* msgHdrP, + ERL_NIF_TERM* eCMsgHdr); + +extern +char* esock_encode_mshghdr_flags(ErlNifEnv* env, + int msgFlags, + ERL_NIF_TERM* flags); +extern char* esock_decode_sockaddr(ErlNifEnv* env, ERL_NIF_TERM eSockAddr, SocketAddress* sockAddrP, diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex b2bd8f2728..f6ca653fed 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 1983c993a5..b9d1705d45 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -43,7 +43,7 @@ recv/1, recv/2, recv/3, recv/4, recvfrom/1, recvfrom/2, recvfrom/3, recvfrom/4, - %% recvmsg/4, + recvmsg/1, recvmsg/2, recvmsg/5, %% readv/3, close/1, @@ -500,24 +500,28 @@ -type shutdown_how() :: read | write | read_write. %% These are just place-holder(s) - used by the sendmsg/recvmsg functions... --type msghdr_flag() :: eor | trunc | ctrunc | oob | errqueue. +-type msghdr_flag() :: ctrunc | eor | errqueue | oob | trunc. -type msghdr_flags() :: [msghdr_flag()]. -type msghdr() :: #{ %% *Optional* target address %% *If* this field is specified for an unconnected %% socket, then it will be used as destination for the %% datagram. - target => sockaddr(), + addr => sockaddr(), - iov => [binary()], + iov => [binary()], - ctrl => cmsghdr(), + ctrl => [cmsghdr()], %% Only valid with recvmsg flags => msghdr_flags() }. +%% At some point we should be able to encode/decode the most common types +%% of control message headers. For now, we leave/take the data part raw +%% (as a binary) and leave it to the user to figure out (how to encode/decode +%% that bit). -type cmsghdr() :: #{ - level => protocol(), + level => protocol() | integer(), type => integer(), data => binary() }. @@ -1752,11 +1756,62 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> %% --------------------------------------------------------------------------- %% -%% -spec recvmsg(Socket, Flags) -> {ok, MsgHdr} | {error, Reason} when -%% Socket :: socket(), -%% MsgHdr :: msghdr(), -%% Flags :: recv_flags(), -%% Reason :: term(). +recvmsg(Socket) -> + recvmsg(Socket, 0, 0, ?SOCKET_RECV_FLAGS_DEFAULT, ?SOCKET_RECV_TIMEOUT_DEFAULT). + +recvmsg(Socket, Flags) when is_list(Flags) -> + recvmsg(Socket, 0, 0, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT); +recvmsg(Socket, Timeout) -> + recvmsg(Socket, 0, 0, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout). + +-spec recvmsg(Socket, + BufSz, CtrlSz, + Flags, Timeout) -> {ok, MsgHdr} | {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + CtrlSz :: non_neg_integer(), + Flags :: recv_flags(), + Timeout :: timeout(), + MsgHdr :: msghdr(), + Reason :: term(). + +recvmsg(#socket{ref = SockRef}, BufSz, CtrlSz, Flags, Timeout) + when (is_integer(BufSz) andalso (BufSz >= 0)) andalso + (is_integer(CtrlSz) andalso (CtrlSz >= 0)) andalso + is_list(Flags) andalso + (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + EFlags = enc_recv_flags(Flags), + do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout). + +do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> + TS = timestamp(Timeout), + RecvRef = make_ref(), + case nif_recvmsg(SockRef, RecvRef, BufSz, CtrlSz, EFlags) of + {ok, _MsgHdr} = OK -> + OK; + + {error, eagain} -> + %% There is nothing just now, but we will be notified when there + %% is something to read (a select message). + NewTimeout = next_timeout(TS, Timeout), + receive + {select, SockRef, RecvRef, ready_input} -> + do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, + next_timeout(TS, Timeout)); + + {nif_abort, RecvRef, Reason} -> + {error, Reason} + + after NewTimeout -> + nif_cancel(SockRef, recvmsg, RecvRef), + flush_select_msgs(SockRef, RecvRef), + {error, timeout} + end; + + {error, _Reason} = ERROR -> + ERROR + + end. @@ -3171,6 +3226,9 @@ nif_recv(_SRef, _RecvRef, _Length, _Flags) -> nif_recvfrom(_SRef, _RecvRef, _Length, _Flags) -> erlang:error(badarg). +nif_recvmsg(_SRef, _RecvRef, _BufSz, _CtrlSz, _Flags) -> + erlang:error(badarg). + nif_cancel(_SRef, _Op, _Ref) -> erlang:error(badarg). |