aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-04-13 20:50:42 +0200
committerMicael Karlberg <[email protected]>2018-09-18 13:01:37 +0200
commit5920705deb70a44311e1b7552cfa73553f284164 (patch)
tree14bd698c6a81e0b65d47f9eec5699e5f64ad72d3
parent2d57ebfc6fb723a476fdcffbb366558a6fa18844 (diff)
downloadotp-5920705deb70a44311e1b7552cfa73553f284164.tar.gz
otp-5920705deb70a44311e1b7552cfa73553f284164.tar.bz2
otp-5920705deb70a44311e1b7552cfa73553f284164.zip
[socket-nif] Implemented sendto
Still not handling queue'ing of multiple send requests.
-rw-r--r--erts/emulator/nifs/common/socket_nif.c777
-rw-r--r--erts/preloaded/src/socket.erl90
2 files changed, 611 insertions, 256 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index bf9179d857..46c5c696e2 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -340,6 +340,7 @@ typedef union {
enif_get_atom((E), (TE), (BP), (MAX), ERL_NIF_LATIN1)
#define GET_BIN(E, TE, BP) enif_inspect_iolist_as_binary((E), (TE), (BP))
#define GET_INT(E, TE, IP) enif_get_int((E), (TE), (IP))
+#define GET_UINT(E, TE, IP) enif_get_uint((E), (TE), (IP))
#define GET_TUPLE(E, TE, TSZ, TA) enif_get_tuple((E), (TE), (TSZ), (TA))
@@ -366,6 +367,8 @@ typedef union {
#define sock_open(domain, type, proto) \
make_noninheritable_handle(socket((domain), (type), (proto)))
#define sock_send(s,buf,len,flag) send((s),(buf),(len),(flag))
+#define sock_sendto(s,buf,blen,flag,addr,alen) \
+ sendto((s),(buf),(blen),(flag),(addr),(alen))
#define sock_errno() WSAGetLastError()
#define sock_create_event(s) WSACreateEvent()
@@ -393,6 +396,8 @@ static unsigned long one_value = 1;
#define sock_name(s, addr, len) getsockname((s), (addr), (len))
#define sock_open(domain, type, proto) socket((domain), (type), (proto))
#define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag))
+#define sock_sendto(s,buf,blen,flag,addr,alen) \
+ sendto((s),(buf),(blen),(flag),(addr),(alen))
#define sock_errno() errno
#define sock_create_event(s) (s) /* return file descriptor */
@@ -510,6 +515,39 @@ typedef struct {
} SocketData;
+/* typedef struct socket_queue_element { */
+/* struct socket_queue_element* next; */
+/* /\* */
+/* unsigned int tag; */
+/* union { */
+/* SocketAcceptor acceptor; */
+/* } u; */
+/* *\/ */
+/* SocketAcceptor acceptor; */
+/* } SocketQueueElement; */
+
+/* typedef struct socket_queue { */
+/* SocketQueueElement* first; */
+/* SocketQueueElement* last; */
+/* } SocketQueue; */
+
+/* Macros for defining the various queues (accept, send receive) */
+#define SOCKET_QUEUE_ELEMENT(QE,QEP) \
+ typedef struct socket_queue_element_##QEP { \
+ struct socket_queue_element_##QEP* next; \
+ QE elem; \
+ } QE##Element;
+
+#define SOCKET_QUEUE(QE,Q) \
+ typedef struct { \
+ QE* first; \
+ QE* last; \
+ } Q;
+
+/* The Acceptor Queue types */
+SOCKET_QUEUE_ELEMENT(SocketAcceptor, acceptor);
+SOCKET_QUEUE(SocketAcceptorElement, SocketAcceptQueue);
+
/* ----------------------------------------------------------------------
* F o r w a r d s
* ----------------------------------------------------------------------
@@ -576,30 +614,6 @@ static ERL_NIF_TERM nif_cancel(ErlNifEnv* env,
const ERL_NIF_TERM argv[]);
-static char* decode_laddress(ErlNifEnv* env,
- int domain,
- ERL_NIF_TERM localAddr,
- SocketAddress* localP);
-static char* decode_laddress_binary(ErlNifEnv* env,
- int domain,
- ERL_NIF_TERM localAddr,
- SocketAddress* localP);
-static char* decode_laddress_tuple(ErlNifEnv* env,
- int domain,
- ERL_NIF_TERM laddr,
- SocketAddress* localP);
-static char* decode_address_tuple(ErlNifEnv* env,
- int domain,
- const ERL_NIF_TERM* addrt,
- int port,
- SocketAddress* localP);
-static char* decode_address_atom(ErlNifEnv* env,
- int domain,
- char* addr,
- int addrLen,
- int port,
- SocketAddress* localP);
-
static ERL_NIF_TERM nopen(ErlNifEnv* env,
int domain,
int type,
@@ -629,9 +643,56 @@ static ERL_NIF_TERM nsend(ErlNifEnv* env,
ERL_NIF_TERM sendRef,
ErlNifBinary* dataP,
int flags);
+static ERL_NIF_TERM nsendto(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sendRef,
+ ErlNifBinary* dataP,
+ int flags,
+ SocketAddress* toAddrP);
+
+static ERL_NIF_TERM send_check_result(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ssize_t written,
+ ssize_t dataSize,
+ ERL_NIF_TERM sendRef);
+
static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
SocketDescriptor* descP);
+
+static char* decode_laddress(ErlNifEnv* env,
+ int domain,
+ ERL_NIF_TERM localAddr,
+ SocketAddress* localP);
+static char* decode_laddress_binary(ErlNifEnv* env,
+ int domain,
+ ERL_NIF_TERM localAddr,
+ SocketAddress* localP);
+static char* decode_laddress_tuple(ErlNifEnv* env,
+ int domain,
+ ERL_NIF_TERM laddr,
+ SocketAddress* localP);
+static char* decode_address_tuple(ErlNifEnv* env,
+ int domain,
+ const ERL_NIF_TERM* addrt,
+ int port,
+ SocketAddress* localP);
+static char* decode_address_atom(ErlNifEnv* env,
+ int domain,
+ char* addr,
+ int addrLen,
+ int port,
+ SocketAddress* localP);
+static char* decode_send_addr(ErlNifEnv* env,
+ int domain,
+ ERL_NIF_TERM addr,
+ int port,
+ SocketAddress** toAddrP);
+static char* decode_send_addr_tuple(ErlNifEnv* env,
+ int domain,
+ ERL_NIF_TERM addr,
+ int port,
+ SocketAddress* toAddrP);
static BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err);
static SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event);
@@ -640,6 +701,8 @@ static int compare_pids(ErlNifEnv* env,
const ErlNifPid* pid1,
const ErlNifPid* pid2);
+
+
static BOOLEAN_T edomain2domain(int edomain, int* domain);
static BOOLEAN_T etype2type(int etype, int* type);
static BOOLEAN_T eproto2proto(int eproto, int* proto);
@@ -965,7 +1028,7 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
*/
SELECT(env,
event,
- ERL_NIF_SELECT_READ,
+ (ERL_NIF_SELECT_READ),
descP, NULL, atom_undefined);
#endif
@@ -1115,6 +1178,7 @@ ERL_NIF_TERM nif_bind(ErlNifEnv* env,
/* Make sure we are ready
* Not sure how this would even happen, but...
*/
+ /* WHY NOT !IS_OPEN(...) */
if (descP->state != SOCKET_STATE_OPEN)
return make_error(env, atom_exbadstate);
@@ -1308,7 +1372,7 @@ char* decode_laddress_tuple(ErlNifEnv* env,
unsigned int len;
char a[16]; // Just in case...
- if (!(GET_ATOM_LEN(env, laddrt[1], &len) &&
+ if (!(GET_ATOM_LEN(env, laddrt[0], &len) &&
(len > 0) &&
(len <= (sizeof("loopback")))))
return str_einval;
@@ -1327,154 +1391,6 @@ char* decode_laddress_tuple(ErlNifEnv* env,
}
-/* Decode the 4- or 8-element address tuple
- * and initiate the socket address structure.
- */
-static
-char* decode_address_tuple(ErlNifEnv* env,
- int domain,
- const ERL_NIF_TERM* addrt,
- int port,
- SocketAddress* addrP)
-{
-
- /* We now *know* that the size of the tuple is correct,
- * so we don't need to check anything here, just unpack.
- */
-
- switch (domain) {
- case AF_INET:
- {
- int a, v;
- char laddr[4];
-
- sys_memzero((char*)&addrP->u, sizeof(struct sockaddr_in));
-#ifndef NO_SA_LEN
- addrP->u.sai.sin_len = sizeof(struct sockaddr_in);
-#endif
- addrP->u.sai.sin_family = domain;
- addrP->u.sai.sin_port = sock_htons(port);
- for (a = 0; a < 4; a++) {
- if (!GET_INT(env, addrt[a], &v))
- return str_einval;
- laddr[a] = v;
- }
- sys_memcpy(&addrP->u.sai.sin_addr, &laddr, sizeof(laddr));
- addrP->len = sizeof(struct sockaddr_in);
- return NULL;
- }
- break;
-
-#if defined(HAVE_IN6) && defined(AF_INET6)
- case AF_INET6:
- {
- int a, v;
- char laddr[16];
-
- sys_memzero((char*)&addrP->u, sizeof(struct sockaddr_in6));
-#ifndef NO_SA_LEN
- addrP->u.sai6.sin6_len = sizeof(struct sockaddr_in6);
-#endif
- addrP->u.sai6.sin6_family = domain;
- addrP->u.sai6.sin6_port = sock_htons(port);
- addrP->u.sai6.sin6_flowinfo = 0;
- /* The address tuple is of size 8
- * and each element is a two byte integer
- */
- for (a = 0; a < 8; a++) {
- if (!GET_INT(env, addrt[a], &v))
- return str_einval;
- laddr[a*2 ] = ((v >> 8) & 0xFF);
- laddr[a*2+1] = (v & 0xFF);
- }
- sys_memcpy(&addrP->u.sai6.sin6_addr, &laddr, sizeof(laddr));
- addrP->len = sizeof(struct sockaddr_in6);
- return NULL;
- }
- break;
-#endif
-
- } /* switch (domain) */
-
- return str_eafnosupport;
-
-}
-
-
-/* Decode the address when its an atom.
- * Currently we only accept two atoms: 'any' and 'loopback'
- */
-static
-char* decode_address_atom(ErlNifEnv* env,
- int domain,
- char* addr,
- int addrLen,
- int port,
- SocketAddress* localP)
-{
- BOOLEAN_T any;
-
- if (strncmp(addr, "any", addrLen) == 0) {
- any = TRUE;
- } if (strncmp(addr, "loopback", addrLen) == 0) {
- any = FALSE;
- } else {
- return str_einval;
- }
-
- /* If we get this far, we *know* its either 'any' or 'loopback' */
-
- switch (domain) {
- case AF_INET:
- {
- struct in_addr addr;
- if (any) {
- addr.s_addr = sock_htonl(INADDR_ANY);
- } else {
- addr.s_addr = sock_htonl(INADDR_LOOPBACK);
- }
- sys_memzero((char*) localP, sizeof(struct sockaddr_in));
-#ifndef NO_SA_LEN
- localP->u.sai.sin_len = sizeof(struct sockaddr_in6);
-#endif
- localP->u.sai.sin_family = domain;
- localP->u.sai.sin_port = sock_htons(port);
- localP->u.sai.sin_addr.s_addr = addr.s_addr;
- localP->len = sizeof(struct sockaddr_in);
- }
- break;
-
-#if defined(HAVE_IN6) && defined(AF_INET6)
- case AF_INET6:
- {
- const struct in6_addr* paddr;
- if (any) {
- paddr = &in6addr_any;
- } else {
- paddr = &in6addr_loopback;
- }
- sys_memzero((char*)localP, sizeof(struct sockaddr_in6));
-#ifndef NO_SA_LEN
- localP->u.sai6.sin6_len = sizeof(struct sockaddr_in6);
-#endif
- localP->u.sai6.sin6_family = domain;
- localP->u.sai6.sin6_port = sock_htons(port);
- localP->u.sai6.sin6_flowinfo = 0;
- localP->u.sai6.sin6_addr = *paddr;
- localP->len = sizeof(struct sockaddr_in6);
- }
- break;
-#endif
-
- default:
- return str_einval;
- break;
- }
-
- return NULL;
-}
-
-
/* ----------------------------------------------------------------------
* nif_connect
@@ -2028,50 +1944,6 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
-/* *** alloc_descriptor ***
- * Allocate and perform basic initialization of a socket descriptor.
- *
- */
-static
-SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
-{
- SocketDescriptor* descP;
-
- if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) {
- char buf[64]; /* Buffer used for building the mutex name */
-
- sprintf(buf, "socket[w,%d]", sock);
- descP->writeMtx = MCREATE(buf);
-
- sprintf(buf, "socket[r,%d]", sock);
- descP->readMtx = MCREATE(buf);
-
- sprintf(buf, "socket[acc,%d]", sock);
- descP->accMtx = MCREATE(buf);
-
- descP->dbg = SOCKET_DEBUG_DEFAULT;
- descP->isWritable = TRUE;
- descP->isReadable = TRUE;
- descP->writePkgCnt = 0;
- descP->writeByteCnt = 0;
- descP->writeTries = 0;
- descP->writeWaits = 0;
- descP->writeFails = 0;
- descP->readPkgCnt = 0;
- descP->readByteCnt = 0;
- descP->readTries = 0;
- descP->readWaits = 0;
-
- descP->sock = sock;
- descP->event = event;
-
- }
-
- return descP;
-}
-
-
-
/* ----------------------------------------------------------------------
* nif_send
*
@@ -2080,7 +1952,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
*
* Arguments:
* Socket (ref) - Points to the socket descriptor.
- * Sendref - A unique id for this (send) request.
+ * SendRef - A unique id for this (send) request.
* Data - The data to send in the form of a IOVec.
* Flags - Send flags.
*/
@@ -2101,8 +1973,8 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
if ((argc != 4) ||
!enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
- !enif_inspect_iolist_as_binary(env, argv[2], &data) ||
- !enif_get_uint(env, argv[3], &eflags)) {
+ !GET_BIN(env, argv[2], &data) ||
+ !GET_UINT(env, argv[3], &eflags)) {
return enif_make_badarg(env);
}
sendRef = argv[1];
@@ -2115,6 +1987,19 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
MLOCK(descP->writeMtx);
+ /* We need to handle the case when another process tries
+ * to write at the same time.
+ * If the current write could not write its entire package
+ * this time (resulting in an select). The write of the
+ * other process must be made to wait until current
+ * is done!
+ * Basically, we need a write queue!
+ *
+ * A 'writing' field (boolean), which is set if we did
+ * not manage to write the entire message and reset every
+ * time we do.
+ */
+
res = nsend(env, descP, sendRef, &data, flags);
MUNLOCK(descP->writeMtx);
@@ -2137,7 +2022,6 @@ ERL_NIF_TERM nsend(ErlNifEnv* env,
ErlNifBinary* dataP,
int flags)
{
- int save_errno;
ssize_t written;
if (!descP->isWritable)
@@ -2150,48 +2034,117 @@ ERL_NIF_TERM nsend(ErlNifEnv* env,
written = sock_send(descP->sock, dataP->data, dataP->size, flags);
- if (written == dataP->size) {
+ return send_check_result(env, descP, written, dataP->size, sendRef);
- cnt_inc(&descP->writePkgCnt, 1);
- cnt_inc(&descP->writeByteCnt, written);
+}
- return atom_ok;
- } else if (written < 0) {
+/* ----------------------------------------------------------------------
+ * nif_sendto
+ *
+ * Description:
+ * Send a message on a socket
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ * SendRef - A unique id for this (send) request.
+ * Data - The data to send in the form of a IOVec.
+ * Flags - Send flags.
+ * DestAddr - Destination address.
+ * DestPort - Destination Port.
+ */
- /* Ouch, check what kind of failure */
- save_errno = sock_errno();
- if ((save_errno != EAGAIN) &&
- (save_errno != EINTR)) {
+static
+ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+ SocketDescriptor* descP;
+ ERL_NIF_TERM sendRef;
+ ErlNifBinary data;
+ unsigned int eflags;
+ int flags;
+ ERL_NIF_TERM addr;
+ int port;
+ SocketAddress remoteAddr;
+ SocketAddress* remoteAddrP = &remoteAddr;
+ char* xerr;
+ // ERL_NIF_TERM res;
- cnt_inc(&descP->writeFails, 1);
+ /* Extract arguments and perform preliminary validation */
- return make_error2(env, save_errno);
+ if ((argc != 6) ||
+ !enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
+ !GET_BIN(env, argv[2], &data) ||
+ !GET_UINT(env, argv[3], &eflags) ||
+ !GET_INT(env, argv[5], &port)) {
+ return enif_make_badarg(env);
+ }
+ sendRef = argv[1];
+ addr = argv[4];
- } else {
+ /* THIS TEST IS NOT CORRECT!!! */
+ if (!IS_OPEN(descP))
+ return make_error(env, atom_einval);
- /* Ok, try again later */
+ if (!esendflags2sendflags(eflags, &flags))
+ return enif_make_badarg(env);
- written = 0;
+ if ((xerr = decode_send_addr(env, descP->domain,
+ addr, port,
+ &remoteAddrP)) != NULL)
+ return make_error1(env, xerr);
- }
- }
+ return nsendto(env, descP, sendRef, &data, flags, remoteAddrP);
+}
- /* We failed to write the *entire* packet (anything less then size
- * of the packet, which is 0 <= written < sizeof packet),
- * so schedule the rest for later.
- */
- cnt_inc(&descP->writeWaits, 1);
+static
+ERL_NIF_TERM nsendto(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sendRef,
+ ErlNifBinary* dataP,
+ int flags,
+ SocketAddress* toAddrP)
+{
+ ssize_t written;
- SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE),
- descP, NULL, sendRef);
+ if (!descP->isWritable)
+ return enif_make_badarg(env);
- return make_ok(env, enif_make_int(env, written));
+ /* We ignore the wrap for the moment.
+ * Maybe we should issue a wrap-message to controlling process...
+ */
+ cnt_inc(&descP->writeTries, 1);
+
+ if (toAddrP != NULL) {
+ written = sock_sendto(descP->sock,
+ dataP->data, dataP->size, flags,
+ &toAddrP->u.sa, toAddrP->len);
+ } else {
+ written = sock_sendto(descP->sock,
+ dataP->data, dataP->size, flags,
+ NULL, 0);
+ }
+ return send_check_result(env, descP, written, dataP->size, sendRef);
}
+
+/* ----------------------------------------------------------------------
+ * nif_writev / nif_sendv
+ *
+ * Description:
+ * Send a message (vector) on a socket
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ * SendRef - A unique id for this (send) request.
+ * Data - A vector of binaries
+ * Flags - Send flags.
+ */
+
#ifdef FOBAR
static
ERL_NIF_TERM nwritev(ErlNifEnv* env,
@@ -2247,11 +2200,343 @@ ERL_NIF_TERM nwritev(ErlNifEnv* env,
+
+
+
/* ----------------------------------------------------------------------
* U t i l i t y F u n c t i o n s
* ----------------------------------------------------------------------
*/
+static
+ERL_NIF_TERM send_check_result(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ssize_t written,
+ ssize_t dataSize,
+ ERL_NIF_TERM sendRef)
+{
+ if (written == dataSize) {
+
+ cnt_inc(&descP->writePkgCnt, 1);
+ cnt_inc(&descP->writeByteCnt, written);
+
+ return atom_ok;
+
+ } else if (written < 0) {
+
+ /* Ouch, check what kind of failure */
+ int save_errno = sock_errno();
+ if ((save_errno != EAGAIN) &&
+ (save_errno != EINTR)) {
+
+ cnt_inc(&descP->writeFails, 1);
+
+ return make_error2(env, save_errno);
+
+ } else {
+
+ /* Ok, try again later */
+
+ written = 0;
+
+ }
+ }
+
+ /* We failed to write the *entire* packet (anything less then size
+ * of the packet, which is 0 <= written < sizeof packet),
+ * so schedule the rest for later.
+ */
+
+ cnt_inc(&descP->writeWaits, 1);
+
+ SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE),
+ descP, NULL, sendRef);
+
+ return make_ok(env, enif_make_int(env, written));
+
+}
+
+
+/* The rather odd thing about the 'toAddrP' (the **) is
+ * because we need to be able to return a NULL pointer,
+ * in the case of the dest address is the atom 'null'.
+ * Its possible to call the sendto function with the
+ * args NULL (address) and 0 (port number).
+ *
+ * This function whouls really have a char* return value
+ * type!!
+ */
+static
+char* decode_send_addr(ErlNifEnv* env,
+ int domain,
+ ERL_NIF_TERM addr,
+ int port,
+ SocketAddress** toAddrP)
+{
+ if (IS_ATOM(env, addr)) {
+ unsigned int len;
+ char a[16]; // Just in case...
+
+ /* The only acceptable value is the atom 'null' */
+
+ if (!(GET_ATOM_LEN(env, addr, &len) &&
+ (len > 0) &&
+ (len <= (sizeof("null")))))
+ return str_einval;
+
+ if (!GET_ATOM(env, addr, a, sizeof(a)))
+ return str_einval;
+
+ *toAddrP = NULL;
+ if (strncmp(a, "null", len) == 0)
+ return NULL;
+ else
+ return str_einval;
+
+ } else if (IS_TUPLE(env, addr)) {
+ /* We now know that the we have a proper address. */
+ return decode_send_addr_tuple(env, domain, addr, port, *toAddrP);
+ } else {
+ return str_einval;
+ }
+}
+
+
+static
+char* decode_send_addr_tuple(ErlNifEnv* env,
+ int domain,
+ ERL_NIF_TERM addr,
+ int port,
+ SocketAddress* toAddrP)
+{
+ /* We handle two different tuples:
+ * - size 4 (INET)
+ * - size 8 (INET6)
+ */
+
+ const ERL_NIF_TERM* addrt;
+ int addrtSz;
+
+ if (!GET_TUPLE(env, addr, &addrtSz, &addrt))
+ return str_einval; // PLACEHOLDER
+
+ switch (domain) {
+ case AF_INET:
+ if (addrtSz != 4)
+ return str_einval;
+ break;
+
+#if defined(HAVE_IN6) && defined(AF_INET6)
+ case AF_INET6:
+ if (addrtSz != 8)
+ return str_einval;
+ break;
+#endif
+
+ default:
+ return str_eafnosupport;
+ break;
+ }
+
+ return decode_address_tuple(env, domain,
+ addrt, port,
+ toAddrP);
+
+}
+
+
+/* Decode the 4- or 8-element address tuple
+ * and initiate the socket address structure.
+ */
+static
+char* decode_address_tuple(ErlNifEnv* env,
+ int domain,
+ const ERL_NIF_TERM* addrt,
+ int port,
+ SocketAddress* addrP)
+{
+
+ /* We now *know* that the size of the tuple is correct,
+ * so we don't need to check anything here, just unpack.
+ */
+
+ switch (domain) {
+ case AF_INET:
+ {
+ int a, v;
+ char laddr[4];
+
+ sys_memzero((char*)&addrP->u, sizeof(struct sockaddr_in));
+#ifndef NO_SA_LEN
+ addrP->u.sai.sin_len = sizeof(struct sockaddr_in);
+#endif
+ addrP->u.sai.sin_family = domain;
+ addrP->u.sai.sin_port = sock_htons(port);
+ for (a = 0; a < 4; a++) {
+ if (!GET_INT(env, addrt[a], &v))
+ return str_einval;
+ laddr[a] = v;
+ }
+ sys_memcpy(&addrP->u.sai.sin_addr, &laddr, sizeof(laddr));
+ addrP->len = sizeof(struct sockaddr_in);
+ return NULL;
+ }
+ break;
+
+#if defined(HAVE_IN6) && defined(AF_INET6)
+ case AF_INET6:
+ {
+ int a, v;
+ char laddr[16];
+
+ sys_memzero((char*)&addrP->u, sizeof(struct sockaddr_in6));
+#ifndef NO_SA_LEN
+ addrP->u.sai6.sin6_len = sizeof(struct sockaddr_in6);
+#endif
+ addrP->u.sai6.sin6_family = domain;
+ addrP->u.sai6.sin6_port = sock_htons(port);
+ addrP->u.sai6.sin6_flowinfo = 0;
+ /* The address tuple is of size 8
+ * and each element is a two byte integer
+ */
+ for (a = 0; a < 8; a++) {
+ if (!GET_INT(env, addrt[a], &v))
+ return str_einval;
+ laddr[a*2 ] = ((v >> 8) & 0xFF);
+ laddr[a*2+1] = (v & 0xFF);
+ }
+ sys_memcpy(&addrP->u.sai6.sin6_addr, &laddr, sizeof(laddr));
+ addrP->len = sizeof(struct sockaddr_in6);
+ return NULL;
+ }
+ break;
+#endif
+
+ } /* switch (domain) */
+
+ return str_eafnosupport;
+
+}
+
+
+/* Decode the address when its an atom.
+ * Currently we only accept two atoms: 'any' and 'loopback'
+ */
+static
+char* decode_address_atom(ErlNifEnv* env,
+ int domain,
+ char* addr,
+ int addrLen,
+ int port,
+ SocketAddress* localP)
+{
+ BOOLEAN_T any;
+
+ if (strncmp(addr, "any", addrLen) == 0) {
+ any = TRUE;
+ } if (strncmp(addr, "loopback", addrLen) == 0) {
+ any = FALSE;
+ } else {
+ return str_einval;
+ }
+
+ /* If we get this far, we *know* its either 'any' or 'loopback' */
+
+ switch (domain) {
+ case AF_INET:
+ {
+ struct in_addr addr;
+ if (any) {
+ addr.s_addr = sock_htonl(INADDR_ANY);
+ } else {
+ addr.s_addr = sock_htonl(INADDR_LOOPBACK);
+ }
+ sys_memzero((char*) localP, sizeof(struct sockaddr_in));
+#ifndef NO_SA_LEN
+ localP->u.sai.sin_len = sizeof(struct sockaddr_in6);
+#endif
+ localP->u.sai.sin_family = domain;
+ localP->u.sai.sin_port = sock_htons(port);
+ localP->u.sai.sin_addr.s_addr = addr.s_addr;
+ localP->len = sizeof(struct sockaddr_in);
+ }
+ break;
+
+#if defined(HAVE_IN6) && defined(AF_INET6)
+ case AF_INET6:
+ {
+ const struct in6_addr* paddr;
+ if (any) {
+ paddr = &in6addr_any;
+ } else {
+ paddr = &in6addr_loopback;
+ }
+ sys_memzero((char*)localP, sizeof(struct sockaddr_in6));
+#ifndef NO_SA_LEN
+ localP->u.sai6.sin6_len = sizeof(struct sockaddr_in6);
+#endif
+ localP->u.sai6.sin6_family = domain;
+ localP->u.sai6.sin6_port = sock_htons(port);
+ localP->u.sai6.sin6_flowinfo = 0;
+ localP->u.sai6.sin6_addr = *paddr;
+ localP->len = sizeof(struct sockaddr_in6);
+ }
+ break;
+#endif
+
+ default:
+ return str_einval;
+ break;
+ }
+
+ return NULL;
+}
+
+
+
+/* *** alloc_descriptor ***
+ * Allocate and perform basic initialization of a socket descriptor.
+ *
+ */
+static
+SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
+{
+ SocketDescriptor* descP;
+
+ if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) {
+ char buf[64]; /* Buffer used for building the mutex name */
+
+ sprintf(buf, "socket[w,%d]", sock);
+ descP->writeMtx = MCREATE(buf);
+
+ sprintf(buf, "socket[r,%d]", sock);
+ descP->readMtx = MCREATE(buf);
+
+ sprintf(buf, "socket[acc,%d]", sock);
+ descP->accMtx = MCREATE(buf);
+
+ descP->dbg = SOCKET_DEBUG_DEFAULT;
+ descP->isWritable = TRUE;
+ descP->isReadable = TRUE;
+ descP->writePkgCnt = 0;
+ descP->writeByteCnt = 0;
+ descP->writeTries = 0;
+ descP->writeWaits = 0;
+ descP->writeFails = 0;
+ descP->readPkgCnt = 0;
+ descP->readByteCnt = 0;
+ descP->readTries = 0;
+ descP->readWaits = 0;
+
+ descP->sock = sock;
+ descP->event = event;
+
+ }
+
+ return descP;
+}
+
+
/* compare_pids - Test if two pids are equal
*
*/
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 985b45a956..0a78feab4e 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -135,6 +135,26 @@
-type setopt_key() :: foo.
-type getopt_key() :: foo.
+-record(msg_hdr,
+ {
+ %% Optional address
+ %% On an unconnected socket this is used to specify the target
+ %% address for a datagram.
+ %% For a connected socket, this field should be specified [].
+ name :: list(),
+
+ %% Scatter/gather array
+ iov :: [binary()], % iovec(),
+
+ %% Ancillary (control) data
+ ctrl :: binary(),
+
+ %% Unused
+ flags = [] :: list()
+ }).
+-type msg_hdr() :: #msg_hdr{}.
+
+
-define(SOCKET_DOMAIN_LOCAL, 1).
-define(SOCKET_DOMAIN_UNIX, ?SOCKET_DOMAIN_LOCAL).
-define(SOCKET_DOMAIN_INET, 2).
@@ -542,25 +562,75 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
%% Do we need a timeout argument here also?
%%
--spec sendto(Socket, Data, Flags, DestAddr, Port) -> ok | {error, Reason} when
+sendto(Socket, Data, Flags, DestAddr, DestPort) ->
+ sendto(Socket, Data, Flags, DestAddr, DestPort, infinity).
+
+-spec sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) ->
+ ok | {error, Reason} when
Socket :: socket(),
Data :: binary(),
Flags :: send_flags(),
DestAddr :: null | ip_address(),
- Port :: port_number(),
+ DestPort :: port_number(),
+ Timeout :: timeout(),
Reason :: term().
-sendto({socket, _, SockRef}, Data, Flags, DestAddr, DestPort)
+sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) when is_list(Data) ->
+ Bin = erlang:list_to_binary(Data),
+ sendto(Socket, Bin, Flags, DestAddr, DestPort, Timeout);
+sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout)
when is_binary(Data) andalso
is_list(Flags) andalso
- ((is_tuple(DestAddr) andalso
- ((size(DestAddr) =:= 4) orelse
- (size(DestAddr) =:= 8))) orelse
- (DestAddr =:= null)) andalso
- (is_integer(DestPort) andalso (DestPort >= 0)) ->
- %% We may need something like send/4 above?
+ (is_tuple(DestAddr) orelse (DestAddr =:= null)) andalso
+ is_integer(DestPort) andalso
+ (is_integer(Timeout) orelse (Timeout =:= infinity)) ->
EFlags = enc_send_flags(Flags),
- nif_sendto(SockRef, make_ref(), Data, EFlags, DestAddr, DestPort).
+ do_sendto(Socket, make_ref(), Data, EFlags, DestAddr, DestPort, Timeout).
+
+do_sendto(SockRef, SendRef, Data, _EFlags, _DestAddr, _DestPort, Timeout)
+ when (Timeout =< 0) ->
+ nif_cancel(SockRef, SendRef),
+ flush_select_msgs(SockRef, SendRef),
+ {error, {timeout, size(Data)}};
+do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
+ TS = timestamp(Timeout),
+ case nif_sendto(SockRef, SendRef, Data, DestAddr, DestPort, EFlags) of
+ ok ->
+ {ok, next_timeout(TS, Timeout)};
+ {ok, Written} ->
+ %% We are partially done, wait for continuation
+ receive
+ {select, SockRef, SendRef, ready_output} when (Written > 0) ->
+ <<_:Written/binary, Rest/binary>> = Data,
+ do_sendto(SockRef, make_ref(), Rest, EFlags,
+ DestAddr, DestPort,
+ next_timeout(TS, Timeout));
+ {select, SockRef, SendRef, ready_output} ->
+ do_sendto(SockRef, make_ref(), Data, EFlags,
+ DestAddr, DestPort,
+ next_timeout(TS, Timeout))
+ after Timeout ->
+ nif_cancel(SockRef, SendRef),
+ flush_select_msgs(SockRef, SendRef),
+ {error, timeout}
+ end;
+ {error, eagain} ->
+ %% Is this what we can expect?
+ %% If we have to wait because there is another ongoing write??
+ receive
+ {select, SockRef, SendRef, ready_output} ->
+ do_sendto(SockRef, SendRef, Data, EFlags,
+ DestAddr, DestPort,
+ next_timeout(TS, Timeout))
+ after Timeout ->
+ nif_cancel(SockRef, SendRef),
+ flush_select_msgs(SockRef, SendRef),
+ {error, timeout}
+ end;
+
+ {error, _} = ERROR ->
+ ERROR
+ end.