aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/nifs/common/socket_nif.c
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-07-30 18:22:46 +0200
committerMicael Karlberg <[email protected]>2018-09-18 14:50:18 +0200
commit165666d8b8b1b21ffaf43ac436cfc1657ba83649 (patch)
tree82a21a93a7dadaa7eab78ccd1a82d6916d0edc11 /erts/emulator/nifs/common/socket_nif.c
parent6b01561dc13a0152f56da0a2c61ad88236f87de7 (diff)
downloadotp-165666d8b8b1b21ffaf43ac436cfc1657ba83649.tar.gz
otp-165666d8b8b1b21ffaf43ac436cfc1657ba83649.tar.bz2
otp-165666d8b8b1b21ffaf43ac436cfc1657ba83649.zip
[socket-nif] Add support for recvmsg
Added preliminary support for function recvmsg. At the moment this only works on *nix (Windows has another function, WSARecvMsg, which has a slightly different API). Also we have "no" cmsg decode at the moment (just level and type). OTP-14831
Diffstat (limited to 'erts/emulator/nifs/common/socket_nif.c')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c343
1 files changed, 337 insertions, 6 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 13250349db..6b6ddb29ca 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -417,7 +417,8 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL;
#define SOCKET_RECV_FLAG_LOW SOCKET_RECV_FLAG_CMSG_CLOEXEC
#define SOCKET_RECV_FLAG_HIGH SOCKET_RECV_FLAG_TRUNC
-#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048
+#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048
+#define SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT 1024
#define SOCKET_OPT_VALUE_TYPE_UNSPEC 0
#define SOCKET_OPT_VALUE_TYPE_INT 1
@@ -582,6 +583,10 @@ typedef union {
#define SOCKET_OPT_SCTP_NODELAY 23
#define SOCKET_OPT_SCTP_RTOINFO 29
+/* We should *eventually* use this instead of hard-coding the size (to 1) */
+#define ESOCK_RECVMSG_IOVEC_SZ 1
+
+
/* =================================================================== *
* *
@@ -663,6 +668,7 @@ static unsigned long one_value = 1;
#define sock_recv(s,buf,len,flag) recv((s),(buf),(len),(flag))
#define sock_recvfrom(s,buf,blen,flag,addr,alen) \
recvfrom((s),(buf),(blen),(flag),(addr),(alen))
+#define sock_recvmsg(s,msghdr,flag) recvmsg((s),(msghdr),(flag))
#define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag))
#define sock_sendto(s,buf,blen,flag,addr,alen) \
sendto((s),(buf),(blen),(flag),(addr),(alen))
@@ -759,8 +765,9 @@ typedef struct {
SocketRequestQueue acceptorsQ;
/* +++ Config & Misc stuff +++ */
- size_t rBufSz; // Read buffer size (when data length = 0 is specified)
- BOOLEAN_T iow; // Inform On Wrap
+ size_t rBufSz; // Read buffer size (when data length = 0 is specified)
+ size_t rCtrlSz; // Read control buffer size
+ BOOLEAN_T iow; // Inform On Wrap
BOOLEAN_T dbg;
/* +++ Close stuff +++ */
@@ -871,6 +878,9 @@ static ERL_NIF_TERM nif_recv(ErlNifEnv* env,
static ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
+static ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM nif_close(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
@@ -947,6 +957,12 @@ static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
ERL_NIF_TERM recvRef,
uint16_t bufSz,
int flags);
+static ERL_NIF_TERM nrecvmsg(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM recvRef,
+ uint16_t bufLen,
+ uint16_t ctrlLen,
+ int flags);
static ERL_NIF_TERM nclose(ErlNifEnv* env,
SocketDescriptor* descP);
static ERL_NIF_TERM nshutdown(ErlNifEnv* env,
@@ -1854,6 +1870,13 @@ static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
SocketAddress* fromAddrP,
unsigned int fromAddrLen,
ERL_NIF_TERM recvRef);
+static ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ int read,
+ int saveErrno,
+ struct msghdr* msgHdrP,
+ ErlNifBinary* ctrlBufP,
+ ERL_NIF_TERM recvRef);
static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
SocketDescriptor* descP);
@@ -2211,19 +2234,28 @@ static char str_exsend[] = "exsend"; // failed send
/* *** "Global" Atoms *** */
ERL_NIF_TERM esock_atom_addr;
ERL_NIF_TERM esock_atom_any;
+ERL_NIF_TERM esock_atom_ctrl;
+ERL_NIF_TERM esock_atom_ctrunc;
+ERL_NIF_TERM esock_atom_data;
ERL_NIF_TERM esock_atom_dgram;
ERL_NIF_TERM esock_atom_debug;
+ERL_NIF_TERM esock_atom_eor;
ERL_NIF_TERM esock_atom_error;
+ERL_NIF_TERM esock_atom_errqueue;
ERL_NIF_TERM esock_atom_false;
ERL_NIF_TERM esock_atom_family;
+ERL_NIF_TERM esock_atom_flags;
ERL_NIF_TERM esock_atom_flowinfo;
ERL_NIF_TERM esock_atom_inet;
ERL_NIF_TERM esock_atom_inet6;
+ERL_NIF_TERM esock_atom_iov;
ERL_NIF_TERM esock_atom_ip;
ERL_NIF_TERM esock_atom_ipv6;
+ERL_NIF_TERM esock_atom_level;
ERL_NIF_TERM esock_atom_local;
ERL_NIF_TERM esock_atom_loopback;
ERL_NIF_TERM esock_atom_ok;
+ERL_NIF_TERM esock_atom_oob;
ERL_NIF_TERM esock_atom_path;
ERL_NIF_TERM esock_atom_protocol;
ERL_NIF_TERM esock_atom_port;
@@ -2235,6 +2267,7 @@ ERL_NIF_TERM esock_atom_seqpacket;
ERL_NIF_TERM esock_atom_stream;
ERL_NIF_TERM esock_atom_tcp;
ERL_NIF_TERM esock_atom_true;
+ERL_NIF_TERM esock_atom_trunc;
ERL_NIF_TERM esock_atom_type;
ERL_NIF_TERM esock_atom_udp;
ERL_NIF_TERM esock_atom_undefined;
@@ -2355,7 +2388,8 @@ static SocketData data;
* nif_send(Sock, SendRef, Data, Flags)
* nif_sendto(Sock, SendRef, Data, Dest, Flags)
* nif_recv(Sock, RecvRef, Length, Flags)
- * nif_recvfrom(Sock, Flags)
+ * nif_recvfrom(Sock, RecvRef, BufSz, Flags)
+ * nif_recvmsg(Sock, RecvRef, BufSz, CtrlSz, Flags)
* nif_close(Sock)
* nif_shutdown(Sock, How)
* nif_sockname(Sock)
@@ -4024,6 +4058,189 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
/* ----------------------------------------------------------------------
+ * nif_recvmsg
+ *
+ * Description:
+ * Receive a message on a socket.
+ * Normally used only on a (un-) connected socket!
+ * If a buffer size = 0 is specified, then we will use the default
+ * buffer size for this socket (whatever has been configured).
+ * If ctrl (buffer) size = 0 is specified, then the default ctrl
+ * (buffer) size is used (1024).
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ * RecvRef - A unique id for this (send) request.
+ * BufSz - Size of the buffer into which we put the received message.
+ * CtrlSz - Size of the ctrl (buffer) into which we put the received
+ * ancillary data.
+ * Flags - Receive flags.
+ *
+ * <KOLLA>
+ *
+ * How do we handle if the peek flag is set? We need to basically keep
+ * track of if we expect any data from the read. Regardless of the
+ * number of bytes we try to read.
+ *
+ * </KOLLA>
+ */
+
+static
+ERL_NIF_TERM nif_recvmsg(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+ SocketDescriptor* descP;
+ ERL_NIF_TERM recvRef;
+ unsigned int bufSz;
+ unsigned int ctrlSz;
+ unsigned int eflags;
+ int flags;
+ ERL_NIF_TERM res;
+
+ SGDBG( ("SOCKET", "nif_recvmsg -> entry with argc: %d\r\n", argc) );
+
+ /* Extract arguments and perform preliminary validation */
+
+ if ((argc != 4) ||
+ !enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
+ !GET_UINT(env, argv[2], &bufSz) ||
+ !GET_UINT(env, argv[3], &ctrlSz) ||
+ !GET_UINT(env, argv[4], &eflags)) {
+ return enif_make_badarg(env);
+ }
+ recvRef = argv[1];
+
+ SSDBG( descP,
+ ("SOCKET", "nif_recvmsg -> args when sock = %d:"
+ "\r\n Socket: %T"
+ "\r\n recvRef: %T"
+ "\r\n bufSz: %d"
+ "\r\n ctrlSz: %d"
+ "\r\n eflags: %d"
+ "\r\n", descP->sock, argv[0], recvRef, bufSz, ctrlSz, eflags) );
+
+ /* if (IS_OPEN(descP)) */
+ /* return esock_make_error(env, atom_enotconn); */
+
+ if (!erecvflags2recvflags(eflags, &flags))
+ return enif_make_badarg(env);
+
+ MLOCK(descP->readMtx);
+
+ /* <KOLLA>
+ *
+ * We need to handle the case when another process tries
+ * to receive at the same time.
+ * If the current recv could not read its entire package
+ * this time (resulting in an select). The read of the
+ * other process must be made to wait until current
+ * is done!
+ * Basically, we need a read queue!
+ *
+ * A 'reading' field (boolean), which is set if we did
+ * not manage to read the entire message and reset every
+ * time we do.
+ *
+ * </KOLLA>
+ */
+
+ res = nrecvmsg(env, descP, recvRef, bufSz, ctrlSz, flags);
+
+ MUNLOCK(descP->readMtx);
+
+ return res;
+
+}
+
+
+/* The (read) buffer handling *must* be optimized!
+ * But for now we make it easy for ourselves by
+ * allocating a binary (of the specified or default
+ * size) and then throwing it away...
+ */
+static
+ERL_NIF_TERM nrecvmsg(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM recvRef,
+ uint16_t bufLen,
+ uint16_t ctrlLen,
+ int flags)
+{
+ unsigned int addrLen;
+ ssize_t read;
+ int save_errno;
+ ErlNifBinary buf, ctrl;
+ int bufSz = (bufLen ? bufLen : descP->rBufSz);
+ int ctrlSz = (ctrlLen ? ctrlLen : descP->rCtrlSz);
+ struct msghdr msgHdr;
+ struct iovec iov[1]; // Shall we always use 1?
+ SocketAddress addr;
+
+ SSDBG( descP, ("SOCKET", "nrecvfrom -> entry with"
+ "\r\n bufSz: %d (%d)"
+ "\r\n ctrlSz: %d (%d)"
+ "\r\n flags: %d"
+ "\r\n", bufSz, bufLen, ctrlSz, ctrlLen, flags) );
+
+ if (!descP->isReadable)
+ return enif_make_badarg(env);
+
+ /*
+ for (i = 0; i < sizeof(buf); i++) {
+ if (!ALLOC_BIN(bifSz, &buf[i]))
+ return esock_make_error(env, atom_exalloc);
+ iov[i].iov_base = buf[i].data;
+ iov[i].iov_len = buf[i].size;
+ }
+ */
+
+ /* Allocate the (msg) data buffer:
+ */
+ if (!ALLOC_BIN(bufSz, &buf))
+ return esock_make_error(env, atom_exalloc);
+
+ /* Allocate the ctrl (buffer):
+ */
+ if (!ALLOC_BIN(ctrlSz, &ctrl))
+ return esock_make_error(env, atom_exalloc);
+
+ /* We ignore the wrap for the moment.
+ * Maybe we should issue a wrap-message to controlling process...
+ */
+ cnt_inc(&descP->readTries, 1);
+
+ addrLen = sizeof(addr);
+ sys_memzero((char*) &addr, addrLen);
+ sys_memzero((char*) &msgHdr, sizeof(msgHdr));
+
+ iov[0].iov_base = buf.data;
+ iov[0].iov_len = buf.size;
+
+ msgHdr.msg_name = &addr;
+ msgHdr.msg_namelen = addrLen;
+ msgHdr.msg_iov = iov;
+ msgHdr.msg_iovlen = 1; // Should use a constant or calculate...
+ msgHdr.msg_control = ctrl.data;
+ msgHdr.msg_controllen = ctrl.size;
+
+ read = sock_recvmsg(descP->sock, &msgHdr, flags);
+ if (IS_SOCKET_ERROR(read))
+ save_errno = sock_errno();
+ else
+ save_errno = -1; // The value does not actually matter in this case
+
+ return recvmsg_check_result(env, descP,
+ read,
+ save_errno,
+ &msgHdr,
+ &ctrl, // Needed for ctrl header decode
+ recvRef);
+}
+
+
+
+/* ----------------------------------------------------------------------
* nif_close
*
* Description:
@@ -9800,7 +10017,7 @@ ERL_NIF_TERM ngetopt_timeval_opt(ErlNifEnv* env,
ESOCK_ASSERT( (numKeys == numVals) );
if (!MKMA(env, keys, vals, numKeys, &eTimeVal))
- return esock_make_error(env, esock_atom_einval);;
+ return esock_make_error(env, esock_atom_einval);
result = esock_make_ok2(env, eTimeVal);
}
@@ -10370,6 +10587,110 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
+/* The recvmsg function delivers one (1) message. If our buffer
+ * is to small, the message will be truncated. So, regardless
+ * if we filled the buffer or not, we have got what we are going
+ * to get regarding this message.
+ */
+static
+ERL_NIF_TERM recvmsg_check_result(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ int read,
+ int saveErrno,
+ struct msghdr* msgHdrP,
+ ErlNifBinary* ctrlBufP,
+ ERL_NIF_TERM recvRef)
+{
+
+ SSDBG( descP,
+ ("SOCKET", "recvmsg_check_result -> entry with"
+ "\r\n read: %d"
+ "\r\n saveErrno: %d"
+ "\r\n recvRef: %T"
+ "\r\n", read, saveErrno, recvRef) );
+
+
+ /* There is a special case: If the provided 'to read' value is
+ * zero (0). That means that we reads as much as we can, using
+ * the default read buffer size.
+ */
+
+ if (read < 0) {
+
+ /* +++ Error handling +++ */
+
+ if (saveErrno == ECONNRESET) {
+
+ /* +++ Oups - closed +++ */
+
+ SSDBG( descP, ("SOCKET", "recvfrom_check_result -> closed\r\n") );
+
+ /* <KOLLA>
+ * IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING
+ * PROCESS, WE NEED TO INFORM IT!!!
+ *
+ * ALL WAITING PROCESSES MUST ALSO GET THE ERROR!!
+ *
+ * </KOLLA>
+ */
+
+ descP->closeLocal = FALSE;
+ descP->state = SOCKET_STATE_CLOSING;
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_STOP),
+ descP, NULL, recvRef);
+
+ return esock_make_error(env, atom_closed);
+
+ } else if ((saveErrno == ERRNO_BLOCK) ||
+ (saveErrno == EAGAIN)) {
+
+ SSDBG( descP, ("SOCKET", "recvfrom_check_result -> eagain\r\n") );
+
+ SELECT(env, descP->sock, (ERL_NIF_SELECT_READ),
+ descP, NULL, recvRef);
+
+ return esock_make_error(env, esock_atom_eagain);
+ } else {
+
+ SSDBG( descP,
+ ("SOCKET",
+ "recvfrom_check_result -> errno: %d\r\n", saveErrno) );
+
+ return esock_make_error_errno(env, saveErrno);
+ }
+
+ } else {
+
+ /* +++ We sucessfully got a message - time to encode it +++ */
+
+ ERL_NIF_TERM eMsgHdr;
+ char* xres;
+
+ /*
+ * <KOLLA>
+ *
+ * The return value of recvmsg is the *total* number of bytes
+ * that where successfully read. This data has been put into
+ * the *IO vector*.
+ *
+ * </KOLLA>
+ */
+
+ if ((xres = esock_encode_msghdr(env, read,
+ msgHdrP, ctrlBufP,
+ &eMsgHdr)) != NULL)
+ return esock_make_error_str(env, xres);
+ else
+ return esock_make_ok2(env, eMsgHdr);
+
+ }
+}
+
+
+
/* +++ decode the linger value +++
* The (socket) linger option is provided as a two tuple:
*
@@ -10831,6 +11152,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->closeMtx = MCREATE(buf);
descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT;
+ descP->rCtrlSz = SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT;
descP->iow = FALSE;
descP->dbg = SOCKET_DEBUG_DEFAULT;
@@ -11941,6 +12263,7 @@ ErlNifFunc socket_funcs[] =
{"nif_sendto", 5, nif_sendto, 0},
{"nif_recv", 4, nif_recv, 0},
{"nif_recvfrom", 4, nif_recvfrom, 0},
+ {"nif_recvmsg", 5, nif_recvmsg, 0},
{"nif_close", 1, nif_close, 0},
{"nif_shutdown", 2, nif_shutdown, 0},
{"nif_setopt", 5, nif_setopt, 0},
@@ -12077,16 +12400,23 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
/* Global atom(s) */
esock_atom_addr = MKA(env, "addr");
esock_atom_any = MKA(env, "any");
+ esock_atom_ctrl = MKA(env, "ctrl");
+ esock_atom_ctrunc = MKA(env, "ctrunc");
+ esock_atom_data = MKA(env, "data");
esock_atom_debug = MKA(env, "debug");
esock_atom_dgram = MKA(env, "dgram");
+ esock_atom_eor = MKA(env, "eor");
esock_atom_error = MKA(env, "error");
+ esock_atom_errqueue = MKA(env, "errqueue");
esock_atom_false = MKA(env, "false");
esock_atom_family = MKA(env, "family");
+ esock_atom_flags = MKA(env, "flags");
esock_atom_flowinfo = MKA(env, "flowinfo");
esock_atom_inet = MKA(env, "inet");
esock_atom_inet6 = MKA(env, "inet6");
- esock_atom_ip = MKA(env, "ip");
+ esock_atom_ip = MKA(env, "iov");
esock_atom_ipv6 = MKA(env, "ipvp");
+ esock_atom_level = MKA(env, "level");
esock_atom_local = MKA(env, "local");
esock_atom_loopback = MKA(env, "loopback");
esock_atom_ok = MKA(env, "ok");
@@ -12101,6 +12431,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
esock_atom_stream = MKA(env, "stream");
esock_atom_tcp = MKA(env, "tcp");
esock_atom_true = MKA(env, "true");
+ esock_atom_trunc = MKA(env, "trunc");
esock_atom_type = MKA(env, "type");
esock_atom_udp = MKA(env, "udp");
esock_atom_undefined = MKA(env, "undefined");