aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/doc/src/socket.xml49
-rw-r--r--erts/emulator/nifs/common/socket_nif.c526
-rw-r--r--erts/emulator/nifs/common/socket_util.c59
-rw-r--r--erts/emulator/nifs/common/socket_util.h7
-rw-r--r--erts/preloaded/ebin/socket.beambin61704 -> 64304 bytes
-rw-r--r--erts/preloaded/src/socket.erl172
-rw-r--r--lib/kernel/test/socket_client.erl15
-rw-r--r--lib/kernel/test/socket_server.erl10
8 files changed, 813 insertions, 25 deletions
diff --git a/erts/doc/src/socket.xml b/erts/doc/src/socket.xml
index 2fb922408b..93a3a8172e 100644
--- a/erts/doc/src/socket.xml
+++ b/erts/doc/src/socket.xml
@@ -403,6 +403,40 @@
</func>
<func>
+ <name name="recvmsg" arity="1"/>
+ <name name="recvmsg" arity="2" clause_i="1"/>
+ <name name="recvmsg" arity="2" clause_i="2"/>
+ <name name="recvmsg" arity="3"/>
+ <name name="recvmsg" arity="5"/>
+ <fsummary>Receive a message from a socket.</fsummary>
+ <desc>
+ <p>Receive a message from a socket.</p>
+ <p>This function reads "messages", which means that regardless of
+ how much we want to read, it returns when we get a message.</p>
+ <p>The message will be delivered in the form of a <c>msghdr()</c>,
+ which may contain the source address (if socket not connected),
+ a list of <c>cmsghdr()</c> (depends on what socket options have
+ been set and what the protocol and platform supports) and
+ also a set of flags, providing further info about the read . </p>
+
+ <p>The <c>BufSz</c> argument basically defines the size of the
+ receive buffer. By setting the value to zero (0), the configured
+ size (setopt with <c>Level</c> = <c>otp</c>) is used.</p>
+
+ <p>The <c>CtrlSz</c> argument basically defines the size of the
+ receive buffer for the control messages.
+ By setting the value to zero (0), the configured size (setopt
+ with <c>Level</c> = <c>otp</c>) is used.</p>
+
+ <p>It may be impossible to know what (buffer) size is appropriate
+ "in advance", and in those cases it may be convenient to use the
+ (recv) 'peek' flag. When this flag is provided, the message is *not*
+ "consumed" from the underlying buffers, so another recvmsg call
+ is needed, possibly with a then adjusted buffer size.</p>
+ </desc>
+ </func>
+
+ <func>
<name name="send" arity="2"/>
<name name="send" arity="3" clause_i="1"/>
<name name="send" arity="3" clause_i="2"/>
@@ -414,6 +448,21 @@
</func>
<func>
+ <name name="sendmsg" arity="2"/>
+ <name name="sendmsg" arity="3" clause_i="1"/>
+ <name name="sendmsg" arity="3" clause_i="2"/>
+ <name name="sendmsg" arity="4"/>
+ <fsummary>Send a message on a socket.</fsummary>
+ <desc>
+ <p>Send a message on a socket. The destination, if needed (socket not
+ connected) is provided in the <c>MsgHdr</c>, which also
+ contains the message to send, The <c>MsgHdr</c> may also contain
+ an list of optional <c>cmsghdr()</c> (depends on what the protocol and
+ platform supports).</p>
+ </desc>
+ </func>
+
+ <func>
<name name="sendto" arity="3"/>
<name name="sendto" arity="4"/>
<name name="sendto" arity="5"/>
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 4144341d71..0d584306f1 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -397,9 +397,10 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL;
#define IS_CONNECTING(d) \
(((d)->state & SOCKET_FLAG_CON) == SOCKET_FLAG_CON)
+/*
#define IS_BUSY(d) \
(((d)->state & SOCKET_FLAG_BUSY) == SOCKET_FLAG_BUSY)
-
+*/
#define SOCKET_SEND_FLAG_CONFIRM 0
#define SOCKET_SEND_FLAG_DONTROUTE 1
@@ -420,6 +421,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL;
#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048
#define SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT 1024
+#define SOCKET_SEND_CTRL_BUFFER_SIZE_DEFAULT 1024
#define VT2S(__VT__) (((__VT__) == SOCKET_OPT_VALUE_TYPE_UNSPEC) ? "unspec" : \
(((__VT__) == SOCKET_OPT_VALUE_TYPE_INT) ? "int" : \
@@ -676,6 +678,7 @@ static unsigned long one_value = 1;
recvfrom((s),(buf),(blen),(flag),(addr),(alen))
#define sock_recvmsg(s,msghdr,flag) recvmsg((s),(msghdr),(flag))
#define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag))
+#define sock_sendmsg(s,msghdr,flag) sendmsg((s),(msghdr),(flag))
#define sock_sendto(s,buf,blen,flag,addr,alen) \
sendto((s),(buf),(blen),(flag),(addr),(alen))
#define sock_setopt(s,l,o,v,ln) setsockopt((s),(l),(o),(v),(ln))
@@ -773,6 +776,7 @@ typedef struct {
/* +++ Config & Misc stuff +++ */
size_t rBufSz; // Read buffer size (when data length = 0 is specified)
size_t rCtrlSz; // Read control buffer size
+ size_t wCtrlSz; // Write control buffer size
BOOLEAN_T iow; // Inform On Wrap
BOOLEAN_T dbg;
@@ -878,6 +882,9 @@ static ERL_NIF_TERM nif_send(ErlNifEnv* env,
static ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
+static ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM nif_recv(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
@@ -953,6 +960,11 @@ static ERL_NIF_TERM nsendto(ErlNifEnv* env,
int flags,
SocketAddress* toAddrP,
unsigned int toAddrLen);
+static ERL_NIF_TERM nsendmsg(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sendRef,
+ ERL_NIF_TERM eMsgHdr,
+ int flags);
static ERL_NIF_TERM nrecv(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM recvRef,
@@ -1902,13 +1914,30 @@ extern char* encode_cmsghdrs(ErlNifEnv* env,
ErlNifBinary* cmsgBinP,
struct msghdr* msgHdrP,
ERL_NIF_TERM* eCMsgHdr);
+extern char* decode_cmsghdrs(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM eCMsgHdr,
+ void* cmsgHdrBufP,
+ size_t cmsgHdrBufLen);
+extern char* decode_cmsghdr(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM eCMsgHdr,
+ void** bufP,
+ size_t* rem);
static char* encode_cmsghdr_level(ErlNifEnv* env,
int level,
ERL_NIF_TERM* eLevel);
+static char* decode_cmsghdr_level(ErlNifEnv* env,
+ ERL_NIF_TERM eLevel,
+ int* level);
static char* encode_cmsghdr_type(ErlNifEnv* env,
int level,
int type,
ERL_NIF_TERM* eType);
+static char* decode_cmsghdr_type(ErlNifEnv* env,
+ int level,
+ ERL_NIF_TERM eType,
+ int* type);
static char* encode_cmsghdr_data(ErlNifEnv* env,
ERL_NIF_TERM ctrlBuf,
int level,
@@ -2355,6 +2384,7 @@ static SocketData data;
* nif_accept(LSock, Ref)
* nif_send(Sock, SendRef, Data, Flags)
* nif_sendto(Sock, SendRef, Data, Dest, Flags)
+ * nif_sendmsg(Sock, SendRef, MsgHdr, Flags)
* nif_recv(Sock, RecvRef, Length, Flags)
* nif_recvfrom(Sock, RecvRef, BufSz, Flags)
* nif_recvmsg(Sock, RecvRef, BufSz, CtrlSz, Flags)
@@ -3641,8 +3671,8 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
&remoteAddr, remoteAddrLen);
SGDBG( ("SOCKET", "nif_sendto -> done with result: "
- "\r\n %T"
- "\r\n", res) );
+ "\r\n %T"
+ "\r\n", res) );
return res;
}
@@ -3688,6 +3718,197 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env,
/* ----------------------------------------------------------------------
+ * nif_sendmsg
+ *
+ * Description:
+ * Send a message on a socket
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ * SendRef - A unique id for this (send) request.
+ * MsgHdr - Message Header - data and (maybe) control and dest
+ * Flags - Send flags.
+ */
+
+static
+ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+ ERL_NIF_TERM res, sendRef, eMsgHdr;
+ SocketDescriptor* descP;
+ unsigned int eflags;
+ int flags;
+
+ SGDBG( ("SOCKET", "nif_sendmsg -> entry with argc: %d\r\n", argc) );
+
+ /* Extract arguments and perform preliminary validation */
+
+ if ((argc != 4) ||
+ !enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
+ !IS_MAP(env, argv[2]) ||
+ !GET_UINT(env, argv[3], &eflags)) {
+ return enif_make_badarg(env);
+ }
+ sendRef = argv[1];
+ eMsgHdr = argv[2];
+
+ SSDBG( descP,
+ ("SOCKET", "nif_sendmsg -> args when sock = %d:"
+ "\r\n Socket: %T"
+ "\r\n sendRef: %T"
+ "\r\n eflags: %d"
+ "\r\n",
+ descP->sock, argv[0], sendRef, eflags) );
+
+ /* THIS TEST IS NOT CORRECT!!! */
+ if (!IS_OPEN(descP))
+ return esock_make_error(env, esock_atom_einval);
+
+ if (!esendflags2sendflags(eflags, &flags))
+ return esock_make_error(env, esock_atom_einval);
+
+ res = nsendmsg(env, descP, sendRef, eMsgHdr, flags);
+
+ SGDBG( ("SOCKET", "nif_sendmsg -> done with result: "
+ "\r\n %T"
+ "\r\n", res) );
+
+ return res;
+}
+
+
+static
+ERL_NIF_TERM nsendmsg(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sendRef,
+ ERL_NIF_TERM eMsgHdr,
+ int flags)
+{
+ ERL_NIF_TERM res, eAddr, eIOV, eCtrl;
+ SocketAddress addr;
+ struct msghdr msgHdr;
+ ErlNifBinary* iovBins;
+ struct iovec* iov;
+ unsigned int iovLen;
+ void* ctrlBuf;
+ size_t ctrlBufLen;
+ int save_errno;
+ ssize_t written, dataSize;
+ char* xres;
+
+ if (!descP->isWritable)
+ return enif_make_badarg(env);
+
+
+ /* Depending on if we are *connected* or not, we require
+ * different things in the msghdr map.
+ */
+ if (IS_CONNECTED(descP)) {
+
+ /* We don't need the address */
+
+ msgHdr.msg_name = NULL;
+ msgHdr.msg_namelen = 0;
+
+ } else {
+
+ /* We need the address */
+
+ msgHdr.msg_name = (void*) &addr;
+ msgHdr.msg_namelen = sizeof(addr);
+ sys_memzero((char *) msgHdr.msg_name, msgHdr.msg_namelen);
+ if (!GET_MAP_VAL(env, eMsgHdr, esock_atom_addr, &eAddr))
+ return esock_make_error(env, esock_atom_einval);
+ if ((xres = esock_decode_sockaddr(env, eAddr,
+ msgHdr.msg_name,
+ &msgHdr.msg_namelen)) != NULL)
+ return esock_make_error_str(env, xres);
+ }
+
+
+ /* Extract the (other) attributes of the msghdr map: iov and maybe ctrl */
+
+ /* The *mandatory* iov, which must be a list */
+ if (!GET_MAP_VAL(env, eMsgHdr, esock_atom_iov, &eIOV))
+ return esock_make_error(env, esock_atom_einval);
+
+ if (!GET_LIST_LEN(env, eIOV, &iovLen) && (iovLen > 0))
+ return esock_make_error(env, esock_atom_einval);
+
+ iovBins = MALLOC(iovLen * sizeof(ErlNifBinary));
+ ESOCK_ASSERT( (iovBins != NULL) );
+
+ iov = MALLOC(iovLen * sizeof(struct iovec));
+ ESOCK_ASSERT( (iov != NULL) );
+
+ /* The *opional* ctrl */
+ if (GET_MAP_VAL(env, eMsgHdr, esock_atom_ctrl, &eCtrl)) {
+ ctrlBufLen = descP->wCtrlSz;
+ ctrlBuf = MALLOC(ctrlBufLen);
+ ESOCK_ASSERT( (ctrlBuf != NULL) );
+ } else {
+ eCtrl = esock_atom_undefined;
+ ctrlBufLen = 0;
+ ctrlBuf = NULL;
+ }
+
+
+ /* Decode the iov and initiate that part of the msghdr */
+ if ((xres = esock_decode_iov(env, eIOV,
+ iovBins, iov, iovLen, &dataSize)) != NULL) {
+ FREE(iovBins);
+ FREE(iov);
+ if (ctrlBuf != NULL) FREE(ctrlBuf);
+ return esock_make_error_str(env, xres);
+ }
+ msgHdr.msg_iov = iov;
+ msgHdr.msg_iovlen = iovLen;
+
+
+ /* Decode the ctrl and initiate that part of the msghdr */
+ if (ctrlBuf != NULL) {
+ if ((xres = decode_cmsghdrs(env, descP,
+ eCtrl, ctrlBuf, ctrlBufLen)) != NULL) {
+ FREE(iovBins);
+ FREE(iov);
+ if (ctrlBuf != NULL) FREE(ctrlBuf);
+ return esock_make_error_str(env, xres);
+ }
+ }
+ msgHdr.msg_control = ctrlBuf;
+ msgHdr.msg_controllen = ctrlBufLen;
+
+
+ /* The msg-flags field is not used when sending, but zero it just in case */
+ msgHdr.msg_flags = 0;
+
+
+ /* We ignore the wrap for the moment.
+ * Maybe we should issue a wrap-message to controlling process...
+ */
+ cnt_inc(&descP->writeTries, 1);
+
+ /* And now, finally, try to send the message */
+ written = sock_sendmsg(descP->sock, &msgHdr, flags);
+
+ if (IS_SOCKET_ERROR(written))
+ save_errno = sock_errno();
+ else
+ save_errno = -1; // The value does not actually matter in this case
+
+ res = send_check_result(env, descP, written, dataSize, save_errno, sendRef);
+
+ FREE(iovBins);
+ FREE(iov);
+ if (ctrlBuf != NULL) FREE(ctrlBuf);
+
+ return res;
+}
+
+
+
+/* ----------------------------------------------------------------------
* nif_writev / nif_sendv
*
* Description:
@@ -10198,13 +10419,14 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
"\r\n saveErrno: %d"
"\r\n", written, dataSize, saveErrno) );
- if (written == dataSize) {
+ if (written >= dataSize) {
cnt_inc(&descP->writePkgCnt, 1);
cnt_inc(&descP->writeByteCnt, written);
SSDBG( descP,
- ("SOCKET", "send_check_result -> everything written - done\r\n") );
+ ("SOCKET", "send_check_result -> "
+ "everything written (%d,%d) - done\r\n", dataSize, written) );
return esock_atom_ok;
@@ -10248,7 +10470,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "send_check_result -> not entire package written\r\n") );
- return esock_make_ok2(env, enif_make_int(env, written));
+ return esock_make_ok2(env, MKI(env, written));
}
@@ -10791,6 +11013,7 @@ char* encode_msghdr(ErlNifEnv* env,
+
/* +++ encode_cmsghdrs +++
*
* Encode a list of cmsghdr(). There can be 0 or more cmsghdr "blocks".
@@ -10921,9 +11144,157 @@ char* encode_cmsghdrs(ErlNifEnv* env,
+/* +++ decode_cmsghdrs +++
+ *
+ * Decode a list of cmsghdr(). There can be 0 or more cmsghdr "blocks".
+ *
+ * Each element can either be a (erlang) map that needds to be decoded,
+ * or a (erlang) binary that just needs to be appended to the control
+ * buffer.
+ *
+ * Our "problem" is that we have no idea much memory we actually need.
+ *
+ */
+
+extern
+char* decode_cmsghdrs(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM eCMsgHdr,
+ void* cmsgHdrBufP,
+ size_t cmsgHdrBufLen)
+{
+ ERL_NIF_TERM elem, tail, list;
+ void* bufP;
+ size_t rem;
+ unsigned int len;
+ int i;
+ char* xres;
+
+ if (IS_LIST(env, eCMsgHdr) && GET_LIST_LEN(env, eCMsgHdr, &len)) {
+ for (i = 0, list = eCMsgHdr, rem = cmsgHdrBufLen, bufP = cmsgHdrBufP;
+ i < len; i++) {
+
+ /* Extract the (current) head of the (cmsg hdr) list */
+ if (!GET_LIST_ELEM(env, list, &elem, &tail))
+ return ESOCK_STR_EINVAL;
+
+ if ((xres = decode_cmsghdr(env, descP, elem, &bufP, &rem)) != NULL)
+ return xres;
+
+ list = tail;
+ }
+
+ xres = NULL;
+ } else {
+ xres = ESOCK_STR_EINVAL;
+ }
+
+ return xres;
+}
+
+
+/* +++ decode_cmsghdr +++
+ *
+ * Decode one cmsghdr(). Put the "result" into the buffer and advance the
+ * pointer (of the buffer) afterwards. Also update 'rem' accordingly.
+ * But before the actual decode, make sure that there is enough room in
+ * the buffer for the cmsg header (sizeof(*hdr) < rem).
+ *
+ * The eCMsgHdr should be a map with three fields:
+ *
+ * level :: cmsghdr_level() (socket | protocol() | integer())
+ * type :: cmsghdr_type() (atom() | integer())
+ * What values are valid depend on the level
+ * data :: cmsghdr_data() (term() | binary())
+ * The type of the data depends on
+ * level and type, but can be a binary,
+ * which means that the data already coded.
+ */
+extern
+char* decode_cmsghdr(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM eCMsgHdr,
+ void** bufP,
+ size_t* rem)
+{
+ if (IS_MAP(env, eCMsgHdr)) {
+ ERL_NIF_TERM eLevel, eType, eData;
+ int level, type;
+ char* xres;
+
+ /* First extract all three attributes (as terms) */
+
+ if (!GET_MAP_VAL(env, eCMsgHdr, esock_atom_level, &eLevel))
+ return ESOCK_STR_EINVAL;
+
+ if (!GET_MAP_VAL(env, eCMsgHdr, esock_atom_type, &eType))
+ return ESOCK_STR_EINVAL;
+
+ if (!GET_MAP_VAL(env, eCMsgHdr, esock_atom_data, &eData))
+ return ESOCK_STR_EINVAL;
+
+ /* Second, decode level */
+ if ((xres = decode_cmsghdr_level(env, eLevel, &level)) != NULL)
+ return xres;
+
+ /* third, decode type */
+ if ((xres = decode_cmsghdr_type(env, level, eType, &type)) != NULL)
+ return xres;
+
+ /* And finally data
+ * If its a binary, we are done. Otherwise, we need to check
+ * level and type to know what kind of data to expect.
+ *
+ * <KOLLA>
+ *
+ * At the moment, the only data we support is a binary...
+ *
+ * </KOLLA>
+ */
+
+ if (IS_BIN(env, eData)) {
+ ErlNifBinary bin;
+ size_t currentRem = *rem;
+
+ if (!GET_BIN(env, eData, &bin)) {
+ return ESOCK_STR_EINVAL;
+ } else {
+ int len = CMSG_LEN(bin.size); // The cmsghdr
+ int space = CMSG_SPACE(bin.size); // With padding
+ /* Make sure it fits before we copy */
+ if (currentRem >= space) {
+ struct cmsghdr* cmsgP = (struct cmsghdr*) bufP;
+
+ /* The header */
+ cmsgP->cmsg_len = len;
+ cmsgP->cmsg_level = level;
+ cmsgP->cmsg_type = type;
+
+ /* And the data */
+ sys_memcpy(CMSG_DATA(cmsgP), bin.data, bin.size);
+ *bufP += space;
+ *rem -= space;
+ } else {
+ return ESOCK_STR_EINVAL;
+ }
+ }
+ } else {
+
+ /* Here is where we should have the proper data decode */
+
+ return ESOCK_STR_EINVAL;
+ }
+ } else {
+ return ESOCK_STR_EINVAL;
+ }
+
+ return NULL;
+}
+
+
/* +++ encode_cmsghdr_level +++
*
- * Encode the type part of the cmsghdr().
+ * Encode the level part of the cmsghdr().
*
*/
@@ -10950,6 +11321,38 @@ char* encode_cmsghdr_level(ErlNifEnv* env,
+/* +++ decode_cmsghdr_level +++
+ *
+ * Decode the level part of the cmsghdr().
+ *
+ */
+
+static
+char* decode_cmsghdr_level(ErlNifEnv* env,
+ ERL_NIF_TERM eLevel,
+ int* level)
+{
+ char* xres = NULL;
+
+ if (IS_ATOM(env, eLevel)) {
+ if (COMPARE(eLevel, esock_atom_socket) == 0) {
+ *level = SOL_SOCKET;
+ xres = NULL;
+ } else {
+ xres = esock_decode_protocol(env, eLevel, level);
+ }
+ } else if (IS_NUM(env, eLevel)) {
+ if (!GET_INT(env, eLevel, level))
+ xres = ESOCK_STR_EINVAL;
+ } else {
+ xres = ESOCK_STR_EINVAL;
+ }
+
+ return xres;
+}
+
+
+
/* +++ encode_cmsghdr_type +++
*
* Encode the type part of the cmsghdr().
@@ -11073,6 +11476,113 @@ char* encode_cmsghdr_type(ErlNifEnv* env,
+/* +++ decode_cmsghdr_type +++
+ *
+ * Decode the type part of the cmsghdr().
+ *
+ */
+
+static
+char* decode_cmsghdr_type(ErlNifEnv* env,
+ int level,
+ ERL_NIF_TERM eType,
+ int* type)
+{
+ char* xres = NULL;
+
+ switch (level) {
+ case SOL_SOCKET:
+ if (COMPARE(eType, esock_atom_timestamp) == 0) {
+#if defined(SO_TIMESTAMP)
+ *type = SO_TIMESTAMP;
+#else
+ xres = ESOCK_STR_EINVAL;
+#endif
+ } else if (COMPARE(eType, esock_atom_rights) == 0) {
+#if defined(SCM_RIGHTS)
+ *type = SCM_RIGHTS;
+#else
+ xres = ESOCK_STR_EINVAL;
+#endif
+ } else if (COMPARE(eType, esock_atom_credentials) == 0) {
+#if defined(SCM_CREDENTIALS)
+ *type = SCM_CREDENTIALS;
+#else
+ xres = ESOCK_STR_EINVAL;
+#endif
+ } else {
+ xres = ESOCK_STR_EINVAL;
+ }
+ break;
+
+
+#if defined(SOL_IP)
+ case SOL_IP:
+#else
+ case IPPROTO_IP:
+#endif
+ if (COMPARE(eType, esock_atom_tos) == 0) {
+#if defined(IP_TOS)
+ *type = IP_TOS;
+#else
+ xres = ESOCK_STR_EINVAL;
+#endif
+ } else if (COMPARE(eType, esock_atom_ttl) == 0) {
+#if defined(IP_TTL)
+ *type = IP_TTL;
+#else
+ xres = ESOCK_STR_EINVAL;
+#endif
+ } else if (COMPARE(eType, esock_atom_pktinfo) == 0) {
+#if defined(IP_PKTINFO)
+ *type = IP_PKTINFO;
+#else
+ xres = ESOCK_STR_EINVAL;
+#endif
+ } else if (COMPARE(eType, esock_atom_origdstaddr) == 0) {
+#if defined(IP_ORIGDSTADDR)
+ *type = IP_ORIGDSTADDR;
+#else
+ xres = ESOCK_STR_EINVAL;
+#endif
+ } else {
+ xres = ESOCK_STR_EINVAL;
+ }
+ break;
+
+#if defined(SOL_IPV6)
+ case SOL_IPV6:
+ xres = ESOCK_STR_EINVAL;
+ break;
+#endif
+
+ case IPPROTO_TCP:
+ xres = ESOCK_STR_EINVAL;
+ break;
+ break;
+
+ case IPPROTO_UDP:
+ xres = ESOCK_STR_EINVAL;
+ break;
+ break;
+
+#if defined(HAVE_SCTP)
+ case IPPROTO_SCTP:
+ xres = ESOCK_STR_EINVAL;
+ break;
+ break;
+#endif
+
+ default:
+ xres = ESOCK_STR_EINVAL;
+ break;
+ }
+
+ return xres;
+}
+
+
+
/* +++ encode_cmsghdr_data +++
*
* Encode the data part of the cmsghdr().
@@ -11787,6 +12297,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT;
descP->rCtrlSz = SOCKET_RECV_CTRL_BUFFER_SIZE_DEFAULT;
+ descP->wCtrlSz = SOCKET_SEND_CTRL_BUFFER_SIZE_DEFAULT;
descP->iow = FALSE;
descP->dbg = SOCKET_DEBUG_DEFAULT;
@@ -12895,6 +13406,7 @@ ErlNifFunc socket_funcs[] =
{"nif_accept", 2, nif_accept, 0},
{"nif_send", 4, nif_send, 0},
{"nif_sendto", 5, nif_sendto, 0},
+ {"nif_sendmsg", 4, nif_sendmsg, 0},
{"nif_recv", 4, nif_recv, 0},
{"nif_recvfrom", 4, nif_recvfrom, 0},
{"nif_recvmsg", 5, nif_recvmsg, 0},
diff --git a/erts/emulator/nifs/common/socket_util.c b/erts/emulator/nifs/common/socket_util.c
index 59cd1a3408..a73b40cd29 100644
--- a/erts/emulator/nifs/common/socket_util.c
+++ b/erts/emulator/nifs/common/socket_util.c
@@ -71,7 +71,7 @@ static char* make_sockaddr_un(ErlNifEnv* env,
/* +++ esock_encode_iov +++
*
- * Encode a IO Vector. In erlang we represented this as a list of binaries.
+ * Encode an IO Vector. In erlang we represented this as a list of binaries.
*
* We iterate through the IO vector, and as long as the remaining (rem)
* number of bytes is greater than the size of the current buffer, we
@@ -141,6 +141,61 @@ char* esock_encode_iov(ErlNifEnv* env,
+/* +++ esock_decode_iov +++
+ *
+ * Decode an IO Vector. In erlang we represented this as a list of binaries.
+ *
+ * We assume that we have already figured out how long the iov (actually
+ * eIOV) is (len), and therefor allocated an array of bins and iov to be
+ * used.
+ */
+
+extern
+char* esock_decode_iov(ErlNifEnv* env,
+ ERL_NIF_TERM eIOV,
+ ErlNifBinary* bufs,
+ struct iovec* iov,
+ size_t len,
+ ssize_t* totSize)
+{
+ uint16_t i;
+ ssize_t sz;
+ ERL_NIF_TERM elem, tail, list;
+
+ UDBG( ("SUTIL", "esock_decode_iov -> entry with"
+ "\r\n (IOV) len: %d"
+ "\r\n", read, len) );
+
+ for (i = 0, list = eIOV, sz = 0; (i < len); i++) {
+
+ UDBG( ("SUTIL", "esock_decode_iov -> "
+ "\r\n iov[%d].iov_len: %d"
+ "\r\n rem: %d"
+ "\r\n", i) );
+
+ if (!GET_LIST_ELEM(env, list, &elem, &tail))
+ return ESOCK_STR_EINVAL;
+
+ if (IS_BIN(env, elem) && GET_BIN(env, elem, &bufs[i])) {
+ iov[i].iov_base = bufs[i].data;
+ iov[i].iov_len = bufs[i].size;
+ sz += bufs[i].size;
+ } else {
+ return ESOCK_STR_EINVAL;
+ }
+
+ list = tail;
+ }
+
+ *totSize = sz;
+
+ UDBG( ("SUTIL", "esock_decode_msghdr -> done (%d)\r\n", sz) );
+
+ return NULL;
+}
+
+
+
/* +++ esock_decode_sockaddr +++
*
* Decode a socket address - sockaddr. In erlang its represented as
@@ -1100,7 +1155,7 @@ char* esock_encode_type(ErlNifEnv* env,
-/* +++ esock_decode_protocol +++
+/* +++ esock_encode_protocol +++
*
* Encode the native protocol to the Erlang form, that is:
*
diff --git a/erts/emulator/nifs/common/socket_util.h b/erts/emulator/nifs/common/socket_util.h
index 22eed77d6e..d0b3076df1 100644
--- a/erts/emulator/nifs/common/socket_util.h
+++ b/erts/emulator/nifs/common/socket_util.h
@@ -43,6 +43,13 @@ char* esock_encode_iov(ErlNifEnv* env,
ErlNifBinary* data,
ERL_NIF_TERM* eIOV);
extern
+char* esock_decode_iov(ErlNifEnv* env,
+ ERL_NIF_TERM eIOV,
+ ErlNifBinary* bufs,
+ struct iovec* iov,
+ size_t len,
+ ssize_t* totSize);
+extern
char* esock_decode_sockaddr(ErlNifEnv* env,
ERL_NIF_TERM eSockAddr,
SocketAddress* sockAddrP,
diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam
index 2219b1b271..f84b9b8369 100644
--- a/erts/preloaded/ebin/socket.beam
+++ b/erts/preloaded/ebin/socket.beam
Binary files differ
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 5902c161db..1459ee4869 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -38,12 +38,12 @@
send/2, send/3, send/4,
sendto/3, sendto/4, sendto/5,
- %% sendmsg/4,
+ sendmsg/2, sendmsg/3, sendmsg/4,
%% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv)
recv/1, recv/2, recv/3, recv/4,
recvfrom/1, recvfrom/2, recvfrom/3, recvfrom/4,
- recvmsg/1, recvmsg/2, recvmsg/5,
+ recvmsg/1, recvmsg/2, recvmsg/3, recvmsg/5,
%% readv/3,
close/1,
@@ -508,13 +508,15 @@
-type msghdr_flags() :: [msghdr_flag()].
-type msghdr() :: #{
%% *Optional* target address
- %% *If* this field is specified for an unconnected
- %% socket, then it will be used as destination for the
- %% datagram.
+ %% Used on an unconnected socket to specify the
+ %% target address for a datagram.
addr => sockaddr(),
iov => [binary()],
-
+
+ %% The maximum size of the control buffer is platform
+ %% specific. It is the users responsibility to ensure
+ %% that its not exceeded.
ctrl => [cmsghdr()],
%% Only valid with recvmsg
@@ -577,10 +579,12 @@
-define(SOCKET_SEND_FLAG_NOSIGNAL, 4).
-define(SOCKET_SEND_FLAG_OOB, 5).
--define(SOCKET_SEND_FLAGS_DEFAULT, []).
--define(SOCKET_SEND_TIMEOUT_DEFAULT, infinity).
--define(SOCKET_SENDTO_FLAGS_DEFAULT, []).
--define(SOCKET_SENDTO_TIMEOUT_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT).
+-define(SOCKET_SEND_FLAGS_DEFAULT, []).
+-define(SOCKET_SEND_TIMEOUT_DEFAULT, infinity).
+-define(SOCKET_SENDTO_FLAGS_DEFAULT, []).
+-define(SOCKET_SENDTO_TIMEOUT_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT).
+-define(SOCKET_SENDMSG_FLAGS_DEFAULT, []).
+-define(SOCKET_SENDMSG_TIMEOUT_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT).
-define(SOCKET_RECV_FLAG_CMSG_CLOEXEC, 0).
-define(SOCKET_RECV_FLAG_ERRQUEUE, 1).
@@ -1390,14 +1394,123 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) ->
%% ---------------------------------------------------------------------------
+%%
+%% The only part of the msghdr() that *must* exist (a connected
+%% socket need not specify the addr field) is the iov.
+%% The ctrl field is optional, and the addr and flags are not
+%% used when sending.
+%%
-%% -spec sendmsg(Socket, MsgHdr, Flags) -> ok | {error, Reason} when
-%% Socket :: socket(),
-%% MsgHdr :: msg_hdr(),
-%% Flags :: send_flags(),
-%% Reason :: term().
+-spec sendmsg(Socket, MsgHdr) -> ok | {error, Reason} when
+ Socket :: socket(),
+ MsgHdr :: msghdr(),
+ Reason :: term().
+
+sendmsg(Socket, MsgHdr) ->
+ sendmsg(Socket, MsgHdr,
+ ?SOCKET_SENDMSG_FLAGS_DEFAULT, ?SOCKET_SENDMSG_TIMEOUT_DEFAULT).
+
+
+-spec sendmsg(Socket, MsgHdr, Flags) -> ok | {error, Reason} when
+ Socket :: socket(),
+ MsgHdr :: msghdr(),
+ Flags :: send_flags(),
+ Reason :: term()
+ ; (Socket, MsgHdr, Timeout) -> ok | {error, Reason} when
+ Socket :: socket(),
+ MsgHdr :: msghdr(),
+ Timeout :: timeout(),
+ Reason :: term().
+
+sendmsg(Socket, MsgHdr, Flags) when is_list(Flags) ->
+ sendmsg(Socket, MsgHdr, Flags, ?SOCKET_SENDMSG_TIMEOUT_DEFAULT);
+sendmsg(Socket, MsgHdr, Timeout)
+ when is_integer(Timeout) orelse (Timeout =:= infinity) ->
+ sendmsg(Socket, MsgHdr, ?SOCKET_SENDMSG_FLAGS_DEFAULT, Timeout).
+
+
+-spec sendmsg(Socket, MsgHdr, Flags, Timeout) -> ok | {error, Reason} when
+ Socket :: socket(),
+ MsgHdr :: msghdr(),
+ Flags :: send_flags(),
+ Timeout :: timeout(),
+ Reason :: term().
+
+sendmsg(#socket{ref = SockRef}, #{iov := IOV} = MsgHdr, Flags, Timeout)
+ when is_list(IOV) andalso
+ is_list(Flags) andalso
+ (is_integer(Timeout) orelse (Timeout =:= infinity)) ->
+ try ensure_msghdr(MsgHdr) of
+ M ->
+ EFlags = enc_send_flags(Flags),
+ do_sendmsg(SockRef, M, EFlags, Timeout)
+ catch
+ throw:T ->
+ T;
+ error:Reason ->
+ {error, Reason}
+ end.
+
+do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) ->
+ TS = timestamp(Timeout),
+ SendRef = make_ref(),
+ case nif_sendmsg(SockRef, SendRef, MsgHdr, EFlags) of
+ ok ->
+ %% We are done
+ ok;
+
+ {error, eagain} ->
+ receive
+ {select, SockRef, SendRef, ready_output} ->
+ do_sendmsg(SockRef, MsgHdr, EFlags,
+ next_timeout(TS, Timeout))
+ after Timeout ->
+ nif_cancel(SockRef, sendmsg, SendRef),
+ flush_select_msgs(SockRef, SendRef),
+ {error, timeout}
+ end;
+
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+ensure_msghdr(#{iov := IOV} = M) when is_list(IOV) andalso (IOV =/= []) ->
+ M#{iov := erlang:iolist_to_iovec(IOV)};
+ensure_msghdr(_) ->
+ einval().
+
+
+
+%% send(Sock, #{ctrl = Ctrl} = MsgHdr, Flags) when is_list(Ctrl) ->
+%% case encode_cmsghdrs(Ctrl) of
+%% undefined ->
+%% send(Sock, maps:remove(ctrl, MsgHdr), Flags);
+%% Ctrl2 ->
+%% send(Sock, MsgHdr#{ctrl = Ctrl2}, Flags)
+%% end.
+
+%% encode_cmsghdrs([]) ->
+%% undefined;
+%% encode_cmsghdrs(Hdrs) ->
+%% encode_cmsghdrs(Hdrs, []).
+
+%% encode_cmsghdrs([], Acc) ->
+%% list_to_binary(lists:reverse(Acc));
+%% encode_cmsghdrs([H|T], Acc) when is_binary(H) ->
+%% encode_cmsghdrs(T, [H|Acc]);
+%% encode_cmsghdrs([#{level := Level,
+%% type := Type,
+%% data := Data} | T], Acc) ->
+%% case nif_encode_cmsghdr(Level, Type, Data) of
+%% {ok, Bin} when is_binary(Bin) ->
+%% encode_cmsghdrs(T, [Bin | Acc]);
+%% {error, _} = ERROR ->
+%% ERROR
+%% end.
+
+
%% ===========================================================================
%%
@@ -1778,14 +1891,40 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
%% ---------------------------------------------------------------------------
%%
+-spec recvmsg(Socket) -> {ok, MsgHdr} | {error, Reason} when
+ Socket :: socket(),
+ MsgHdr :: msghdr(),
+ Reason :: term().
+
recvmsg(Socket) ->
recvmsg(Socket, 0, 0, ?SOCKET_RECV_FLAGS_DEFAULT, ?SOCKET_RECV_TIMEOUT_DEFAULT).
+-spec recvmsg(Socket, Flags) -> {ok, MsgHdr} | {error, Reason} when
+ Socket :: socket(),
+ Flags :: recv_flags(),
+ MsgHdr :: msghdr(),
+ Reason :: term()
+ ; (Socket, Timeout) -> {ok, MsgHdr} | {error, Reason} when
+ Socket :: socket(),
+ Timeout :: timeout(),
+ MsgHdr :: msghdr(),
+ Reason :: term().
+
recvmsg(Socket, Flags) when is_list(Flags) ->
recvmsg(Socket, 0, 0, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT);
recvmsg(Socket, Timeout) ->
recvmsg(Socket, 0, 0, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout).
+-spec recvmsg(Socket, Flags, Timeout) -> {ok, MsgHdr} | {error, Reason} when
+ Socket :: socket(),
+ Flags :: recv_flags(),
+ Timeout :: timeout(),
+ MsgHdr :: msghdr(),
+ Reason :: term().
+
+recvmsg(Socket, Flags, Timeout) ->
+ recvmsg(Socket, 0, 0, Flags, Timeout).
+
-spec recvmsg(Socket,
BufSz, CtrlSz,
Flags, Timeout) -> {ok, MsgHdr} | {error, Reason} when
@@ -3246,6 +3385,9 @@ nif_send(_SockRef, _SendRef, _Data, _Flags) ->
nif_sendto(_SRef, _SendRef, _Data, _Dest, _Flags) ->
erlang:error(badarg).
+nif_sendmsg(_SRef, _SendRef, _MsgHdr, _Flags) ->
+ erlang:error(badarg).
+
nif_recv(_SRef, _RecvRef, _Length, _Flags) ->
erlang:error(badarg).
diff --git a/lib/kernel/test/socket_client.erl b/lib/kernel/test/socket_client.erl
index 094b7eebc5..56424457e0 100644
--- a/lib/kernel/test/socket_client.erl
+++ b/lib/kernel/test/socket_client.erl
@@ -71,6 +71,10 @@ start(Domain, Type, Proto, Addr, Port) ->
SA = #{family => Domain,
addr => Addr,
port => Port},
+ %% The way we use tos only works because we
+ %% send so few messages (a new value for every
+ %% message).
+ put(tos, 1),
do_start(Domain, Type, Proto, SA).
do_start(Domain, stream = Type, Proto, SA) ->
@@ -186,6 +190,7 @@ do_init(Domain, stream = Type, Proto) ->
i("try (socket) bind"),
case socket:bind(Sock, any) of
{ok, _P} ->
+ ok = socket:setopt(Sock, ip, tos, mincost),
Sock;
{error, BReason} ->
throw({bind, BReason})
@@ -271,7 +276,15 @@ send(#client{socket = Sock, type = dgram, dest = Dest}, Msg) ->
%% i("try send to: "
%% "~n ~p", [Dest]),
%% ok = socket:setopt(Sock, otp, debug, true),
- socket:sendto(Sock, Msg, Dest).
+ TOS = get(tos),
+ ok = socket:setopt(Sock, ip, tos, TOS),
+ case socket:sendto(Sock, Msg, Dest) of
+ ok = OK ->
+ put(tos, TOS+1),
+ OK;
+ {error, _} = ERROR ->
+ ERROR
+ end.
recv(#client{socket = Sock, type = stream}) ->
case socket:recv(Sock) of
diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl
index f252be1683..34f354be32 100644
--- a/lib/kernel/test/socket_server.erl
+++ b/lib/kernel/test/socket_server.erl
@@ -900,8 +900,18 @@ peek_recvfrom(Sock, BufSz) ->
end.
+send(#handler{socket = Sock, msg = true, type = stream}, Msg, _) ->
+ MsgHdr = #{iov => [Msg]},
+ socket:sendmsg(Sock, MsgHdr);
send(#handler{socket = Sock, type = stream}, Msg, _) ->
socket:send(Sock, Msg);
+send(#handler{socket = Sock, msg = true, type = dgram}, Msg, Dest) ->
+ MsgHdr = #{addr => Dest,
+ iov => [Msg]},
+ %% ok = socket:setopt(Sock, otp, debug, true),
+ Res = socket:sendmsg(Sock, MsgHdr),
+ %% ok = socket:setopt(Sock, otp, debug, false),
+ Res;
send(#handler{socket = Sock, type = dgram}, Msg, Dest) ->
socket:sendto(Sock, Msg, Dest).