aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-07-30 18:22:46 +0200
committerMicael Karlberg <[email protected]>2018-09-18 14:50:18 +0200
commit165666d8b8b1b21ffaf43ac436cfc1657ba83649 (patch)
tree82a21a93a7dadaa7eab78ccd1a82d6916d0edc11
parent6b01561dc13a0152f56da0a2c61ad88236f87de7 (diff)
downloadotp-165666d8b8b1b21ffaf43ac436cfc1657ba83649.tar.gz
otp-165666d8b8b1b21ffaf43ac436cfc1657ba83649.tar.bz2
otp-165666d8b8b1b21ffaf43ac436cfc1657ba83649.zip
[socket-nif] Add support for recvmsg
Added preliminary support for function recvmsg. At the moment this only works on *nix (Windows has another function, WSARecvMsg, which has a slightly different API). Also we have "no" cmsg decode at the moment (just level and type). OTP-14831
-rw-r--r--erts/emulator/Makefile.in3
-rw-r--r--erts/emulator/nifs/common/socket_int.h11
-rw-r--r--erts/emulator/nifs/common/socket_nif.c343
-rw-r--r--erts/emulator/nifs/common/socket_tarray.c139
-rw-r--r--erts/emulator/nifs/common/socket_tarray.h47
-rw-r--r--erts/emulator/nifs/common/socket_util.c267
-rw-r--r--erts/emulator/nifs/common/socket_util.h25
-rw-r--r--erts/preloaded/ebin/socket.beambin59768 -> 61376 bytes
-rw-r--r--erts/preloaded/src/socket.erl80
9 files changed, 891 insertions, 24 deletions
diff --git a/erts/emulator/Makefile.in b/erts/emulator/Makefile.in
index 60f9b36491..1964d27134 100644
--- a/erts/emulator/Makefile.in
+++ b/erts/emulator/Makefile.in
@@ -878,7 +878,8 @@ RUN_OBJS += \
$(OBJDIR)/erl_ptab.o $(OBJDIR)/erl_map.o \
$(OBJDIR)/erl_msacc.o $(OBJDIR)/erl_lock_flags.o \
$(OBJDIR)/erl_io_queue.o \
- $(OBJDIR)/socket_dbg.o $(OBJDIR)/socket_util.o
+ $(OBJDIR)/socket_dbg.o $(OBJDIR)/socket_tarray.o \
+ $(OBJDIR)/socket_util.o
LTTNG_OBJS = $(OBJDIR)/erlang_lttng.o
NIF_OBJS = \
diff --git a/erts/emulator/nifs/common/socket_int.h b/erts/emulator/nifs/common/socket_int.h
index 2d5049a9eb..18e94e80ef 100644
--- a/erts/emulator/nifs/common/socket_int.h
+++ b/erts/emulator/nifs/common/socket_int.h
@@ -105,19 +105,28 @@ typedef unsigned int BOOLEAN_T;
*/
extern ERL_NIF_TERM esock_atom_addr;
extern ERL_NIF_TERM esock_atom_any;
+extern ERL_NIF_TERM esock_atom_ctrl;
+extern ERL_NIF_TERM esock_atom_ctrunc;
+extern ERL_NIF_TERM esock_atom_data;
extern ERL_NIF_TERM esock_atom_debug;
extern ERL_NIF_TERM esock_atom_dgram;
+extern ERL_NIF_TERM esock_atom_eor;
extern ERL_NIF_TERM esock_atom_error;
+extern ERL_NIF_TERM esock_atom_errqueue;
extern ERL_NIF_TERM esock_atom_false;
extern ERL_NIF_TERM esock_atom_family;
+extern ERL_NIF_TERM esock_atom_flags;
extern ERL_NIF_TERM esock_atom_flowinfo;
extern ERL_NIF_TERM esock_atom_inet;
extern ERL_NIF_TERM esock_atom_inet6;
+extern ERL_NIF_TERM esock_atom_iov;
extern ERL_NIF_TERM esock_atom_ip;
extern ERL_NIF_TERM esock_atom_ipv6;
+extern ERL_NIF_TERM esock_atom_level;
extern ERL_NIF_TERM esock_atom_local;
extern ERL_NIF_TERM esock_atom_loopback;
extern ERL_NIF_TERM esock_atom_ok;
+extern ERL_NIF_TERM esock_atom_oob;
extern ERL_NIF_TERM esock_atom_path;
extern ERL_NIF_TERM esock_atom_port;
extern ERL_NIF_TERM esock_atom_protocol;
@@ -129,6 +138,7 @@ extern ERL_NIF_TERM esock_atom_seqpacket;
extern ERL_NIF_TERM esock_atom_stream;
extern ERL_NIF_TERM esock_atom_tcp;
extern ERL_NIF_TERM esock_atom_true;
+extern ERL_NIF_TERM esock_atom_trunc;
extern ERL_NIF_TERM esock_atom_type;
extern ERL_NIF_TERM esock_atom_udp;
extern ERL_NIF_TERM esock_atom_undefined;
@@ -147,6 +157,7 @@ extern ERL_NIF_TERM esock_atom_einval;
* Various wrapper macros for enif functions
*/
#define MALLOC(SZ) enif_alloc((SZ))
+#define REALLOC(P, SZ) enif_realloc((P), (SZ))
#define FREE(P) enif_free((P))
#define MKA(E,S) enif_make_atom((E), (S))
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 13250349db..6b6ddb29ca 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -417,7 +417,8 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL;
#define SOCKET_RECV_FLAG_LOW SOCKET_RECV_FLAG_CMSG_CLOEXEC
#define SOCKET_RECV_FLAG_HIGH SOCKET_RECV_FLAG_TRUNC
-#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048
+#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048
+#define SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT 1024
#define SOCKET_OPT_VALUE_TYPE_UNSPEC 0
#define SOCKET_OPT_VALUE_TYPE_INT 1
@@ -582,6 +583,10 @@ typedef union {
#define SOCKET_OPT_SCTP_NODELAY 23
#define SOCKET_OPT_SCTP_RTOINFO 29
+/* We should *eventually* use this instead of hard-coding the size (to 1) */
+#define ESOCK_RECVMSG_IOVEC_SZ 1
+
+
/* =================================================================== *
* *
@@ -663,6 +668,7 @@ static unsigned long one_value = 1;
#define sock_recv(s,buf,len,flag) recv((s),(buf),(len),(flag))
#define sock_recvfrom(s,buf,blen,flag,addr,alen) \
recvfrom((s),(buf),(blen),(flag),(addr),(alen))
+#define sock_recvmsg(s,msghdr,flag) recvmsg((s),(msghdr),(flag))
#define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag))
#define sock_sendto(s,buf,blen,flag,addr,alen) \
sendto((s),(buf),(blen),(flag),(addr),(alen))
@@ -759,8 +765,9 @@ typedef struct {
SocketRequestQueue acceptorsQ;
/* +++ Config & Misc stuff +++ */
- size_t rBufSz; // Read buffer size (when data length = 0 is specified)
- BOOLEAN_T iow; // Inform On Wrap
+ size_t rBufSz; // Read buffer size (when data length = 0 is specified)
+ size_t rCtrlSz; // Read control buffer size
+ BOOLEAN_T iow; // Inform On Wrap
BOOLEAN_T dbg;
/* +++ Close stuff +++ */
@@ -871,6 +878,9 @@ static ERL_NIF_TERM nif_recv(ErlNifEnv* env,
static ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
+static ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM nif_close(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
@@ -947,6 +957,12 @@ static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
ERL_NIF_TERM recvRef,
uint16_t bufSz,
int flags);
+static ERL_NIF_TERM nrecvmsg(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM recvRef,
+ uint16_t bufLen,
+ uint16_t ctrlLen,
+ int flags);
static ERL_NIF_TERM nclose(ErlNifEnv* env,
SocketDescriptor* descP);
static ERL_NIF_TERM nshutdown(ErlNifEnv* env,
@@ -1854,6 +1870,13 @@ static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
SocketAddress* fromAddrP,
unsigned int fromAddrLen,
ERL_NIF_TERM recvRef);
+static ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ int read,
+ int saveErrno,
+ struct msghdr* msgHdrP,
+ ErlNifBinary* ctrlBufP,
+ ERL_NIF_TERM recvRef);
static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
SocketDescriptor* descP);
@@ -2211,19 +2234,28 @@ static char str_exsend[] = "exsend"; // failed send
/* *** "Global" Atoms *** */
ERL_NIF_TERM esock_atom_addr;
ERL_NIF_TERM esock_atom_any;
+ERL_NIF_TERM esock_atom_ctrl;
+ERL_NIF_TERM esock_atom_ctrunc;
+ERL_NIF_TERM esock_atom_data;
ERL_NIF_TERM esock_atom_dgram;
ERL_NIF_TERM esock_atom_debug;
+ERL_NIF_TERM esock_atom_eor;
ERL_NIF_TERM esock_atom_error;
+ERL_NIF_TERM esock_atom_errqueue;
ERL_NIF_TERM esock_atom_false;
ERL_NIF_TERM esock_atom_family;
+ERL_NIF_TERM esock_atom_flags;
ERL_NIF_TERM esock_atom_flowinfo;
ERL_NIF_TERM esock_atom_inet;
ERL_NIF_TERM esock_atom_inet6;
+ERL_NIF_TERM esock_atom_iov;
ERL_NIF_TERM esock_atom_ip;
ERL_NIF_TERM esock_atom_ipv6;
+ERL_NIF_TERM esock_atom_level;
ERL_NIF_TERM esock_atom_local;
ERL_NIF_TERM esock_atom_loopback;
ERL_NIF_TERM esock_atom_ok;
+ERL_NIF_TERM esock_atom_oob;
ERL_NIF_TERM esock_atom_path;
ERL_NIF_TERM esock_atom_protocol;
ERL_NIF_TERM esock_atom_port;
@@ -2235,6 +2267,7 @@ ERL_NIF_TERM esock_atom_seqpacket;
ERL_NIF_TERM esock_atom_stream;
ERL_NIF_TERM esock_atom_tcp;
ERL_NIF_TERM esock_atom_true;
+ERL_NIF_TERM esock_atom_trunc;
ERL_NIF_TERM esock_atom_type;
ERL_NIF_TERM esock_atom_udp;
ERL_NIF_TERM esock_atom_undefined;
@@ -2355,7 +2388,8 @@ static SocketData data;
* nif_send(Sock, SendRef, Data, Flags)
* nif_sendto(Sock, SendRef, Data, Dest, Flags)
* nif_recv(Sock, RecvRef, Length, Flags)
- * nif_recvfrom(Sock, Flags)
+ * nif_recvfrom(Sock, RecvRef, BufSz, Flags)
+ * nif_recvmsg(Sock, RecvRef, BufSz, CtrlSz, Flags)
* nif_close(Sock)
* nif_shutdown(Sock, How)
* nif_sockname(Sock)
@@ -4024,6 +4058,189 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
/* ----------------------------------------------------------------------
+ * nif_recvmsg
+ *
+ * Description:
+ * Receive a message on a socket.
+ * Normally used only on a (un-) connected socket!
+ * If a buffer size = 0 is specified, then we will use the default
+ * buffer size for this socket (whatever has been configured).
+ * If ctrl (buffer) size = 0 is specified, then the default ctrl
+ * (buffer) size is used (1024).
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ * RecvRef - A unique id for this (send) request.
+ * BufSz - Size of the buffer into which we put the received message.
+ * CtrlSz - Size of the ctrl (buffer) into which we put the received
+ * ancillary data.
+ * Flags - Receive flags.
+ *
+ * <KOLLA>
+ *
+ * How do we handle if the peek flag is set? We need to basically keep
+ * track of if we expect any data from the read. Regardless of the
+ * number of bytes we try to read.
+ *
+ * </KOLLA>
+ */
+
+static
+ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+ SocketDescriptor* descP;
+ ERL_NIF_TERM recvRef;
+ unsigned int bufSz;
+ unsigned int ctrlSz;
+ unsigned int eflags;
+ int flags;
+ ERL_NIF_TERM res;
+
+ SGDBG( ("SOCKET", "nif_recvmsg -> entry with argc: %d\r\n", argc) );
+
+ /* Extract arguments and perform preliminary validation */
+
+ if ((argc != 4) ||
+ !enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
+ !GET_UINT(env, argv[2], &bufSz) ||
+ !GET_UINT(env, argv[3], &ctrlSz) ||
+ !GET_UINT(env, argv[4], &eflags)) {
+ return enif_make_badarg(env);
+ }
+ recvRef = argv[1];
+
+ SSDBG( descP,
+ ("SOCKET", "nif_recvmsg -> args when sock = %d:"
+ "\r\n Socket: %T"
+ "\r\n recvRef: %T"
+ "\r\n bufSz: %d"
+ "\r\n ctrlSz: %d"
+ "\r\n eflags: %d"
+ "\r\n", descP->sock, argv[0], recvRef, bufSz, ctrlSz, eflags) );
+
+ /* if (IS_OPEN(descP)) */
+ /* return esock_make_error(env, atom_enotconn); */
+
+ if (!erecvflags2recvflags(eflags, &flags))
+ return enif_make_badarg(env);
+
+ MLOCK(descP->readMtx);
+
+ /* <KOLLA>
+ *
+ * We need to handle the case when another process tries
+ * to receive at the same time.
+ * If the current recv could not read its entire package
+ * this time (resulting in an select). The read of the
+ * other process must be made to wait until current
+ * is done!
+ * Basically, we need a read queue!
+ *
+ * A 'reading' field (boolean), which is set if we did
+ * not manage to read the entire message and reset every
+ * time we do.
+ *
+ * </KOLLA>
+ */
+
+ res = nrecvmsg(env, descP, recvRef, bufSz, ctrlSz, flags);
+
+ MUNLOCK(descP->readMtx);
+
+ return res;
+
+}
+
+
+/* The (read) buffer handling *must* be optimized!
+ * But for now we make it easy for ourselves by
+ * allocating a binary (of the specified or default
+ * size) and then throwing it away...
+ */
+static
+ERL_NIF_TERM nrecvmsg(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM recvRef,
+ uint16_t bufLen,
+ uint16_t ctrlLen,
+ int flags)
+{
+ unsigned int addrLen;
+ ssize_t read;
+ int save_errno;
+ ErlNifBinary buf, ctrl;
+ int bufSz = (bufLen ? bufLen : descP->rBufSz);
+ int ctrlSz = (ctrlLen ? ctrlLen : descP->rCtrlSz);
+ struct msghdr msgHdr;
+ struct iovec iov[1]; // Shall we always use 1?
+ SocketAddress addr;
+
+ SSDBG( descP, ("SOCKET", "nrecvfrom -> entry with"
+ "\r\n bufSz: %d (%d)"
+ "\r\n ctrlSz: %d (%d)"
+ "\r\n flags: %d"
+ "\r\n", bufSz, bufLen, ctrlSz, ctrlLen, flags) );
+
+ if (!descP->isReadable)
+ return enif_make_badarg(env);
+
+ /*
+ for (i = 0; i < sizeof(buf); i++) {
+ if (!ALLOC_BIN(bifSz, &buf[i]))
+ return esock_make_error(env, atom_exalloc);
+ iov[i].iov_base = buf[i].data;
+ iov[i].iov_len = buf[i].size;
+ }
+ */
+
+ /* Allocate the (msg) data buffer:
+ */
+ if (!ALLOC_BIN(bufSz, &buf))
+ return esock_make_error(env, atom_exalloc);
+
+ /* Allocate the ctrl (buffer):
+ */
+ if (!ALLOC_BIN(ctrlSz, &ctrl))
+ return esock_make_error(env, atom_exalloc);
+
+ /* We ignore the wrap for the moment.
+ * Maybe we should issue a wrap-message to controlling process...
+ */
+ cnt_inc(&descP->readTries, 1);
+
+ addrLen = sizeof(addr);
+ sys_memzero((char*) &addr, addrLen);
+ sys_memzero((char*) &msgHdr, sizeof(msgHdr));
+
+ iov[0].iov_base = buf.data;
+ iov[0].iov_len = buf.size;
+
+ msgHdr.msg_name = &addr;
+ msgHdr.msg_namelen = addrLen;
+ msgHdr.msg_iov = iov;
+ msgHdr.msg_iovlen = 1; // Should use a constant or calculate...
+ msgHdr.msg_control = ctrl.data;
+ msgHdr.msg_controllen = ctrl.size;
+
+ read = sock_recvmsg(descP->sock, &msgHdr, flags);
+ if (IS_SOCKET_ERROR(read))
+ save_errno = sock_errno();
+ else
+ save_errno = -1; // The value does not actually matter in this case
+
+ return recvmsg_check_result(env, descP,
+ read,
+ save_errno,
+ &msgHdr,
+ &ctrl, // Needed for ctrl header decode
+ recvRef);
+}
+
+
+
+/* ----------------------------------------------------------------------
* nif_close
*
* Description:
@@ -9800,7 +10017,7 @@ ERL_NIF_TERM ngetopt_timeval_opt(ErlNifEnv* env,
ESOCK_ASSERT( (numKeys == numVals) );
if (!MKMA(env, keys, vals, numKeys, &eTimeVal))
- return esock_make_error(env, esock_atom_einval);;
+ return esock_make_error(env, esock_atom_einval);
result = esock_make_ok2(env, eTimeVal);
}
@@ -10370,6 +10587,110 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
+/* The recvmsg function delivers one (1) message. If our buffer
+ * is to small, the message will be truncated. So, regardless
+ * if we filled the buffer or not, we have got what we are going
+ * to get regarding this message.
+ */
+static
+ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ int read,
+ int saveErrno,
+ struct msghdr* msgHdrP,
+ ErlNifBinary* ctrlBufP,
+ ERL_NIF_TERM recvRef)
+{
+
+ SSDBG( descP,
+ ("SOCKET", "recvmsg_check_result -> entry with"
+ "\r\n read: %d"
+ "\r\n saveErrno: %d"
+ "\r\n recvRef: %T"
+ "\r\n", read, saveErrno, recvRef) );
+
+
+ /* There is a special case: If the provided 'to read' value is
+ * zero (0). That means that we reads as much as we can, using
+ * the default read buffer size.
+ */
+
+ if (read < 0) {
+
+ /* +++ Error handling +++ */
+
+ if (saveErrno == ECONNRESET) {
+
+ /* +++ Oups - closed +++ */
+
+ SSDBG( descP, ("SOCKET", "recvfrom_check_result -> closed\r\n") );
+
+ /* <KOLLA>
+ * IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING
+ * PROCESS, WE NEED TO INFORM IT!!!
+ *
+ * ALL WAITING PROCESSES MUST ALSO GET THE ERROR!!
+ *
+ * </KOLLA>
+ */
+
+ descP->closeLocal = FALSE;
+ descP->state = SOCKET_STATE_CLOSING;
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_STOP),
+ descP, NULL, recvRef);
+
+ return esock_make_error(env, atom_closed);
+
+ } else if ((saveErrno == ERRNO_BLOCK) ||
+ (saveErrno == EAGAIN)) {
+
+ SSDBG( descP, ("SOCKET", "recvfrom_check_result -> eagain\r\n") );
+
+ SELECT(env, descP->sock, (ERL_NIF_SELECT_READ),
+ descP, NULL, recvRef);
+
+ return esock_make_error(env, esock_atom_eagain);
+ } else {
+
+ SSDBG( descP,
+ ("SOCKET",
+ "recvfrom_check_result -> errno: %d\r\n", saveErrno) );
+
+ return esock_make_error_errno(env, saveErrno);
+ }
+
+ } else {
+
+ /* +++ We sucessfully got a message - time to encode it +++ */
+
+ ERL_NIF_TERM eMsgHdr;
+ char* xres;
+
+ /*
+ * <KOLLA>
+ *
+ * The return value of recvmsg is the *total* number of bytes
+ * that where successfully read. This data has been put into
+ * the *IO vector*.
+ *
+ * </KOLLA>
+ */
+
+ if ((xres = esock_encode_msghdr(env, read,
+ msgHdrP, ctrlBufP,
+ &eMsgHdr)) != NULL)
+ return esock_make_error_str(env, xres);
+ else
+ return esock_make_ok2(env, eMsgHdr);
+
+ }
+}
+
+
+
/* +++ decode the linger value +++
* The (socket) linger option is provided as a two tuple:
*
@@ -10831,6 +11152,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->closeMtx = MCREATE(buf);
descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT;
+ descP->rCtrlSz = SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT;
descP->iow = FALSE;
descP->dbg = SOCKET_DEBUG_DEFAULT;
@@ -11941,6 +12263,7 @@ ErlNifFunc socket_funcs[] =
{"nif_sendto", 5, nif_sendto, 0},
{"nif_recv", 4, nif_recv, 0},
{"nif_recvfrom", 4, nif_recvfrom, 0},
+ {"nif_recvmsg", 5, nif_recvmsg, 0},
{"nif_close", 1, nif_close, 0},
{"nif_shutdown", 2, nif_shutdown, 0},
{"nif_setopt", 5, nif_setopt, 0},
@@ -12077,16 +12400,23 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
/* Global atom(s) */
esock_atom_addr = MKA(env, "addr");
esock_atom_any = MKA(env, "any");
+ esock_atom_ctrl = MKA(env, "ctrl");
+ esock_atom_ctrunc = MKA(env, "ctrunc");
+ esock_atom_data = MKA(env, "data");
esock_atom_debug = MKA(env, "debug");
esock_atom_dgram = MKA(env, "dgram");
+ esock_atom_eor = MKA(env, "eor");
esock_atom_error = MKA(env, "error");
+ esock_atom_errqueue = MKA(env, "errqueue");
esock_atom_false = MKA(env, "false");
esock_atom_family = MKA(env, "family");
+ esock_atom_flags = MKA(env, "flags");
esock_atom_flowinfo = MKA(env, "flowinfo");
esock_atom_inet = MKA(env, "inet");
esock_atom_inet6 = MKA(env, "inet6");
- esock_atom_ip = MKA(env, "ip");
+ esock_atom_ip = MKA(env, "iov");
esock_atom_ipv6 = MKA(env, "ipvp");
+ esock_atom_level = MKA(env, "level");
esock_atom_local = MKA(env, "local");
esock_atom_loopback = MKA(env, "loopback");
esock_atom_ok = MKA(env, "ok");
@@ -12101,6 +12431,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
esock_atom_stream = MKA(env, "stream");
esock_atom_tcp = MKA(env, "tcp");
esock_atom_true = MKA(env, "true");
+ esock_atom_trunc = MKA(env, "trunc");
esock_atom_type = MKA(env, "type");
esock_atom_udp = MKA(env, "udp");
esock_atom_undefined = MKA(env, "undefined");
diff --git a/erts/emulator/nifs/common/socket_tarray.c b/erts/emulator/nifs/common/socket_tarray.c
new file mode 100644
index 0000000000..bf37e5bc0e
--- /dev/null
+++ b/erts/emulator/nifs/common/socket_tarray.c
@@ -0,0 +1,139 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2018-2018. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * %CopyrightEnd%
+ *
+ * ----------------------------------------------------------------------
+ * Purpose : Build and "maintain" a (erlang) term array of
+ * variable length.
+ * ----------------------------------------------------------------------
+ *
+ */
+
+#include <arpa/inet.h>
+#include <stdio.h>
+
+#include <erl_nif.h>
+
+#include "socket_int.h"
+#include "socket_util.h"
+#include "socket_tarray.h"
+
+
+
+/* ----------------------------------------------------------------------
+ * Types
+ */
+
+typedef struct {
+ uint32_t sz;
+ uint32_t idx;
+ ERL_NIF_TERM* array;
+} SocketTArrayInt;
+
+
+/* ----------------------------------------------------------------------
+ * Forward for internal functions
+ */
+
+static void esock_tarray_add1(SocketTArrayInt* taP, ERL_NIF_TERM t);
+static void esock_tarray_ensure_fits(SocketTArrayInt* taP, uint32_t needs);
+
+
+/* ----------------------------------------------------------------------
+ * API
+ */
+
+extern
+void* esock_tarray_create(uint32_t sz)
+{
+ SocketTArrayInt* tarrayP;
+
+ ESOCK_ASSERT( (sz == 0) );
+
+ tarrayP = MALLOC(sizeof(SocketTArrayInt));
+ ESOCK_ASSERT( (tarrayP == NULL) );
+
+ tarrayP->array = MALLOC(sz * sizeof(ERL_NIF_TERM));
+ ESOCK_ASSERT( (tarrayP->array == NULL) );
+ tarrayP->sz = sz;
+ tarrayP->idx = 0;
+
+ return ((SocketTArray) tarrayP);
+}
+
+extern
+void esock_tarray_delete(SocketTArray ta)
+{
+ SocketTArrayInt* taP = (SocketTArrayInt*) ta;
+
+ FREE(taP->array);
+ FREE(taP);
+}
+
+
+extern
+uint32_t esock_tarray_sz(SocketTArray a)
+{
+ return ( ((SocketTArrayInt*) a)->idx );
+}
+
+extern
+void esock_tarray_add(SocketTArray ta, ERL_NIF_TERM t)
+{
+ esock_tarray_add1((SocketTArrayInt*) ta, t);
+}
+
+extern
+void esock_tarray_tolist(SocketTArray ta,
+ ErlNifEnv* env,
+ ERL_NIF_TERM* list)
+{
+ SocketTArrayInt* taP = (SocketTArrayInt*) ta;
+
+ *list = MKLA(env, taP->array, taP->idx);
+
+ esock_tarray_delete(taP);
+}
+
+
+
+/* ----------------------------------------------------------------------
+ * "Internal" functions
+ */
+
+static
+void esock_tarray_add1(SocketTArrayInt* taP, ERL_NIF_TERM t)
+{
+ esock_tarray_ensure_fits(taP, 1);
+
+ taP->array[taP->idx++] = t;
+}
+
+static
+void esock_tarray_ensure_fits(SocketTArrayInt* taP, uint32_t needs)
+{
+ if (taP->sz < (taP->idx + needs)) {
+ uint32_t newSz = (needs < taP->sz) ? 2*taP->sz : 2*needs;
+ void* mem = REALLOC(taP->array, newSz * sizeof(ERL_NIF_TERM));
+
+ ESOCK_ASSERT( (mem == NULL) );
+
+ taP->sz = newSz;
+ taP->array = (ERL_NIF_TERM*) mem;
+ }
+}
diff --git a/erts/emulator/nifs/common/socket_tarray.h b/erts/emulator/nifs/common/socket_tarray.h
new file mode 100644
index 0000000000..2e9506d288
--- /dev/null
+++ b/erts/emulator/nifs/common/socket_tarray.h
@@ -0,0 +1,47 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2018-2018. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * %CopyrightEnd%
+ *
+ * ----------------------------------------------------------------------
+ * Purpose : Build and "maintain" a (erlang) term array of
+ * variable length.
+ * ----------------------------------------------------------------------
+ *
+ */
+
+#ifndef SOCKET_TARRAY_H__
+#define SOCKET_TARRAY_H__
+
+typedef void* SocketTArray;
+
+extern SocketTArray esock_tarray_create(uint32_t sz);
+extern void esock_tarray_delete(SocketTArray ta);
+extern uint32_t esock_tarray_sz(SocketTArray ta);
+extern void esock_tarray_add(SocketTArray ta, ERL_NIF_TERM t);
+extern void esock_tarray_tolist(SocketTArray ta,
+ ErlNifEnv* env,
+ ERL_NIF_TERM* list);
+
+#define TARRAY_CREATE(SZ) esock_tarray_create((SZ))
+#define TARRAY_DELETE(TA) esock_tarray_delete((TA))
+#define TARRAY_SZ(TA) esock_tarray_sz((TA))
+#define TARRAY_ADD(TA, T) esock_tarray_add((TA), (T))
+#define TARRAY_TOLIST(TA, E, L) esock_tarray_tolist((TA), (E), (L))
+
+
+#endif // SOCKET_TARRAY_H__
diff --git a/erts/emulator/nifs/common/socket_util.c b/erts/emulator/nifs/common/socket_util.c
index e6eb21adcf..5998ff35a4 100644
--- a/erts/emulator/nifs/common/socket_util.c
+++ b/erts/emulator/nifs/common/socket_util.c
@@ -23,17 +23,18 @@
*
*/
-#include <stddef.h>
-#include "socket_int.h"
-#include "socket_util.h"
-#include "socket_dbg.h"
-#include "sys.h"
-
#include <stdarg.h>
#include <string.h>
#include <stdio.h>
#include <ctype.h>
#include <time.h>
+#include <stddef.h>
+
+#include "socket_int.h"
+#include "socket_tarray.h"
+#include "socket_util.h"
+#include "socket_dbg.h"
+#include "sys.h"
/* We don't have a "debug flag" to check here, so we
* should use the compile debug flag, whatever that is...
@@ -69,6 +70,260 @@ static char* make_sockaddr_un(ErlNifEnv* env,
ERL_NIF_TERM* sa);
+/* +++ esock_encode_msghdr +++
+ *
+ * Encode a msghdr (recvmsg). In erlang its represented as
+ * a map, which has a specific set of attributes:
+ *
+ * addr (source address) - sockaddr()
+ * iov - [binary()]
+ * ctrl - [cmsghdr()]
+ * flags - msghdr_flags()
+ */
+
+extern
+char* esock_encode_msghdr(ErlNifEnv* env,
+ int read,
+ struct msghdr* msgHdrP,
+ ErlNifBinary* ctrlBufP,
+ ERL_NIF_TERM* eSockAddr)
+{
+ char* xres;
+ ERL_NIF_TERM addr, iov, ctrl, flags;
+
+ if ((xres = esock_encode_sockaddr(env,
+ (SocketAddress*) msgHdrP->msg_name,
+ msgHdrP->msg_namelen,
+ &addr)) != NULL)
+ return xres;
+
+ if ((xres = esock_encode_iov(env,
+ read,
+ msgHdrP->msg_iov,
+ msgHdrP->msg_iovlen,
+ &iov)) != NULL)
+ return xres;
+
+ if ((xres = esock_encode_cmsghdrs(env,
+ ctrlBufP,
+ msgHdrP,
+ &ctrl)) != NULL)
+ return xres;
+
+ if ((xres = esock_encode_mshghdr_flags(env,
+ msgHdrP->msg_flags,
+ &flags)) != NULL)
+ return xres;
+
+ {
+ ERL_NIF_TERM keys[] = {esock_atom_addr,
+ esock_atom_iov,
+ esock_atom_ctrl,
+ esock_atom_flags};
+ ERL_NIF_TERM vals[] = {addr, iov, ctrl, flags};
+
+ unsigned int numKeys = sizeof(keys) / sizeof(ERL_NIF_TERM);
+ unsigned int numVals = sizeof(vals) / sizeof(ERL_NIF_TERM);
+
+ ESOCK_ASSERT( (numKeys == numVals) );
+
+ if (!MKMA(env, keys, vals, numKeys, eSockAddr))
+ return ESOCK_STR_EINVAL;
+
+ }
+
+ return NULL;
+}
+
+
+
+/* +++ esock_encode_iov +++
+ *
+ * Encode a IO Vector. In erlang we represented this as a list of binaries.
+ *
+ * We iterate through the IO vector, and as long as the remaining (rem)
+ * number of bytes is greater than the size of the current buffer, we
+ * contunue. When we have a buffer that is greater than rem, we have found
+ * the last buffer (it may be empty, and then the previous was last).
+ * We may need to split this (if 0 < rem < bufferSz).
+ */
+
+extern
+char* esock_encode_iov(ErlNifEnv* env,
+ int read,
+ struct iovec* iov,
+ size_t len,
+ ERL_NIF_TERM* eIOV)
+{
+ int rem = read;
+ uint16_t i;
+ BOOLEAN_T done = FALSE;
+ ERL_NIF_TERM a[len]; // At most this length
+
+ if (len == 0) {
+ *eIOV = MKEL(env);
+ return NULL;
+ }
+
+ for (i = 0; (!done) && (i < len); i++) {
+ if (iov[i].iov_len == rem) {
+ /* We have the exact amount - we are done */
+ a[i] = MKBIN(env, iov[i].iov_base);
+ done = TRUE;
+ } else if (iov[i].iov_len < rem) {
+ /* Filled another buffer - continue */
+ a[i] = MKBIN(env, iov[i].iov_base);
+ } else if (iov[i].iov_len > rem) {
+ /* Partly filled buffer (=> split) - we are done */
+ a[i] = MKBIN(env, iov[i].iov_base);
+ a[i] = MKSBIN(env, a[i], 0, rem);
+ done = TRUE;
+ }
+ }
+
+ *eIOV = MKLA(env, a, i+1);
+
+ return NULL;
+}
+
+
+
+/* +++ esock_encode_cmsghdrs +++
+ *
+ * Encode a list of cmsghdr(). The X can 0 or more cmsghdr blocks.
+ *
+ * Our problem is that we have no idea how many control messages
+ * we have.
+ *
+ * The cmsgHdrP arguments points to the start of the control data buffer,
+ * an actual binary. Its the only way to create sub-binaries. So, what we
+ * need to continue processing this is to tern that into an binary erlang
+ * term (which can then in turn be turned into sub-binaries).
+ *
+ * We need the cmsgBufP (even though cmsgHdrP points to it) to be able
+ * to create sub-binaries (one for each HDR).
+ *
+ * The TArray is created with the size of 128, which should be enough.
+ * But if its not, then it will be automatically realloc'ed during add.
+ * Once we are done adding hdr's to it, we convert it to a list.
+ */
+
+extern
+char* esock_encode_cmsghdrs(ErlNifEnv* env,
+ ErlNifBinary* cmsgBinP,
+ struct msghdr* msgHdrP,
+ ERL_NIF_TERM* eCMsgHdr)
+{
+ ERL_NIF_TERM ctrlBuf = MKBIN(env, cmsgBinP); // The *entire* binary
+ SocketTArray cmsghdrs = TARRAY_CREATE(128);
+ struct cmsghdr* firstP = CMSG_FIRSTHDR(msgHdrP);
+ struct cmsghdr* currentP;
+
+ for (currentP = firstP;
+ currentP != NULL;
+ currentP = CMSG_NXTHDR(msgHdrP, currentP)) {
+
+ /* MUST check this since on Linux the returned "cmsg" may actually
+ * go too far!
+ */
+ if (((CHARP(currentP) + currentP->cmsg_len) - CHARP(firstP)) >
+ msgHdrP->msg_controllen) {
+ /* Ouch, fatal error - give up
+ * We assume we cannot trust any data if this is wrong.
+ */
+ TARRAY_DELETE(cmsghdrs);
+ return ESOCK_STR_EINVAL;
+ } else {
+ ERL_NIF_TERM level;
+ ERL_NIF_TERM type = MKI(env, currentP->cmsg_type);
+ unsigned char* dataP = (unsigned char*) CMSG_DATA(currentP);
+ size_t dataPos = dataP - cmsgBinP->data;
+ size_t dataLen = currentP->cmsg_len - (CHARP(currentP)-CHARP(dataP));
+ ERL_NIF_TERM dataBin = MKSBIN(env, ctrlBuf, dataPos, dataLen);
+
+ /* We can't give up just because its an unknown protocol,
+ * so if its a protocol we don't know, we return its integer
+ * value and leave it to the user.
+ */
+ if (esock_encode_protocol(env, currentP->cmsg_level, &level) != NULL)
+ level = MKI(env, currentP->cmsg_level);
+
+ /* And finally create the 'cmsghdr' map -
+ * and if successfull add it to the tarray.
+ */
+ {
+ ERL_NIF_TERM keys[] = {esock_atom_level,
+ esock_atom_type,
+ esock_atom_data};
+ ERL_NIF_TERM vals[] = {level, type, dataBin};
+ unsigned int numKeys = sizeof(keys) / sizeof(ERL_NIF_TERM);
+ unsigned int numVals = sizeof(vals) / sizeof(ERL_NIF_TERM);
+ ERL_NIF_TERM cmsgHdr;
+
+ /* Guard agains cut-and-paste errors */
+ ESOCK_ASSERT( (numKeys == numVals) );
+
+ if (!MKMA(env, keys, vals, numKeys, &cmsgHdr)) {
+ TARRAY_DELETE(cmsghdrs);
+ return ESOCK_STR_EINVAL;
+ }
+
+ /* And finally add it to the list... */
+ TARRAY_ADD(cmsghdrs, cmsgHdr);
+ }
+ }
+ }
+
+ /* The tarray is populated - convert it to a list */
+ TARRAY_TOLIST(cmsghdrs, env, eCMsgHdr);
+
+ return NULL;
+}
+
+
+
+/* +++ esock_encode_mshghdr_flags +++
+ *
+ * Encode a list of msghdr_flag().
+ *
+ * The following flags are handled: eor | trunc | ctrunc | oob | errqueue.
+ */
+
+extern
+char* esock_encode_mshghdr_flags(ErlNifEnv* env,
+ int msgFlags,
+ ERL_NIF_TERM* flags)
+{
+ if (msgFlags == 0) {
+ *flags = MKEL(env);
+ return NULL;
+ } else {
+ SocketTArray ta = TARRAY_CREATE(10); // Just to be on the safe side
+
+ if ((msgFlags & MSG_EOR) == MSG_EOR)
+ TARRAY_ADD(ta, esock_atom_eor);
+
+ if ((msgFlags & MSG_TRUNC) == MSG_TRUNC)
+ TARRAY_ADD(ta, esock_atom_trunc);
+
+ if ((msgFlags & MSG_CTRUNC) == MSG_CTRUNC)
+ TARRAY_ADD(ta, esock_atom_ctrunc);
+
+ if ((msgFlags & MSG_OOB) == MSG_OOB)
+ TARRAY_ADD(ta, esock_atom_oob);
+
+ if ((msgFlags & MSG_ERRQUEUE) == MSG_ERRQUEUE)
+ TARRAY_ADD(ta, esock_atom_errqueue);
+
+ TARRAY_TOLIST(ta, env, flags);
+
+ return NULL;
+ }
+}
+
+
+
+
/* +++ esock_decode_sockaddr +++
*
* Decode a socket address - sockaddr. In erlang its represented as
diff --git a/erts/emulator/nifs/common/socket_util.h b/erts/emulator/nifs/common/socket_util.h
index 686ce0bac6..af0bf70d8f 100644
--- a/erts/emulator/nifs/common/socket_util.h
+++ b/erts/emulator/nifs/common/socket_util.h
@@ -29,10 +29,35 @@
#include <erl_nif.h>
#include "socket_int.h"
+#define VOIDP(P) ((void*)P)
+#define CHARP(P) ((char*)P)
+
#define ESOCK_ABORT(E) esock_abort(E, __func__, __FILE__, __LINE__)
#define ESOCK_ASSERT(e) ((void) ((e) ? 1 : (ESOCK_ABORT(#e), 0)))
extern
+char* esock_encode_msghdr(ErlNifEnv* env,
+ int read,
+ struct msghdr* msgHdrP,
+ ErlNifBinary* ctrlBufP,
+ ERL_NIF_TERM* eSockAddr);
+extern
+char* esock_encode_iov(ErlNifEnv* env,
+ int read,
+ struct iovec* iov,
+ size_t len,
+ ERL_NIF_TERM* eIOV);
+extern
+char* esock_encode_cmsghdrs(ErlNifEnv* env,
+ ErlNifBinary* cmsgBinP,
+ struct msghdr* msgHdrP,
+ ERL_NIF_TERM* eCMsgHdr);
+
+extern
+char* esock_encode_mshghdr_flags(ErlNifEnv* env,
+ int msgFlags,
+ ERL_NIF_TERM* flags);
+extern
char* esock_decode_sockaddr(ErlNifEnv* env,
ERL_NIF_TERM eSockAddr,
SocketAddress* sockAddrP,
diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam
index b2bd8f2728..f6ca653fed 100644
--- a/erts/preloaded/ebin/socket.beam
+++ b/erts/preloaded/ebin/socket.beam
Binary files differ
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 1983c993a5..b9d1705d45 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -43,7 +43,7 @@
recv/1, recv/2, recv/3, recv/4,
recvfrom/1, recvfrom/2, recvfrom/3, recvfrom/4,
- %% recvmsg/4,
+ recvmsg/1, recvmsg/2, recvmsg/5,
%% readv/3,
close/1,
@@ -500,24 +500,28 @@
-type shutdown_how() :: read | write | read_write.
%% These are just place-holder(s) - used by the sendmsg/recvmsg functions...
--type msghdr_flag() :: eor | trunc | ctrunc | oob | errqueue.
+-type msghdr_flag() :: ctrunc | eor | errqueue | oob | trunc.
-type msghdr_flags() :: [msghdr_flag()].
-type msghdr() :: #{
%% *Optional* target address
%% *If* this field is specified for an unconnected
%% socket, then it will be used as destination for the
%% datagram.
- target => sockaddr(),
+ addr => sockaddr(),
- iov => [binary()],
+ iov => [binary()],
- ctrl => cmsghdr(),
+ ctrl => [cmsghdr()],
%% Only valid with recvmsg
flags => msghdr_flags()
}.
+%% At some point we should be able to encode/decode the most common types
+%% of control message headers. For now, we leave/take the data part raw
+%% (as a binary) and leave it to the user to figure out (how to encode/decode
+%% that bit).
-type cmsghdr() :: #{
- level => protocol(),
+ level => protocol() | integer(),
type => integer(),
data => binary()
}.
@@ -1752,11 +1756,62 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
%% ---------------------------------------------------------------------------
%%
-%% -spec recvmsg(Socket, Flags) -> {ok, MsgHdr} | {error, Reason} when
-%% Socket :: socket(),
-%% MsgHdr :: msghdr(),
-%% Flags :: recv_flags(),
-%% Reason :: term().
+recvmsg(Socket) ->
+ recvmsg(Socket, 0, 0, ?SOCKET_RECV_FLAGS_DEFAULT, ?SOCKET_RECV_TIMEOUT_DEFAULT).
+
+recvmsg(Socket, Flags) when is_list(Flags) ->
+ recvmsg(Socket, 0, 0, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT);
+recvmsg(Socket, Timeout) ->
+ recvmsg(Socket, 0, 0, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout).
+
+-spec recvmsg(Socket,
+ BufSz, CtrlSz,
+ Flags, Timeout) -> {ok, MsgHdr} | {error, Reason} when
+ Socket :: socket(),
+ BufSz :: non_neg_integer(),
+ CtrlSz :: non_neg_integer(),
+ Flags :: recv_flags(),
+ Timeout :: timeout(),
+ MsgHdr :: msghdr(),
+ Reason :: term().
+
+recvmsg(#socket{ref = SockRef}, BufSz, CtrlSz, Flags, Timeout)
+ when (is_integer(BufSz) andalso (BufSz >= 0)) andalso
+ (is_integer(CtrlSz) andalso (CtrlSz >= 0)) andalso
+ is_list(Flags) andalso
+ (is_integer(Timeout) orelse (Timeout =:= infinity)) ->
+ EFlags = enc_recv_flags(Flags),
+ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout).
+
+do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) ->
+ TS = timestamp(Timeout),
+ RecvRef = make_ref(),
+ case nif_recvmsg(SockRef, RecvRef, BufSz, CtrlSz, EFlags) of
+ {ok, _MsgHdr} = OK ->
+ OK;
+
+ {error, eagain} ->
+ %% There is nothing just now, but we will be notified when there
+ %% is something to read (a select message).
+ NewTimeout = next_timeout(TS, Timeout),
+ receive
+ {select, SockRef, RecvRef, ready_input} ->
+ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags,
+ next_timeout(TS, Timeout));
+
+ {nif_abort, RecvRef, Reason} ->
+ {error, Reason}
+
+ after NewTimeout ->
+ nif_cancel(SockRef, recvmsg, RecvRef),
+ flush_select_msgs(SockRef, RecvRef),
+ {error, timeout}
+ end;
+
+ {error, _Reason} = ERROR ->
+ ERROR
+
+ end.
@@ -3171,6 +3226,9 @@ nif_recv(_SRef, _RecvRef, _Length, _Flags) ->
nif_recvfrom(_SRef, _RecvRef, _Length, _Flags) ->
erlang:error(badarg).
+nif_recvmsg(_SRef, _RecvRef, _BufSz, _CtrlSz, _Flags) ->
+ erlang:error(badarg).
+
nif_cancel(_SRef, _Op, _Ref) ->
erlang:error(badarg).