diff options
author | Micael Karlberg <[email protected]> | 2018-04-13 20:50:42 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-09-18 13:01:37 +0200 |
commit | 5920705deb70a44311e1b7552cfa73553f284164 (patch) | |
tree | 14bd698c6a81e0b65d47f9eec5699e5f64ad72d3 | |
parent | 2d57ebfc6fb723a476fdcffbb366558a6fa18844 (diff) | |
download | otp-5920705deb70a44311e1b7552cfa73553f284164.tar.gz otp-5920705deb70a44311e1b7552cfa73553f284164.tar.bz2 otp-5920705deb70a44311e1b7552cfa73553f284164.zip |
[socket-nif] Implemented sendto
Still not handling queue'ing of multiple send requests.
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 777 | ||||
-rw-r--r-- | erts/preloaded/src/socket.erl | 90 |
2 files changed, 611 insertions, 256 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index bf9179d857..46c5c696e2 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -340,6 +340,7 @@ typedef union { enif_get_atom((E), (TE), (BP), (MAX), ERL_NIF_LATIN1) #define GET_BIN(E, TE, BP) enif_inspect_iolist_as_binary((E), (TE), (BP)) #define GET_INT(E, TE, IP) enif_get_int((E), (TE), (IP)) +#define GET_UINT(E, TE, IP) enif_get_uint((E), (TE), (IP)) #define GET_TUPLE(E, TE, TSZ, TA) enif_get_tuple((E), (TE), (TSZ), (TA)) @@ -366,6 +367,8 @@ typedef union { #define sock_open(domain, type, proto) \ make_noninheritable_handle(socket((domain), (type), (proto))) #define sock_send(s,buf,len,flag) send((s),(buf),(len),(flag)) +#define sock_sendto(s,buf,blen,flag,addr,alen) \ + sendto((s),(buf),(blen),(flag),(addr),(alen)) #define sock_errno() WSAGetLastError() #define sock_create_event(s) WSACreateEvent() @@ -393,6 +396,8 @@ static unsigned long one_value = 1; #define sock_name(s, addr, len) getsockname((s), (addr), (len)) #define sock_open(domain, type, proto) socket((domain), (type), (proto)) #define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag)) +#define sock_sendto(s,buf,blen,flag,addr,alen) \ + sendto((s),(buf),(blen),(flag),(addr),(alen)) #define sock_errno() errno #define sock_create_event(s) (s) /* return file descriptor */ @@ -510,6 +515,39 @@ typedef struct { } SocketData; +/* typedef struct socket_queue_element { */ +/* struct socket_queue_element* next; */ +/* /\* */ +/* unsigned int tag; */ +/* union { */ +/* SocketAcceptor acceptor; */ +/* } u; */ +/* *\/ */ +/* SocketAcceptor acceptor; */ +/* } SocketQueueElement; */ + +/* typedef struct socket_queue { */ +/* SocketQueueElement* first; */ +/* SocketQueueElement* last; */ +/* } SocketQueue; */ + +/* Macros for defining the various queues (accept, send receive) */ +#define SOCKET_QUEUE_ELEMENT(QE,QEP) \ + typedef struct socket_queue_element_##QEP { \ + struct socket_queue_element_##QEP* next; \ + QE elem; \ + } QE##Element; + +#define SOCKET_QUEUE(QE,Q) \ + typedef struct { \ + QE* first; \ + QE* last; \ + } Q; + +/* The Acceptor Queue types */ +SOCKET_QUEUE_ELEMENT(SocketAcceptor, acceptor); +SOCKET_QUEUE(SocketAcceptorElement, SocketAcceptQueue); + /* ---------------------------------------------------------------------- * F o r w a r d s * ---------------------------------------------------------------------- @@ -576,30 +614,6 @@ static ERL_NIF_TERM nif_cancel(ErlNifEnv* env, const ERL_NIF_TERM argv[]); -static char* decode_laddress(ErlNifEnv* env, - int domain, - ERL_NIF_TERM localAddr, - SocketAddress* localP); -static char* decode_laddress_binary(ErlNifEnv* env, - int domain, - ERL_NIF_TERM localAddr, - SocketAddress* localP); -static char* decode_laddress_tuple(ErlNifEnv* env, - int domain, - ERL_NIF_TERM laddr, - SocketAddress* localP); -static char* decode_address_tuple(ErlNifEnv* env, - int domain, - const ERL_NIF_TERM* addrt, - int port, - SocketAddress* localP); -static char* decode_address_atom(ErlNifEnv* env, - int domain, - char* addr, - int addrLen, - int port, - SocketAddress* localP); - static ERL_NIF_TERM nopen(ErlNifEnv* env, int domain, int type, @@ -629,9 +643,56 @@ static ERL_NIF_TERM nsend(ErlNifEnv* env, ERL_NIF_TERM sendRef, ErlNifBinary* dataP, int flags); +static ERL_NIF_TERM nsendto(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM sendRef, + ErlNifBinary* dataP, + int flags, + SocketAddress* toAddrP); + +static ERL_NIF_TERM send_check_result(ErlNifEnv* env, + SocketDescriptor* descP, + ssize_t written, + ssize_t dataSize, + ERL_NIF_TERM sendRef); + static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, SocketDescriptor* descP); + +static char* decode_laddress(ErlNifEnv* env, + int domain, + ERL_NIF_TERM localAddr, + SocketAddress* localP); +static char* decode_laddress_binary(ErlNifEnv* env, + int domain, + ERL_NIF_TERM localAddr, + SocketAddress* localP); +static char* decode_laddress_tuple(ErlNifEnv* env, + int domain, + ERL_NIF_TERM laddr, + SocketAddress* localP); +static char* decode_address_tuple(ErlNifEnv* env, + int domain, + const ERL_NIF_TERM* addrt, + int port, + SocketAddress* localP); +static char* decode_address_atom(ErlNifEnv* env, + int domain, + char* addr, + int addrLen, + int port, + SocketAddress* localP); +static char* decode_send_addr(ErlNifEnv* env, + int domain, + ERL_NIF_TERM addr, + int port, + SocketAddress** toAddrP); +static char* decode_send_addr_tuple(ErlNifEnv* env, + int domain, + ERL_NIF_TERM addr, + int port, + SocketAddress* toAddrP); static BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err); static SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event); @@ -640,6 +701,8 @@ static int compare_pids(ErlNifEnv* env, const ErlNifPid* pid1, const ErlNifPid* pid2); + + static BOOLEAN_T edomain2domain(int edomain, int* domain); static BOOLEAN_T etype2type(int etype, int* type); static BOOLEAN_T eproto2proto(int eproto, int* proto); @@ -965,7 +1028,7 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, */ SELECT(env, event, - ERL_NIF_SELECT_READ, + (ERL_NIF_SELECT_READ), descP, NULL, atom_undefined); #endif @@ -1115,6 +1178,7 @@ ERL_NIF_TERM nif_bind(ErlNifEnv* env, /* Make sure we are ready * Not sure how this would even happen, but... */ + /* WHY NOT !IS_OPEN(...) */ if (descP->state != SOCKET_STATE_OPEN) return make_error(env, atom_exbadstate); @@ -1308,7 +1372,7 @@ char* decode_laddress_tuple(ErlNifEnv* env, unsigned int len; char a[16]; // Just in case... - if (!(GET_ATOM_LEN(env, laddrt[1], &len) && + if (!(GET_ATOM_LEN(env, laddrt[0], &len) && (len > 0) && (len <= (sizeof("loopback"))))) return str_einval; @@ -1327,154 +1391,6 @@ char* decode_laddress_tuple(ErlNifEnv* env, } -/* Decode the 4- or 8-element address tuple - * and initiate the socket address structure. - */ -static -char* decode_address_tuple(ErlNifEnv* env, - int domain, - const ERL_NIF_TERM* addrt, - int port, - SocketAddress* addrP) -{ - - /* We now *know* that the size of the tuple is correct, - * so we don't need to check anything here, just unpack. - */ - - switch (domain) { - case AF_INET: - { - int a, v; - char laddr[4]; - - sys_memzero((char*)&addrP->u, sizeof(struct sockaddr_in)); -#ifndef NO_SA_LEN - addrP->u.sai.sin_len = sizeof(struct sockaddr_in); -#endif - addrP->u.sai.sin_family = domain; - addrP->u.sai.sin_port = sock_htons(port); - for (a = 0; a < 4; a++) { - if (!GET_INT(env, addrt[a], &v)) - return str_einval; - laddr[a] = v; - } - sys_memcpy(&addrP->u.sai.sin_addr, &laddr, sizeof(laddr)); - addrP->len = sizeof(struct sockaddr_in); - return NULL; - } - break; - -#if defined(HAVE_IN6) && defined(AF_INET6) - case AF_INET6: - { - int a, v; - char laddr[16]; - - sys_memzero((char*)&addrP->u, sizeof(struct sockaddr_in6)); -#ifndef NO_SA_LEN - addrP->u.sai6.sin6_len = sizeof(struct sockaddr_in6); -#endif - addrP->u.sai6.sin6_family = domain; - addrP->u.sai6.sin6_port = sock_htons(port); - addrP->u.sai6.sin6_flowinfo = 0; - /* The address tuple is of size 8 - * and each element is a two byte integer - */ - for (a = 0; a < 8; a++) { - if (!GET_INT(env, addrt[a], &v)) - return str_einval; - laddr[a*2 ] = ((v >> 8) & 0xFF); - laddr[a*2+1] = (v & 0xFF); - } - sys_memcpy(&addrP->u.sai6.sin6_addr, &laddr, sizeof(laddr)); - addrP->len = sizeof(struct sockaddr_in6); - return NULL; - } - break; -#endif - - } /* switch (domain) */ - - return str_eafnosupport; - -} - - -/* Decode the address when its an atom. - * Currently we only accept two atoms: 'any' and 'loopback' - */ -static -char* decode_address_atom(ErlNifEnv* env, - int domain, - char* addr, - int addrLen, - int port, - SocketAddress* localP) -{ - BOOLEAN_T any; - - if (strncmp(addr, "any", addrLen) == 0) { - any = TRUE; - } if (strncmp(addr, "loopback", addrLen) == 0) { - any = FALSE; - } else { - return str_einval; - } - - /* If we get this far, we *know* its either 'any' or 'loopback' */ - - switch (domain) { - case AF_INET: - { - struct in_addr addr; - if (any) { - addr.s_addr = sock_htonl(INADDR_ANY); - } else { - addr.s_addr = sock_htonl(INADDR_LOOPBACK); - } - sys_memzero((char*) localP, sizeof(struct sockaddr_in)); -#ifndef NO_SA_LEN - localP->u.sai.sin_len = sizeof(struct sockaddr_in6); -#endif - localP->u.sai.sin_family = domain; - localP->u.sai.sin_port = sock_htons(port); - localP->u.sai.sin_addr.s_addr = addr.s_addr; - localP->len = sizeof(struct sockaddr_in); - } - break; - -#if defined(HAVE_IN6) && defined(AF_INET6) - case AF_INET6: - { - const struct in6_addr* paddr; - if (any) { - paddr = &in6addr_any; - } else { - paddr = &in6addr_loopback; - } - sys_memzero((char*)localP, sizeof(struct sockaddr_in6)); -#ifndef NO_SA_LEN - localP->u.sai6.sin6_len = sizeof(struct sockaddr_in6); -#endif - localP->u.sai6.sin6_family = domain; - localP->u.sai6.sin6_port = sock_htons(port); - localP->u.sai6.sin6_flowinfo = 0; - localP->u.sai6.sin6_addr = *paddr; - localP->len = sizeof(struct sockaddr_in6); - } - break; -#endif - - default: - return str_einval; - break; - } - - return NULL; -} - - /* ---------------------------------------------------------------------- * nif_connect @@ -2028,50 +1944,6 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, -/* *** alloc_descriptor *** - * Allocate and perform basic initialization of a socket descriptor. - * - */ -static -SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) -{ - SocketDescriptor* descP; - - if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) { - char buf[64]; /* Buffer used for building the mutex name */ - - sprintf(buf, "socket[w,%d]", sock); - descP->writeMtx = MCREATE(buf); - - sprintf(buf, "socket[r,%d]", sock); - descP->readMtx = MCREATE(buf); - - sprintf(buf, "socket[acc,%d]", sock); - descP->accMtx = MCREATE(buf); - - descP->dbg = SOCKET_DEBUG_DEFAULT; - descP->isWritable = TRUE; - descP->isReadable = TRUE; - descP->writePkgCnt = 0; - descP->writeByteCnt = 0; - descP->writeTries = 0; - descP->writeWaits = 0; - descP->writeFails = 0; - descP->readPkgCnt = 0; - descP->readByteCnt = 0; - descP->readTries = 0; - descP->readWaits = 0; - - descP->sock = sock; - descP->event = event; - - } - - return descP; -} - - - /* ---------------------------------------------------------------------- * nif_send * @@ -2080,7 +1952,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) * * Arguments: * Socket (ref) - Points to the socket descriptor. - * Sendref - A unique id for this (send) request. + * SendRef - A unique id for this (send) request. * Data - The data to send in the form of a IOVec. * Flags - Send flags. */ @@ -2101,8 +1973,8 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, if ((argc != 4) || !enif_get_resource(env, argv[0], sockets, (void**) &descP) || - !enif_inspect_iolist_as_binary(env, argv[2], &data) || - !enif_get_uint(env, argv[3], &eflags)) { + !GET_BIN(env, argv[2], &data) || + !GET_UINT(env, argv[3], &eflags)) { return enif_make_badarg(env); } sendRef = argv[1]; @@ -2115,6 +1987,19 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, MLOCK(descP->writeMtx); + /* We need to handle the case when another process tries + * to write at the same time. + * If the current write could not write its entire package + * this time (resulting in an select). The write of the + * other process must be made to wait until current + * is done! + * Basically, we need a write queue! + * + * A 'writing' field (boolean), which is set if we did + * not manage to write the entire message and reset every + * time we do. + */ + res = nsend(env, descP, sendRef, &data, flags); MUNLOCK(descP->writeMtx); @@ -2137,7 +2022,6 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, ErlNifBinary* dataP, int flags) { - int save_errno; ssize_t written; if (!descP->isWritable) @@ -2150,48 +2034,117 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, written = sock_send(descP->sock, dataP->data, dataP->size, flags); - if (written == dataP->size) { + return send_check_result(env, descP, written, dataP->size, sendRef); - cnt_inc(&descP->writePkgCnt, 1); - cnt_inc(&descP->writeByteCnt, written); +} - return atom_ok; - } else if (written < 0) { +/* ---------------------------------------------------------------------- + * nif_sendto + * + * Description: + * Send a message on a socket + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * SendRef - A unique id for this (send) request. + * Data - The data to send in the form of a IOVec. + * Flags - Send flags. + * DestAddr - Destination address. + * DestPort - Destination Port. + */ - /* Ouch, check what kind of failure */ - save_errno = sock_errno(); - if ((save_errno != EAGAIN) && - (save_errno != EINTR)) { +static +ERL_NIF_TERM nif_sendto(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + SocketDescriptor* descP; + ERL_NIF_TERM sendRef; + ErlNifBinary data; + unsigned int eflags; + int flags; + ERL_NIF_TERM addr; + int port; + SocketAddress remoteAddr; + SocketAddress* remoteAddrP = &remoteAddr; + char* xerr; + // ERL_NIF_TERM res; - cnt_inc(&descP->writeFails, 1); + /* Extract arguments and perform preliminary validation */ - return make_error2(env, save_errno); + if ((argc != 6) || + !enif_get_resource(env, argv[0], sockets, (void**) &descP) || + !GET_BIN(env, argv[2], &data) || + !GET_UINT(env, argv[3], &eflags) || + !GET_INT(env, argv[5], &port)) { + return enif_make_badarg(env); + } + sendRef = argv[1]; + addr = argv[4]; - } else { + /* THIS TEST IS NOT CORRECT!!! */ + if (!IS_OPEN(descP)) + return make_error(env, atom_einval); - /* Ok, try again later */ + if (!esendflags2sendflags(eflags, &flags)) + return enif_make_badarg(env); - written = 0; + if ((xerr = decode_send_addr(env, descP->domain, + addr, port, + &remoteAddrP)) != NULL) + return make_error1(env, xerr); - } - } + return nsendto(env, descP, sendRef, &data, flags, remoteAddrP); +} - /* We failed to write the *entire* packet (anything less then size - * of the packet, which is 0 <= written < sizeof packet), - * so schedule the rest for later. - */ - cnt_inc(&descP->writeWaits, 1); +static +ERL_NIF_TERM nsendto(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM sendRef, + ErlNifBinary* dataP, + int flags, + SocketAddress* toAddrP) +{ + ssize_t written; - SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), - descP, NULL, sendRef); + if (!descP->isWritable) + return enif_make_badarg(env); - return make_ok(env, enif_make_int(env, written)); + /* We ignore the wrap for the moment. + * Maybe we should issue a wrap-message to controlling process... + */ + cnt_inc(&descP->writeTries, 1); + + if (toAddrP != NULL) { + written = sock_sendto(descP->sock, + dataP->data, dataP->size, flags, + &toAddrP->u.sa, toAddrP->len); + } else { + written = sock_sendto(descP->sock, + dataP->data, dataP->size, flags, + NULL, 0); + } + return send_check_result(env, descP, written, dataP->size, sendRef); } + +/* ---------------------------------------------------------------------- + * nif_writev / nif_sendv + * + * Description: + * Send a message (vector) on a socket + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * SendRef - A unique id for this (send) request. + * Data - A vector of binaries + * Flags - Send flags. + */ + #ifdef FOBAR static ERL_NIF_TERM nwritev(ErlNifEnv* env, @@ -2247,11 +2200,343 @@ ERL_NIF_TERM nwritev(ErlNifEnv* env, + + + /* ---------------------------------------------------------------------- * U t i l i t y F u n c t i o n s * ---------------------------------------------------------------------- */ +static +ERL_NIF_TERM send_check_result(ErlNifEnv* env, + SocketDescriptor* descP, + ssize_t written, + ssize_t dataSize, + ERL_NIF_TERM sendRef) +{ + if (written == dataSize) { + + cnt_inc(&descP->writePkgCnt, 1); + cnt_inc(&descP->writeByteCnt, written); + + return atom_ok; + + } else if (written < 0) { + + /* Ouch, check what kind of failure */ + int save_errno = sock_errno(); + if ((save_errno != EAGAIN) && + (save_errno != EINTR)) { + + cnt_inc(&descP->writeFails, 1); + + return make_error2(env, save_errno); + + } else { + + /* Ok, try again later */ + + written = 0; + + } + } + + /* We failed to write the *entire* packet (anything less then size + * of the packet, which is 0 <= written < sizeof packet), + * so schedule the rest for later. + */ + + cnt_inc(&descP->writeWaits, 1); + + SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), + descP, NULL, sendRef); + + return make_ok(env, enif_make_int(env, written)); + +} + + +/* The rather odd thing about the 'toAddrP' (the **) is + * because we need to be able to return a NULL pointer, + * in the case of the dest address is the atom 'null'. + * Its possible to call the sendto function with the + * args NULL (address) and 0 (port number). + * + * This function whouls really have a char* return value + * type!! + */ +static +char* decode_send_addr(ErlNifEnv* env, + int domain, + ERL_NIF_TERM addr, + int port, + SocketAddress** toAddrP) +{ + if (IS_ATOM(env, addr)) { + unsigned int len; + char a[16]; // Just in case... + + /* The only acceptable value is the atom 'null' */ + + if (!(GET_ATOM_LEN(env, addr, &len) && + (len > 0) && + (len <= (sizeof("null"))))) + return str_einval; + + if (!GET_ATOM(env, addr, a, sizeof(a))) + return str_einval; + + *toAddrP = NULL; + if (strncmp(a, "null", len) == 0) + return NULL; + else + return str_einval; + + } else if (IS_TUPLE(env, addr)) { + /* We now know that the we have a proper address. */ + return decode_send_addr_tuple(env, domain, addr, port, *toAddrP); + } else { + return str_einval; + } +} + + +static +char* decode_send_addr_tuple(ErlNifEnv* env, + int domain, + ERL_NIF_TERM addr, + int port, + SocketAddress* toAddrP) +{ + /* We handle two different tuples: + * - size 4 (INET) + * - size 8 (INET6) + */ + + const ERL_NIF_TERM* addrt; + int addrtSz; + + if (!GET_TUPLE(env, addr, &addrtSz, &addrt)) + return str_einval; // PLACEHOLDER + + switch (domain) { + case AF_INET: + if (addrtSz != 4) + return str_einval; + break; + +#if defined(HAVE_IN6) && defined(AF_INET6) + case AF_INET6: + if (addrtSz != 8) + return str_einval; + break; +#endif + + default: + return str_eafnosupport; + break; + } + + return decode_address_tuple(env, domain, + addrt, port, + toAddrP); + +} + + +/* Decode the 4- or 8-element address tuple + * and initiate the socket address structure. + */ +static +char* decode_address_tuple(ErlNifEnv* env, + int domain, + const ERL_NIF_TERM* addrt, + int port, + SocketAddress* addrP) +{ + + /* We now *know* that the size of the tuple is correct, + * so we don't need to check anything here, just unpack. + */ + + switch (domain) { + case AF_INET: + { + int a, v; + char laddr[4]; + + sys_memzero((char*)&addrP->u, sizeof(struct sockaddr_in)); +#ifndef NO_SA_LEN + addrP->u.sai.sin_len = sizeof(struct sockaddr_in); +#endif + addrP->u.sai.sin_family = domain; + addrP->u.sai.sin_port = sock_htons(port); + for (a = 0; a < 4; a++) { + if (!GET_INT(env, addrt[a], &v)) + return str_einval; + laddr[a] = v; + } + sys_memcpy(&addrP->u.sai.sin_addr, &laddr, sizeof(laddr)); + addrP->len = sizeof(struct sockaddr_in); + return NULL; + } + break; + +#if defined(HAVE_IN6) && defined(AF_INET6) + case AF_INET6: + { + int a, v; + char laddr[16]; + + sys_memzero((char*)&addrP->u, sizeof(struct sockaddr_in6)); +#ifndef NO_SA_LEN + addrP->u.sai6.sin6_len = sizeof(struct sockaddr_in6); +#endif + addrP->u.sai6.sin6_family = domain; + addrP->u.sai6.sin6_port = sock_htons(port); + addrP->u.sai6.sin6_flowinfo = 0; + /* The address tuple is of size 8 + * and each element is a two byte integer + */ + for (a = 0; a < 8; a++) { + if (!GET_INT(env, addrt[a], &v)) + return str_einval; + laddr[a*2 ] = ((v >> 8) & 0xFF); + laddr[a*2+1] = (v & 0xFF); + } + sys_memcpy(&addrP->u.sai6.sin6_addr, &laddr, sizeof(laddr)); + addrP->len = sizeof(struct sockaddr_in6); + return NULL; + } + break; +#endif + + } /* switch (domain) */ + + return str_eafnosupport; + +} + + +/* Decode the address when its an atom. + * Currently we only accept two atoms: 'any' and 'loopback' + */ +static +char* decode_address_atom(ErlNifEnv* env, + int domain, + char* addr, + int addrLen, + int port, + SocketAddress* localP) +{ + BOOLEAN_T any; + + if (strncmp(addr, "any", addrLen) == 0) { + any = TRUE; + } if (strncmp(addr, "loopback", addrLen) == 0) { + any = FALSE; + } else { + return str_einval; + } + + /* If we get this far, we *know* its either 'any' or 'loopback' */ + + switch (domain) { + case AF_INET: + { + struct in_addr addr; + if (any) { + addr.s_addr = sock_htonl(INADDR_ANY); + } else { + addr.s_addr = sock_htonl(INADDR_LOOPBACK); + } + sys_memzero((char*) localP, sizeof(struct sockaddr_in)); +#ifndef NO_SA_LEN + localP->u.sai.sin_len = sizeof(struct sockaddr_in6); +#endif + localP->u.sai.sin_family = domain; + localP->u.sai.sin_port = sock_htons(port); + localP->u.sai.sin_addr.s_addr = addr.s_addr; + localP->len = sizeof(struct sockaddr_in); + } + break; + +#if defined(HAVE_IN6) && defined(AF_INET6) + case AF_INET6: + { + const struct in6_addr* paddr; + if (any) { + paddr = &in6addr_any; + } else { + paddr = &in6addr_loopback; + } + sys_memzero((char*)localP, sizeof(struct sockaddr_in6)); +#ifndef NO_SA_LEN + localP->u.sai6.sin6_len = sizeof(struct sockaddr_in6); +#endif + localP->u.sai6.sin6_family = domain; + localP->u.sai6.sin6_port = sock_htons(port); + localP->u.sai6.sin6_flowinfo = 0; + localP->u.sai6.sin6_addr = *paddr; + localP->len = sizeof(struct sockaddr_in6); + } + break; +#endif + + default: + return str_einval; + break; + } + + return NULL; +} + + + +/* *** alloc_descriptor *** + * Allocate and perform basic initialization of a socket descriptor. + * + */ +static +SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) +{ + SocketDescriptor* descP; + + if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) { + char buf[64]; /* Buffer used for building the mutex name */ + + sprintf(buf, "socket[w,%d]", sock); + descP->writeMtx = MCREATE(buf); + + sprintf(buf, "socket[r,%d]", sock); + descP->readMtx = MCREATE(buf); + + sprintf(buf, "socket[acc,%d]", sock); + descP->accMtx = MCREATE(buf); + + descP->dbg = SOCKET_DEBUG_DEFAULT; + descP->isWritable = TRUE; + descP->isReadable = TRUE; + descP->writePkgCnt = 0; + descP->writeByteCnt = 0; + descP->writeTries = 0; + descP->writeWaits = 0; + descP->writeFails = 0; + descP->readPkgCnt = 0; + descP->readByteCnt = 0; + descP->readTries = 0; + descP->readWaits = 0; + + descP->sock = sock; + descP->event = event; + + } + + return descP; +} + + /* compare_pids - Test if two pids are equal * */ diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 985b45a956..0a78feab4e 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -135,6 +135,26 @@ -type setopt_key() :: foo. -type getopt_key() :: foo. +-record(msg_hdr, + { + %% Optional address + %% On an unconnected socket this is used to specify the target + %% address for a datagram. + %% For a connected socket, this field should be specified []. + name :: list(), + + %% Scatter/gather array + iov :: [binary()], % iovec(), + + %% Ancillary (control) data + ctrl :: binary(), + + %% Unused + flags = [] :: list() + }). +-type msg_hdr() :: #msg_hdr{}. + + -define(SOCKET_DOMAIN_LOCAL, 1). -define(SOCKET_DOMAIN_UNIX, ?SOCKET_DOMAIN_LOCAL). -define(SOCKET_DOMAIN_INET, 2). @@ -542,25 +562,75 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) -> %% Do we need a timeout argument here also? %% --spec sendto(Socket, Data, Flags, DestAddr, Port) -> ok | {error, Reason} when +sendto(Socket, Data, Flags, DestAddr, DestPort) -> + sendto(Socket, Data, Flags, DestAddr, DestPort, infinity). + +-spec sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) -> + ok | {error, Reason} when Socket :: socket(), Data :: binary(), Flags :: send_flags(), DestAddr :: null | ip_address(), - Port :: port_number(), + DestPort :: port_number(), + Timeout :: timeout(), Reason :: term(). -sendto({socket, _, SockRef}, Data, Flags, DestAddr, DestPort) +sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) when is_list(Data) -> + Bin = erlang:list_to_binary(Data), + sendto(Socket, Bin, Flags, DestAddr, DestPort, Timeout); +sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) when is_binary(Data) andalso is_list(Flags) andalso - ((is_tuple(DestAddr) andalso - ((size(DestAddr) =:= 4) orelse - (size(DestAddr) =:= 8))) orelse - (DestAddr =:= null)) andalso - (is_integer(DestPort) andalso (DestPort >= 0)) -> - %% We may need something like send/4 above? + (is_tuple(DestAddr) orelse (DestAddr =:= null)) andalso + is_integer(DestPort) andalso + (is_integer(Timeout) orelse (Timeout =:= infinity)) -> EFlags = enc_send_flags(Flags), - nif_sendto(SockRef, make_ref(), Data, EFlags, DestAddr, DestPort). + do_sendto(Socket, make_ref(), Data, EFlags, DestAddr, DestPort, Timeout). + +do_sendto(SockRef, SendRef, Data, _EFlags, _DestAddr, _DestPort, Timeout) + when (Timeout =< 0) -> + nif_cancel(SockRef, SendRef), + flush_select_msgs(SockRef, SendRef), + {error, {timeout, size(Data)}}; +do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) -> + TS = timestamp(Timeout), + case nif_sendto(SockRef, SendRef, Data, DestAddr, DestPort, EFlags) of + ok -> + {ok, next_timeout(TS, Timeout)}; + {ok, Written} -> + %% We are partially done, wait for continuation + receive + {select, SockRef, SendRef, ready_output} when (Written > 0) -> + <<_:Written/binary, Rest/binary>> = Data, + do_sendto(SockRef, make_ref(), Rest, EFlags, + DestAddr, DestPort, + next_timeout(TS, Timeout)); + {select, SockRef, SendRef, ready_output} -> + do_sendto(SockRef, make_ref(), Data, EFlags, + DestAddr, DestPort, + next_timeout(TS, Timeout)) + after Timeout -> + nif_cancel(SockRef, SendRef), + flush_select_msgs(SockRef, SendRef), + {error, timeout} + end; + {error, eagain} -> + %% Is this what we can expect? + %% If we have to wait because there is another ongoing write?? + receive + {select, SockRef, SendRef, ready_output} -> + do_sendto(SockRef, SendRef, Data, EFlags, + DestAddr, DestPort, + next_timeout(TS, Timeout)) + after Timeout -> + nif_cancel(SockRef, SendRef), + flush_select_msgs(SockRef, SendRef), + {error, timeout} + end; + + {error, _} = ERROR -> + ERROR + end. |