diff options
author | Micael Karlberg <[email protected]> | 2018-04-23 10:40:28 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-09-18 13:01:37 +0200 |
commit | 04335ca6aedfc5ad9f0d6a8d193dfd76a222291c (patch) | |
tree | cef0cfbfb688200f73f5690e545c23fbe1c319d1 | |
parent | d5aecb115070de76cb42b44edee6bbcb5f4a3724 (diff) | |
download | otp-04335ca6aedfc5ad9f0d6a8d193dfd76a222291c.tar.gz otp-04335ca6aedfc5ad9f0d6a8d193dfd76a222291c.tar.bz2 otp-04335ca6aedfc5ad9f0d6a8d193dfd76a222291c.zip |
[socket-nif] Completed the recv and recvfrom functions
Also updated the socket type (now a record for easy use).
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 716 | ||||
-rw-r--r-- | erts/preloaded/src/socket.erl | 172 |
2 files changed, 682 insertions, 206 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index d3aa3db2aa..6e6851a608 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -181,6 +181,9 @@ typedef unsigned int BOOLEAN_T; #define BOOL2STR(__B__) ((__B__) ? "true" : "false") #define BOOL2ATOM(__B__) ((__B__) ? atom_true : atom_false) +/* Two byte integer decoding */ +#define get_int16(s) ((((unsigned char*) (s))[0] << 8) | \ + (((unsigned char*) (s))[1])) /* Debug stuff... */ #define SOCKET_NIF_DEBUG_DEFAULT TRUE @@ -223,19 +226,6 @@ typedef unsigned long long llu_t; /* *** Misc macros and defines *** */ -#define MALLOC(SZ) enif_alloc(SZ) -#define FREE(P) enif_free(P) -#define MKA(E,S) enif_make_atom(E, S) -#define MKBIN(E,B) enif_make_binary(E, B) -#define MKREF(E) enif_make_ref(E) -#define MKT2(E,E1,E2) enif_make_tuple2(E, E1, E2) -#define MKT3(E,E1,E2,E3) enif_make_tuple3(E, E1, E2, E3) -#define MCREATE(N) enif_mutex_create(N) -#define MLOCK(M) enif_mutex_lock(M) -#define MUNLOCK(M) enif_mutex_unlock(M) -#define SELECT(E,FD,M,O,P,R) \ - if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \ - return enif_make_badarg((E)); /* *** Socket state defs *** */ @@ -268,23 +258,20 @@ typedef unsigned long long llu_t; #define SOCKET_SEND_FLAG_CONFIRM 0 #define SOCKET_SEND_FLAG_DONTROUTE 1 -#define SOCKET_SEND_FLAG_DONTWAIT 2 -#define SOCKET_SEND_FLAG_EOR 3 -#define SOCKET_SEND_FLAG_MORE 4 -#define SOCKET_SEND_FLAG_NOSIGNAL 5 -#define SOCKET_SEND_FLAG_OOB 6 +#define SOCKET_SEND_FLAG_EOR 2 +#define SOCKET_SEND_FLAG_MORE 3 +#define SOCKET_SEND_FLAG_NOSIGNAL 4 +#define SOCKET_SEND_FLAG_OOB 5 #define SOCKET_SEND_FLAG_LOW SOCKET_SEND_FLAG_CONFIRM #define SOCKET_SEND_FLAG_HIGH SOCKET_SEND_FLAG_OOB #define SOCKET_RECV_FLAG_CMSG_CLOEXEC 0 -#define SOCKET_RECV_FLAG_DONTWAIT 1 -#define SOCKET_RECV_FLAG_ERRQUEUE 2 -#define SOCKET_RECV_FLAG_OOB 3 -#define SOCKET_RECV_FLAG_PEEK 4 -#define SOCKET_RECV_FLAG_TRUNC 5 -#define SOCKET_RECV_FLAG_WAITALL 6 +#define SOCKET_RECV_FLAG_ERRQUEUE 1 +#define SOCKET_RECV_FLAG_OOB 2 +#define SOCKET_RECV_FLAG_PEEK 3 +#define SOCKET_RECV_FLAG_TRUNC 4 #define SOCKET_RECV_FLAG_LOW SOCKET_RECV_FLAG_CMSG_CLOEXEC -#define SOCKET_RECV_FLAG_HIGH SOCKET_RECV_FLAG_WAITALL +#define SOCKET_RECV_FLAG_HIGH SOCKET_RECV_FLAG_TRUNC #define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048 @@ -342,6 +329,27 @@ typedef union { * * * =================================================================== */ +#define MALLOC(SZ) enif_alloc((SZ)) +#define FREE(P) enif_free((P)) +#define MKA(E,S) enif_make_atom((E), (S)) +#define MKBIN(E,B) enif_make_binary((E), (B)) +#define MKI(E,I) enif_make_int((E), (I)) +#define MKREF(E) enif_make_ref((E)) +#define MKS(E,S) enif_make_string((E), (S), ERL_NIF_LATIN1) +#define MKSL(E,S,L) enif_make_string_len((E), (S), (L), ERL_NIF_LATIN1) +#define MKSBIN(E,B,ST,SZ) enif_make_sub_binary((E), (B), (ST), (SZ)) +#define MKT2(E,E1,E2) enif_make_tuple2((E), (E1), (E2)) +#define MKT3(E,E1,E2,E3) enif_make_tuple3((E), (E1), (E2), (E3)) +#define MKT4(E,E1,E2,E3,E4) enif_make_tuple4((E), (E1), (E2), (E3), (E4)) +#define MKT8(E,E1,E2,E3,E4,E5,E6,E7,E8) \ + enif_make_tuple8((E), (E1), (E2), (E3), (E4), (E5), (E6), (E7), (E8)) +#define MCREATE(N) enif_mutex_create((N)) +#define MLOCK(M) enif_mutex_lock((M)) +#define MUNLOCK(M) enif_mutex_unlock((M)) +#define SELECT(E,FD,M,O,P,R) \ + if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \ + return enif_make_badarg((E)); + #define IS_ATOM(E, TE) enif_is_atom((E), (TE)) #define IS_BIN(E, TE) enif_is_binary((E), (TE)) #define IS_NUM(E, TE) enif_is_number((E), (TE)) @@ -379,12 +387,15 @@ typedef union { #define sock_htonl(x) htonl((x)) #define sock_listen(s, b) listen((s), (b)) #define sock_name(s, addr, len) getsockname((s), (addr), (len)) +#define sock_ntohs(x) ntohs((x)) #define sock_open(domain, type, proto) \ make_noninheritable_handle(socket((domain), (type), (proto))) #define sock_recv(s,buf,len,flag) recv((s),(buf),(len),(flag)) +#define sock_recvfrom(s,buf,blen,flag,addr,alen) \ + recvfrom((s),(buf),(blen),(flag),(addr),(alen)) #define sock_send(s,buf,len,flag) send((s),(buf),(len),(flag)) #define sock_sendto(s,buf,blen,flag,addr,alen) \ - sendto((s),(buf),(blen),(flag),(addr),(alen)) + sendto((s),(buf),(blen),(flag),(addr),(alen)) #define sock_errno() WSAGetLastError() #define sock_create_event(s) WSACreateEvent() @@ -410,8 +421,11 @@ static unsigned long one_value = 1; #define sock_htonl(x) htonl((x)) #define sock_listen(s, b) listen((s), (b)) #define sock_name(s, addr, len) getsockname((s), (addr), (len)) +#define sock_ntohs(x) ntohs((x)) #define sock_open(domain, type, proto) socket((domain), (type), (proto)) #define sock_recv(s,buf,len,flag) recv((s),(buf),(len),(flag)) +#define sock_recvfrom(s,buf,blen,flag,addr,alen) \ + recvfrom((s),(buf),(blen),(flag),(addr),(alen)) #define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag)) #define sock_sendto(s,buf,blen,flag,addr,alen) \ sendto((s),(buf),(blen),(flag),(addr),(alen)) @@ -430,27 +444,23 @@ static unsigned long one_value = 1; /* The general purpose sockaddr */ -typedef struct { - union { - struct sockaddr sa; - struct sockaddr_in sai; +typedef union { + struct sockaddr sa; + struct sockaddr_in sai; #ifdef HAVE_IN6 - struct sockaddr_in6 sai6; + struct sockaddr_in6 sai6; #endif #ifdef HAVE_SYS_UN_H - struct sockaddr_un sal; + struct sockaddr_un sal; #endif - - } u; - unsigned int len; } SocketAddress; #define which_address_port(sap) \ - ((((sap)->u.sai.sin_family == AF_INET) || \ - ((sap)->u.sai.sin_family == AF_INET6)) ? \ - ((sap)->u.sai.sin_port) : -1) + ((((sap)->sai.sin_family == AF_INET) || \ + ((sap)->sai.sin_family == AF_INET6)) ? \ + ((sap)->sai.sin_port) : -1) typedef struct { @@ -519,9 +529,10 @@ typedef struct { SocketRequestor* currentAcceptorP; // NULL or points to currentReader SocketRequestQueue acceptorsQ; - /* +++ Misc stuff +++ */ - BOOLEAN_T iow; // Inform On Wrap - BOOLEAN_T dbg; + /* +++ Config & Misc stuff +++ */ + size_t rBufSz; // Read buffer size (when data length = 0 is specified) + BOOLEAN_T iow; // Inform On Wrap + BOOLEAN_T dbg; } SocketDescriptor; @@ -637,12 +648,18 @@ static ERL_NIF_TERM nsendto(ErlNifEnv* env, ERL_NIF_TERM sendRef, ErlNifBinary* dataP, int flags, - SocketAddress* toAddrP); + SocketAddress* toAddrP, + unsigned int toAddrLen); static ERL_NIF_TERM nrecv(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM recvRef, int len, int flags); +static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM recvRef, + uint16_t bufSz, + int flags); static ERL_NIF_TERM send_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -655,6 +672,13 @@ static ERL_NIF_TERM recv_check_result(ErlNifEnv* env, int toRead, ErlNifBinary* bufP, ERL_NIF_TERM recvRef); +static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, + SocketDescriptor* descP, + int read, + ErlNifBinary* bufP, + SocketAddress* fromAddrP, + unsigned int fromAddrLen, + ERL_NIF_TERM recvRef); static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, SocketDescriptor* descP); @@ -663,36 +687,49 @@ static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, static char* decode_laddress(ErlNifEnv* env, int domain, ERL_NIF_TERM localAddr, - SocketAddress* localP); + SocketAddress* localP, + unsigned int* addrLenP); static char* decode_laddress_binary(ErlNifEnv* env, int domain, ERL_NIF_TERM localAddr, - SocketAddress* localP); + SocketAddress* localP, + unsigned int* addrLenP); static char* decode_laddress_tuple(ErlNifEnv* env, int domain, ERL_NIF_TERM laddr, - SocketAddress* localP); + SocketAddress* localP, + unsigned int* addrLenP); static char* decode_address_tuple(ErlNifEnv* env, int domain, const ERL_NIF_TERM* addrt, int port, - SocketAddress* localP); + SocketAddress* localP, + unsigned int* addrLenP); static char* decode_address_atom(ErlNifEnv* env, int domain, char* addr, int addrLen, int port, - SocketAddress* localP); + SocketAddress* localP, + unsigned int* addrLenP); static char* decode_send_addr(ErlNifEnv* env, int domain, ERL_NIF_TERM addr, int port, - SocketAddress** toAddrP); + SocketAddress** toAddrP, + unsigned int* addrLenP); static char* decode_send_addr_tuple(ErlNifEnv* env, int domain, ERL_NIF_TERM addr, int port, - SocketAddress* toAddrP); + SocketAddress* toAddrP, + unsigned int* addrLenP); +static void encode_address(ErlNifEnv* env, + SocketAddress* fromAddrP, + unsigned int fromAddrLen, + ERL_NIF_TERM* fromDomainT, + ERL_NIF_TERM* fromSourceT); + static BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err); static SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event); @@ -716,6 +753,10 @@ static BOOLEAN_T restore_network_namespace(int ns, SOCKET sock, int* err); static BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc); +#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE) +static size_t my_strnlen(const char *s, size_t maxlen); +#endif + static void socket_dtor(ErlNifEnv* env, void* obj); static void socket_stop(ErlNifEnv* env, void* obj, @@ -1198,22 +1239,23 @@ ERL_NIF_TERM nbind(ErlNifEnv* env, ERL_NIF_TERM addr) { SocketAddress local; + unsigned int addrLen; char* err; int port; - if ((err = decode_laddress(env, descP->domain, addr, &local)) != NULL) + if ((err = decode_laddress(env, descP->domain, addr, &local, &addrLen)) != NULL) return make_error1(env, err); if (IS_SOCKET_ERROR(sock_bind(descP->sock, - (struct sockaddr*) &local.u, local.len))) { + (struct sockaddr*) &local, addrLen))) { return make_error2(env, sock_errno()); } port = which_address_port(&local); if (port == 0) { - SOCKLEN_T addrLen = sizeof(local.u); - sys_memzero((char *) &local.u, addrLen); - sock_name(descP->sock, &local.u.sa, &addrLen); + SOCKLEN_T len = sizeof(local); + sys_memzero((char *) &local, len); + sock_name(descP->sock, &local.sa, &len); port = which_address_port(&local); } else if (port == -1) { port = 0; @@ -1233,12 +1275,13 @@ static char* decode_laddress(ErlNifEnv* env, int domain, ERL_NIF_TERM localAddr, - SocketAddress* localP) + SocketAddress* localP, + unsigned int* addrLenP) { if (IS_BIN(env, localAddr)) { - return decode_laddress_binary(env, domain, localAddr, localP); + return decode_laddress_binary(env, domain, localAddr, localP, addrLenP); } else if (IS_TUPLE(env, localAddr)) { - return decode_laddress_tuple(env, domain, localAddr, localP); + return decode_laddress_tuple(env, domain, localAddr, localP, addrLenP); } else { return str_einval; } @@ -1254,8 +1297,11 @@ static char* decode_laddress_binary(ErlNifEnv* env, int domain, ERL_NIF_TERM localAddr, - SocketAddress* localP) + SocketAddress* localP, + unsigned int* addrLenP) { + unsigned int addrLen; + #ifdef HAVE_SYS_UN_H ErlNifBinary bin; @@ -1278,16 +1324,17 @@ char* decode_laddress_binary(ErlNifEnv* env, #else 1 #endif - ) > sizeof(localP->u.sal.sun_path)) + ) > sizeof(localP->sal.sun_path)) return str_einval; - sys_memzero((char*)&localP->u, sizeof(struct sockaddr_un)); - localP->u.sal.sun_family = domain; - sys_memcpy(localP->u.sal.sun_path, bin.data, bin.size); - localP->len = offsetof(struct sockaddr_un, sun_path) + bin.size; + sys_memzero((char*)localP, sizeof(struct sockaddr_un)); + localP->sal.sun_family = domain; + sys_memcpy(localP->sal.sun_path, bin.data, bin.size); + addrLen = offsetof(struct sockaddr_un, sun_path) + bin.size; #ifndef NO_SA_LEN - localP->u.sal.sun_len = localP->len; + localP->u.sal.sun_len = addrLen; #endif + *addrLenP = addrLen; return NULL; #else // HAVE_SYS_UN_H @@ -1312,7 +1359,8 @@ static char* decode_laddress_tuple(ErlNifEnv* env, int domain, ERL_NIF_TERM laddr, - SocketAddress* localP) + SocketAddress* localP, + unsigned int* addrLenP) { const ERL_NIF_TERM* laddrt; int laddrtSz; @@ -1365,7 +1413,7 @@ char* decode_laddress_tuple(ErlNifEnv* env, return decode_address_tuple(env, domain, addrt, port, - localP); + localP, addrLenP); } else if (IS_ATOM(env, laddrt[0]) && IS_NUM(env, laddrt[1])) { @@ -1388,7 +1436,7 @@ char* decode_laddress_tuple(ErlNifEnv* env, return decode_address_atom(env, domain, a, len, port, - localP); + localP, addrLenP); } else { return str_einval; @@ -1457,8 +1505,9 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, const ERL_NIF_TERM* addr, int port) { - int code; - char* xerr; + unsigned int addrLen; + int code; + char* xerr; /* Verify that we are where in the proper state */ @@ -1473,11 +1522,11 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, if ((xerr = decode_address_tuple(env, descP->domain, addr, port, - &descP->remote)) != NULL) + &descP->remote, &addrLen)) != NULL) return make_error1(env, xerr); code = sock_connect(descP->sock, - (struct sockaddr*) &descP->remote.u, descP->remote.len); + (struct sockaddr*) &descP->remote, addrLen); if (IS_SOCKET_ERROR(code) && ((sock_errno() == ERRNO_BLOCK) || /* Winsock2 */ @@ -1572,8 +1621,8 @@ BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err) #ifndef SO_ERROR int sz, code; - sz = sizeof(descP->inet.remote); - sys_memzero((char *) &descP->inet.remote, sz); + sz = sizeof(descP->remote); + sys_memzero((char *) &descP->remote, sz); code = sock_peer(desc->sock, (struct sockaddr*) &descP->remote, &sz); @@ -1735,7 +1784,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, if (enif_self(env, &caller) == NULL) return make_error(env, atom_exself); - n = sizeof(descP->remote.u); + n = sizeof(remote); sys_memzero((char *) &remote, n); accSock = sock_accept(descP->sock, (struct sockaddr*) &remote, &n); if (accSock == INVALID_SOCKET) { @@ -1869,7 +1918,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, return make_error(env, atom_exbusy); } - n = sizeof(descP->remote.u); + n = sizeof(descP->remote); sys_memzero((char *) &remote, n); accSock = sock_accept(descP->sock, (struct sockaddr*) &remote, &n); if (accSock == INVALID_SOCKET) { @@ -2074,6 +2123,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, int port; SocketAddress remoteAddr; SocketAddress* remoteAddrP = &remoteAddr; + unsigned int remoteAddrLen; char* xerr; // ERL_NIF_TERM res; @@ -2098,10 +2148,11 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, if ((xerr = decode_send_addr(env, descP->domain, addr, port, - &remoteAddrP)) != NULL) + &remoteAddrP, + &remoteAddrLen)) != NULL) return make_error1(env, xerr); - return nsendto(env, descP, sendRef, &data, flags, remoteAddrP); + return nsendto(env, descP, sendRef, &data, flags, remoteAddrP, remoteAddrLen); } @@ -2111,7 +2162,8 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env, ERL_NIF_TERM sendRef, ErlNifBinary* dataP, int flags, - SocketAddress* toAddrP) + SocketAddress* toAddrP, + unsigned int toAddrLen) { ssize_t written; @@ -2126,7 +2178,7 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env, if (toAddrP != NULL) { written = sock_sendto(descP->sock, dataP->data, dataP->size, flags, - &toAddrP->u.sa, toAddrP->len); + &toAddrP->sa, toAddrLen); } else { written = sock_sendto(descP->sock, dataP->data, dataP->size, flags, @@ -2273,6 +2325,11 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env, } +/* The (read) buffer handling *must* be optimized! + * But for now we make it easy for ourselves by + * allocating a binary (of the specified or default + * size) and then throwing it away... + */ static ERL_NIF_TERM nrecv(ErlNifEnv* env, SocketDescriptor* descP, @@ -2286,7 +2343,11 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, if (!descP->isReadable) return enif_make_badarg(env); - if (!ALLOC_BIN((len ? len : SOCKET_RECV_BUFFER_SIZE_DEFAULT), &buf)) + /* Allocate a buffer: + * Either as much as we want to read or (if zero (0)) use the "default" + * size (what has been configured). + */ + if (!ALLOC_BIN((len ? len : descP->rBufSz), &buf)) return make_error(env, atom_exalloc); /* We ignore the wrap for the moment. @@ -2305,6 +2366,121 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env, /* ---------------------------------------------------------------------- + * nif_recvfrom + * + * Description: + * Receive a message on a socket. + * Normally used only on a (un-) connected socket! + * If a buffer size = 0 is specified, then the we will use the default + * buffer size for this socket (whatever has been configured). + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * RecvRef - A unique id for this (send) request. + * BufSz - Size of the buffer into which we put the received message. + * Flags - Receive flags. + */ + +static +ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + SocketDescriptor* descP; + ERL_NIF_TERM recvRef; + unsigned int bufSz; + unsigned int eflags; + int flags; + ERL_NIF_TERM res; + + if ((argc != 4) || + !enif_get_resource(env, argv[0], sockets, (void**) &descP) || + !GET_UINT(env, argv[2], &bufSz) || + !GET_UINT(env, argv[3], &eflags)) { + return enif_make_badarg(env); + } + recvRef = argv[1]; + + /* if (IS_OPEN(descP)) */ + /* return make_error(env, atom_enotconn); */ + + if (!erecvflags2recvflags(eflags, &flags)) + return enif_make_badarg(env); + + MLOCK(descP->readMtx); + + /* <KOLLA> + * We need to handle the case when another process tries + * to receive at the same time. + * If the current recv could not read its entire package + * this time (resulting in an select). The read of the + * other process must be made to wait until current + * is done! + * Basically, we need a read queue! + * + * A 'reading' field (boolean), which is set if we did + * not manage to read the entire message and reset every + * time we do. + * </KOLLA> + */ + + res = nrecvfrom(env, descP, recvRef, bufSz, flags); + + MUNLOCK(descP->readMtx); + + return res; + +} + + +/* The (read) buffer handling *must* be optimized! + * But for now we make it easy for ourselves by + * allocating a binary (of the specified or default + * size) and then throwing it away... + */ +static +ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM recvRef, + uint16_t bufSz, + int flags) +{ + SocketAddress fromAddr; + unsigned int addrLen; + ssize_t read; + ErlNifBinary buf; + + if (!descP->isReadable) + return enif_make_badarg(env); + + /* Allocate a buffer: + * Either as much as we want to read or (if zero (0)) use the "default" + * size (what has been configured). + */ + if (!ALLOC_BIN((bufSz ? bufSz : descP->rBufSz), &buf)) + return make_error(env, atom_exalloc); + + /* We ignore the wrap for the moment. + * Maybe we should issue a wrap-message to controlling process... + */ + cnt_inc(&descP->readTries, 1); + + addrLen = sizeof(fromAddr); + sys_memzero((char*) &fromAddr, addrLen); + + read = sock_recvfrom(descP->sock, buf.data, buf.size, flags, + &fromAddr.sa, &addrLen); + + return recvfrom_check_result(env, descP, + read, + &buf, + &fromAddr, addrLen, + recvRef); +} + + + +/* ---------------------------------------------------------------------- * U t i l i t y F u n c t i o n s * ---------------------------------------------------------------------- */ @@ -2338,7 +2514,11 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, /* Ok, try again later */ - written = 0; // SHOULD RESULT IN {error, eagain}!!!! + /* <KOLLA> + * SHOULD RESULT IN {error, eagain}!!!! + * </KOLLA> + */ + written = 0; } } @@ -2366,11 +2546,11 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, ErlNifBinary* bufP, ERL_NIF_TERM recvRef) { + ERL_NIF_TERM data; + /* There is a special case: If the provided 'to read' value is - * zero (0). That means that if we filled the (default size) - * buffer, we need to continue to read (since there *may* be - * more data), but we cannot loop here. Instead we inform the - * caller that it must call again. + * zero (0). That means that we reads as much as we can, using + * the default read buffer size. */ if (bufP->size == read) { @@ -2395,7 +2575,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * => We choose alt 1 for now. */ - return make_ok3(env, atom_false, MKBIN(env, bufP)); + data = MKBIN(env, bufP); + + return make_ok3(env, atom_false, data); } else { @@ -2406,7 +2588,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * </KOLLA> */ - return make_ok3(env, atom_true, MKBIN(env, bufP)); + data = MKBIN(env, bufP); + + return make_ok3(env, atom_true, data); } @@ -2421,6 +2605,89 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, /* +++ Oups - closed +++ */ /* <KOLLA> + * + * IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING + * PROCESS, WE NEED TO INFORM IT!!! + * + * ALL WAITING PROCESSES MUST ALSO GET THE ERROR!! + * + * </KOLLA> + */ + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_STOP), + descP, NULL, recvRef); + + return make_error(env, atom_closed); + + } else if ((save_errno == ERRNO_BLOCK) || + (save_errno == EAGAIN)) { + return make_error(env, atom_eagain); + } else { + return make_error2(env, save_errno); + } + + } else { + + /* +++ We did not fill the buffer +++ */ + + if (toRead == 0) { + + /* +++ We got a chunk of data but +++ + * +++ since we did not fill the +++ + * +++ buffer, we must split it +++ + * +++ into a sub-binary. +++ + */ + + data = MKBIN(env, bufP); + data = MKSBIN(env, data, 0, read); + + return make_ok3(env, atom_true, data); + + } else { + + /* +++ We got only a part of what was expected +++ + * +++ => receive more later. +++ */ + + return make_ok3(env, atom_false, MKBIN(env, bufP)); + } + } +} + + +/* The recvfrom function delivers one (1) message. If our buffer + * is to small, the message will be truncated. So, regardless + * if we filled the buffer or not, we have got what we are going + * to get regarding this message. + */ +static +ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, + SocketDescriptor* descP, + int read, + ErlNifBinary* bufP, + SocketAddress* fromAddrP, + unsigned int fromAddrLen, + ERL_NIF_TERM recvRef) +{ + ERL_NIF_TERM data; + + /* There is a special case: If the provided 'to read' value is + * zero (0). That means that we reads as much as we can, using + * the default read buffer size. + */ + + if (read < 0) { + + /* +++ Error handling +++ */ + + int save_errno = sock_errno(); + + if (save_errno == ECONNRESET) { + + /* +++ Oups - closed +++ */ + + /* <KOLLA> * IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING * PROCESS, WE NEED TO INFORM IT!!! * @@ -2445,9 +2712,29 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, } else { - /* +++ We got only a part of what was expected - receive more later +++ */ + /* +++ We sucessfully got a message - time to encode the address +++ */ + + ERL_NIF_TERM fromDomainT, fromSourceT; - return make_ok3(env, atom_false, MKBIN(env, bufP)); + encode_address(env, + fromAddrP, fromAddrLen, + &fromDomainT, &fromSourceT); + + if (read == bufP->size) { + data = MKBIN(env, bufP); + } else { + + /* +++ We got a chunk of data but +++ + * +++ since we did not fill the +++ + * +++ buffer, we must split it +++ + * +++ into a sub-binary. +++ + */ + + data = MKBIN(env, bufP); + data = MKSBIN(env, data, 0, read); + } + + return make_ok2(env, MKT3(env, fromDomainT, fromSourceT, data)); } } @@ -2469,7 +2756,8 @@ char* decode_send_addr(ErlNifEnv* env, int domain, ERL_NIF_TERM addr, int port, - SocketAddress** toAddrP) + SocketAddress** toAddrP, + unsigned int* toAddrLenP) { if (IS_ATOM(env, addr)) { unsigned int len; @@ -2493,7 +2781,8 @@ char* decode_send_addr(ErlNifEnv* env, } else if (IS_TUPLE(env, addr)) { /* We now know that the we have a proper address. */ - return decode_send_addr_tuple(env, domain, addr, port, *toAddrP); + return decode_send_addr_tuple(env, domain, addr, port, + *toAddrP, toAddrLenP); } else { return str_einval; } @@ -2505,7 +2794,8 @@ char* decode_send_addr_tuple(ErlNifEnv* env, int domain, ERL_NIF_TERM addr, int port, - SocketAddress* toAddrP) + SocketAddress* toAddrP, + unsigned int* toAddrLenP) { /* We handle two different tuples: * - size 4 (INET) @@ -2538,7 +2828,7 @@ char* decode_send_addr_tuple(ErlNifEnv* env, return decode_address_tuple(env, domain, addrt, port, - toAddrP); + toAddrP, toAddrLenP); } @@ -2551,7 +2841,8 @@ char* decode_address_tuple(ErlNifEnv* env, int domain, const ERL_NIF_TERM* addrt, int port, - SocketAddress* addrP) + SocketAddress* addrP, + unsigned int* addrLenP) { /* We now *know* that the size of the tuple is correct, @@ -2564,19 +2855,19 @@ char* decode_address_tuple(ErlNifEnv* env, int a, v; char laddr[4]; - sys_memzero((char*)&addrP->u, sizeof(struct sockaddr_in)); + sys_memzero((char*)addrP, sizeof(struct sockaddr_in)); #ifndef NO_SA_LEN - addrP->u.sai.sin_len = sizeof(struct sockaddr_in); + addrP->sai.sin_len = sizeof(struct sockaddr_in); #endif - addrP->u.sai.sin_family = domain; - addrP->u.sai.sin_port = sock_htons(port); + addrP->sai.sin_family = domain; + addrP->sai.sin_port = sock_htons(port); for (a = 0; a < 4; a++) { if (!GET_INT(env, addrt[a], &v)) return str_einval; laddr[a] = v; } - sys_memcpy(&addrP->u.sai.sin_addr, &laddr, sizeof(laddr)); - addrP->len = sizeof(struct sockaddr_in); + sys_memcpy(&addrP->sai.sin_addr, &laddr, sizeof(laddr)); + *addrLenP = sizeof(struct sockaddr_in); return NULL; } break; @@ -2587,13 +2878,13 @@ char* decode_address_tuple(ErlNifEnv* env, int a, v; char laddr[16]; - sys_memzero((char*)&addrP->u, sizeof(struct sockaddr_in6)); + sys_memzero((char*)addrP, sizeof(struct sockaddr_in6)); #ifndef NO_SA_LEN - addrP->u.sai6.sin6_len = sizeof(struct sockaddr_in6); + addrP->sai6.sin6_len = sizeof(struct sockaddr_in6); #endif - addrP->u.sai6.sin6_family = domain; - addrP->u.sai6.sin6_port = sock_htons(port); - addrP->u.sai6.sin6_flowinfo = 0; + addrP->sai6.sin6_family = domain; + addrP->sai6.sin6_port = sock_htons(port); + addrP->sai6.sin6_flowinfo = 0; /* The address tuple is of size 8 * and each element is a two byte integer */ @@ -2603,8 +2894,8 @@ char* decode_address_tuple(ErlNifEnv* env, laddr[a*2 ] = ((v >> 8) & 0xFF); laddr[a*2+1] = (v & 0xFF); } - sys_memcpy(&addrP->u.sai6.sin6_addr, &laddr, sizeof(laddr)); - addrP->len = sizeof(struct sockaddr_in6); + sys_memcpy(&addrP->sai6.sin6_addr, &laddr, sizeof(laddr)); + *addrLenP = sizeof(struct sockaddr_in6); return NULL; } break; @@ -2616,6 +2907,131 @@ char* decode_address_tuple(ErlNifEnv* env, } +/* Encode the 4- or 8-element address tuple from the socket address structure. + * + * This function is called when we have received a message. So, if we for some + * reason fail to decode the address or parts of it, it makes more sense to + * return with "undefined" for the values rather then fail completely (and not + * deliver the received message). + * + * Returns two things (assuming the encode works): + * + * Domain: inet | inet6 | local + * Source: {Address, Port} | string() + * + */ +static +void encode_address(ErlNifEnv* env, + SocketAddress* addrP, + unsigned int addrLen, + ERL_NIF_TERM* domainT, + ERL_NIF_TERM* sourceT) +{ + short port; + + switch (addrP->sa.sa_family) { + + /* +++ inet (IPv4) +++ */ + + case AF_INET: + if (addrLen >= sizeof(struct sockaddr_in)) { + ERL_NIF_TERM addrT, portT; + unsigned int i; + ERL_NIF_TERM at4[4]; + char* a4 = (char*) &addrP->sai.sin_addr; + + port = sock_ntohs(addrP->sai.sin_port); + for (i = 0; i < 4; i++) { + at4[i] = MKI(env, a4[i]); + } + + *domainT = MKA(env, "inet"); // Shall we encode these? See decode + addrT = MKT4(env, at4[0], at4[1], at4[2], at4[3]); + portT = MKI(env, port); + *sourceT = MKT2(env, addrT, portT); + } else { + *domainT = atom_undefined; + *sourceT = atom_undefined; + } + break; + + + /* +++ inet6 (IPv6) +++ */ + +#if defined(HAVE_IN6) && defined(AF_INET6) + case AF_INET6: + if (addrLen >= sizeof(struct sockaddr_in6)) { + ERL_NIF_TERM addrT, portT; + unsigned int i; + ERL_NIF_TERM at6[8]; + char* a16 = (char*) &addrP->sai6.sin6_addr; + + port = sock_ntohs(addrP->sai6.sin6_port); + /* The address tuple is of size 8 + * and each element is a two byte integer + */ + for (i = 0; i < 8; i++) { + // at6[i] = MKI(env, get_int16(a16[i*2])); + at6[i] = MKI(env, get_int16(a16 + i*2)); + } + + *domainT = MKA(env, "inet6"); // Shall we encode these? See decode + addrT = MKT8(env, + at6[0], at6[1], at6[2], at6[3], + at6[4], at6[5], at6[6], at6[7]); + portT = MKI(env, port); + *sourceT = MKT2(env, addrT, portT); + } else { + *domainT = atom_undefined; + *sourceT = atom_undefined; + } + break; +#endif + + /* +++ local (Unix Domain Sockets) +++ */ + +#ifdef HAVE_SYS_UN_H + case AF_UNIX: + { + size_t n, m; + + *domainT = MKA(env, "local"); + if (addrLen < offsetof(struct sockaddr_un, sun_path)) { + *sourceT = atom_undefined; + } else { + n = addrLen - offsetof(struct sockaddr_un, sun_path); + if (255 < n) { + *sourceT = atom_undefined; + } else { + m = my_strnlen(addrP->sal.sun_path, n); +#ifdef __linux__ + /* Assume that the address is a zero terminated string, + * except when the first byte is \0 i.e the string length is 0, + * then use the reported length instead. + * This fix handles Linux's nonportable + * abstract socket address extension. + */ + if (m == 0) { + m = n; + } +#endif + + *sourceT = MKSL(env, addrP->sal.sun_path, m); + } + } + } + break; +#endif + + default: + *domainT = atom_undefined; + *sourceT = atom_undefined; + break; + + } /* switch (addrP->sa.sa_family) */ + +} + /* Decode the address when its an atom. * Currently we only accept two atoms: 'any' and 'loopback' @@ -2626,7 +3042,8 @@ char* decode_address_atom(ErlNifEnv* env, char* addr, int addrLen, int port, - SocketAddress* localP) + SocketAddress* addrP, + unsigned int* addrLenP) { BOOLEAN_T any; @@ -2649,14 +3066,14 @@ char* decode_address_atom(ErlNifEnv* env, } else { addr.s_addr = sock_htonl(INADDR_LOOPBACK); } - sys_memzero((char*) localP, sizeof(struct sockaddr_in)); + sys_memzero((char*) addrP, sizeof(struct sockaddr_in)); #ifndef NO_SA_LEN - localP->u.sai.sin_len = sizeof(struct sockaddr_in6); + addrP->sai.sin_len = sizeof(struct sockaddr_in6); #endif - localP->u.sai.sin_family = domain; - localP->u.sai.sin_port = sock_htons(port); - localP->u.sai.sin_addr.s_addr = addr.s_addr; - localP->len = sizeof(struct sockaddr_in); + addrP->sai.sin_family = domain; + addrP->sai.sin_port = sock_htons(port); + addrP->sai.sin_addr.s_addr = addr.s_addr; + *addrLenP = sizeof(struct sockaddr_in); } break; @@ -2669,15 +3086,15 @@ char* decode_address_atom(ErlNifEnv* env, } else { paddr = &in6addr_loopback; } - sys_memzero((char*)localP, sizeof(struct sockaddr_in6)); + sys_memzero((char*)addrP, sizeof(struct sockaddr_in6)); #ifndef NO_SA_LEN - localP->u.sai6.sin6_len = sizeof(struct sockaddr_in6); + addrP->sai6.sin6_len = sizeof(struct sockaddr_in6); #endif - localP->u.sai6.sin6_family = domain; - localP->u.sai6.sin6_port = sock_htons(port); - localP->u.sai6.sin6_flowinfo = 0; - localP->u.sai6.sin6_addr = *paddr; - localP->len = sizeof(struct sockaddr_in6); + addrP->sai6.sin6_family = domain; + addrP->sai6.sin6_port = sock_htons(port); + addrP->sai6.sin6_flowinfo = 0; + addrP->sai6.sin6_addr = *paddr; + *addrLenP = sizeof(struct sockaddr_in6); } break; #endif @@ -2709,12 +3126,23 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->currentWriterP = NULL; // currentWriter not used descP->writersQ.first = NULL; descP->writersQ.last = NULL; + descP->isWritable = TRUE; + descP->writePkgCnt = 0; + descP->writeByteCnt = 0; + descP->writeTries = 0; + descP->writeWaits = 0; + descP->writeFails = 0; sprintf(buf, "socket[r,%d]", sock); descP->readMtx = MCREATE(buf); descP->currentReaderP = NULL; // currentReader not used descP->readersQ.first = NULL; descP->readersQ.last = NULL; + descP->isReadable = TRUE; + descP->readPkgCnt = 0; + descP->readByteCnt = 0; + descP->readTries = 0; + descP->readWaits = 0; sprintf(buf, "socket[acc,%d]", sock); descP->accMtx = MCREATE(buf); @@ -2722,22 +3150,12 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->acceptorsQ.first = NULL; descP->acceptorsQ.last = NULL; - descP->iow = FALSE; - descP->dbg = SOCKET_DEBUG_DEFAULT; - descP->isWritable = TRUE; - descP->isReadable = TRUE; - descP->writePkgCnt = 0; - descP->writeByteCnt = 0; - descP->writeTries = 0; - descP->writeWaits = 0; - descP->writeFails = 0; - descP->readPkgCnt = 0; - descP->readByteCnt = 0; - descP->readTries = 0; - descP->readWaits = 0; - - descP->sock = sock; - descP->event = event; + descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT; + descP->iow = FALSE; + descP->dbg = SOCKET_DEBUG_DEFAULT; + + descP->sock = sock; + descP->event = event; } @@ -2931,10 +3349,6 @@ BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags) tmp |= MSG_DONTROUTE; break; - case SOCKET_SEND_FLAG_DONTWAIT: - tmp |= MSG_DONTWAIT; - break; - case SOCKET_SEND_FLAG_EOR: tmp |= MSG_EOR; break; @@ -2979,10 +3393,6 @@ BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags) tmp |= MSG_CMSG_CLOEXEC; break; - case SOCKET_RECV_FLAG_DONTWAIT: - tmp |= MSG_DONTWAIT; - break; - case SOCKET_RECV_FLAG_ERRQUEUE: tmp |= MSG_ERRQUEUE; break; @@ -2999,10 +3409,6 @@ BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags) tmp |= MSG_TRUNC; break; - case SOCKET_RECV_FLAG_WAITALL: - tmp |= MSG_WAITALL; - break; - default: return FALSE; } @@ -3015,6 +3421,18 @@ BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags) } +#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE) +/* strnlen doesn't exist everywhere */ +static +size_t my_strnlen(const char *s, size_t maxlen) +{ + size_t i = 0; + while (i < maxlen && s[i] != '\0') + i++; + return i; +} +#endif + /* Create an ok two (2) tuple in the form: {ok, Any}. * The second element (Any) is already in the form of an diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 0dadcecaa0..bae561cd51 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -39,7 +39,7 @@ %% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv) recv/2, recv/3, recv/4, - recvfrom/1, recvfrom/2, + recvfrom/1, recvfrom/2, recvfrom/3, recvfrom/4, %% recvmsg/4, %% readv/3, @@ -106,10 +106,10 @@ -type port_number() :: 0..65535. -type socket_info() :: map(). -%% -record(socket, {info :: socket_info, -%% ref :: reference()}). --opaque socket() :: {socket, socket_info(), reference()}. -%% -opaque socket() :: #socket{}. +-record(socket, {info :: socket_info(), + ref :: reference()}). +%% -opaque socket() :: {socket, socket_info(), reference()}. +-opaque socket() :: #socket{}. -type accept_flags() :: [accept_flag()]. -type accept_flag() :: nonblock | cloexec. @@ -117,20 +117,26 @@ -type send_flags() :: [send_flag()]. -type send_flag() :: confirm | dontroute | - dontwait | eor | more | nosignal | oob. +%% Extend with OWN flags for other usage: +%% - adapt-buffer-sz: +%% This will have the effect that the nif recvfrom will use +%% MSG_PEEK to ensure no part of the message is lost, but if +%% necessary adapt (increase) the buffer size until all of +%% it fits. +%% +%% Note that not all of these flags is useful for every recv function! +%% -type recv_flags() :: [recv_flag()]. -type recv_flag() :: cmsg_cloexec | - dontwait | errqueue | oob | peek | - trunc | - waitall. + trunc. -type setopt_key() :: foo. -type getopt_key() :: foo. @@ -177,23 +183,20 @@ -define(SOCKET_SEND_FLAG_CONFIRM, 0). -define(SOCKET_SEND_FLAG_DONTROUTE, 1). --define(SOCKET_SEND_FLAG_DONTWAIT, 2). --define(SOCKET_SEND_FLAG_EOR, 3). --define(SOCKET_SEND_FLAG_MORE, 4). --define(SOCKET_SEND_FLAG_NOSIGNAL, 5). --define(SOCKET_SEND_FLAG_OOB, 6). +-define(SOCKET_SEND_FLAG_EOR, 2). +-define(SOCKET_SEND_FLAG_MORE, 3). +-define(SOCKET_SEND_FLAG_NOSIGNAL, 4). +-define(SOCKET_SEND_FLAG_OOB, 5). -define(SOCKET_SEND_FLAGS_DEFAULT, []). -define(SOCKET_SEND_TIMEOUT_DEFAULT, infinity). -define(SOCKET_SENDTO_TIMEOUT_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT). -define(SOCKET_RECV_FLAG_CMSG_CLOEXEC, 0). --define(SOCKET_RECV_FLAG_DONTWAIT, 1). --define(SOCKET_RECV_FLAG_ERRQUEUE, 2). --define(SOCKET_RECV_FLAG_OOB, 3). --define(SOCKET_RECV_FLAG_PEEK, 4). --define(SOCKET_RECV_FLAG_TRUNC, 5). --define(SOCKET_RECV_FLAG_WAITALL, 6). +-define(SOCKET_RECV_FLAG_ERRQUEUE, 1). +-define(SOCKET_RECV_FLAG_OOB, 2). +-define(SOCKET_RECV_FLAG_PEEK, 3). +-define(SOCKET_RECV_FLAG_TRUNC, 4). -define(SOCKET_RECV_FLAGS_DEFAULT, []). -define(SOCKET_RECV_TIMEOUT_DEFAULT, infinity). @@ -287,7 +290,8 @@ open(Domain, Type, Protocol0, Extra) when is_map(Extra) -> SocketInfo = #{domain => Domain, type => Type, protocol => Protocol}, - Socket = {socket, SocketInfo, SockRef}, + Socket = #socket{info = SocketInfo, + ref = SockRef}, {ok, Socket}; {error, _} = ERROR -> ERROR @@ -352,7 +356,7 @@ bind(Socket, Addr) when is_tuple(Addr) orelse Reason :: term(). %% Shall we keep info about domain so that we can verify address? -bind({socket, _, SockRef}, Addr, Port) +bind(#socket{ref = SockRef}, Addr, Port) when (is_tuple(Addr) andalso ((size(Addr) =:= 4) orelse (size(Addr) =:= 8))) orelse ((Addr =:= any) orelse (Addr =:= loopback)) andalso @@ -385,7 +389,7 @@ connect(Socket, Addr, Port) -> connect(_Socket, _Addr, _Port, Timeout) when (is_integer(Timeout) andalso (Timeout =< 0)) -> {error, timeout}; -connect({socket, _, SockRef}, Addr, Port, Timeout) +connect(#socket{ref = SockRef}, Addr, Port, Timeout) when (is_tuple(Addr) andalso ((size(Addr) =:= 4) orelse (size(Addr) =:= 8))) andalso (is_integer(Port) andalso (Port >= 0)) andalso @@ -424,7 +428,7 @@ connect({socket, _, SockRef}, Addr, Port, Timeout) listen(Socket) -> listen(Socket, ?SOCKET_LISTEN_BACKLOG_DEFAULT). -listen({socket, _, SockRef}, Backlog) +listen(#socket{ref = SockRef}, Backlog) when (is_integer(Backlog) andalso (Backlog >= 0)) -> nif_listen(SockRef, Backlog). @@ -448,7 +452,7 @@ accept(Socket) -> %% Do we really need this optimization? accept(_, Timeout) when is_integer(Timeout) andalso (Timeout =< 0) -> {error, timeout}; -accept({socket, SI, LSockRef}, Timeout) +accept(#socket{info = SI, ref = LSockRef}, Timeout) when is_integer(Timeout) orelse (Timeout =:= infinity) -> do_accept(LSockRef, SI, Timeout). @@ -460,7 +464,8 @@ do_accept(LSockRef, SI, Timeout) -> SocketInfo = #{domain => maps:get(domain, SI), type => maps:get(type, SI), protocol => maps:get(protocol, SI)}, - Socket = {socket, SocketInfo, SockRef}, + Socket = #socket{info = SocketInfo, + ref = SockRef}, {ok, Socket}; {error, eagain} -> NewTimeout = next_timeout(TS, Timeout), @@ -499,9 +504,10 @@ send(Socket, Data, Timeout) -> send(Socket, Data, Flags, Timeout) when is_list(Data) -> Bin = erlang:list_to_binary(Data), send(Socket, Bin, Flags, Timeout); -send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags) -> +send(#socket{ref = SockRef}, Data, Flags, Timeout) + when is_binary(Data) andalso is_list(Flags) -> EFlags = enc_send_flags(Flags), - do_send(Socket, Data, EFlags, Timeout). + do_send(SockRef, Data, EFlags, Timeout). do_send(SockRef, Data, EFlags, Timeout) -> TS = timestamp(Timeout), @@ -562,14 +568,14 @@ sendto(Socket, Data, Flags, DestAddr, DestPort) -> sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) when is_list(Data) -> Bin = erlang:list_to_binary(Data), sendto(Socket, Bin, Flags, DestAddr, DestPort, Timeout); -sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) +sendto(#socket{ref = SockRef}, Data, Flags, DestAddr, DestPort, Timeout) when is_binary(Data) andalso is_list(Flags) andalso (is_tuple(DestAddr) orelse (DestAddr =:= null)) andalso is_integer(DestPort) andalso (is_integer(Timeout) orelse (Timeout =:= infinity)) -> EFlags = enc_send_flags(Flags), - do_sendto(Socket, Data, EFlags, DestAddr, DestPort, Timeout). + do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, Timeout). do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, Timeout) -> TS = timestamp(Timeout), @@ -726,18 +732,18 @@ recv(Socket, Length, Timeout) -> Data :: binary(), Reason :: term(). -recv(Socket, Length, Flags, Timeout) +recv(#socket{ref = SockRef}, Length, Flags, Timeout) when (is_integer(Length) andalso (Length >= 0)) andalso is_list(Flags) andalso (is_integer(Timeout) orelse (Timeout =:= infinity)) -> EFlags = enc_recv_flags(Flags), - do_recv(Socket, undefined, Length, EFlags, <<>>, EFlags). + do_recv(SockRef, undefined, Length, EFlags, <<>>, Timeout). %% We need to pass the "old recv ref" around because of the special case %% with Length = 0. This case makes it neccessary to have a timeout function %% clause since we may never wait for anything (no receive select), and so the %% the only timeout check will be the function clause. -do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) +do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) when (Timeout =:= infinity) orelse (is_integer(Timeout) andalso (Timeout > 0)) -> TS = timestamp(Timeout), @@ -754,7 +760,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) %% > 0 - We got a part of the message and we will be notified %% when there is more to read (a select message) {ok, false = _Complete, Bin} when (Length =:= 0) -> - do_recv(Socket, RecvRef, + do_recv(SockRef, RecvRef, Length, EFlags, <<Acc/binary, Bin/binary>>, next_timeout(TS, Timeout)); @@ -766,7 +772,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> - do_recv(Socket, RecvRef, + do_recv(SockRef, RecvRef, Length-size(Bin), EFlags, Bin, next_timeout(TS, Timeout)) @@ -781,7 +787,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> - do_recv(Socket, RecvRef, + do_recv(SockRef, RecvRef, Length-size(Bin), EFlags, <<Acc/binary, Bin/binary>>, next_timeout(TS, Timeout)) @@ -801,7 +807,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) NewTimeout = next_timeout(TS, Timeout), receive {select, SockRef, RecvRef, ready_input} -> - do_recv(Socket, RecvRef, + do_recv(SockRef, RecvRef, Length, EFlags, Acc, next_timeout(TS, Timeout)) @@ -819,37 +825,92 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout) end; -do_recv({socket, _, SockRef} = _Socket, RecvRef, - 0 = _Length, _Eflags, Acc, _Timeout) -> +do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) -> %% The current recv operation is to be cancelled, so no need for a ref... %% The cancel will end our 'read everything you have' and "activate" %% any waiting reader. nif_cancel(SockRef, recv, RecvRef), {ok, Acc}; -do_recv(_Socket, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) -> +do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) -> {error, {timeout, Acc}}; -do_recv(_Socket, _RecvRef, _Length, _EFlags, _Acc, _Timeout) -> +do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) -> {error, timeout}. %% --------------------------------------------------------------------------- %% +%% With recvfrom we get messages, which means that regardless of how +%% much we want to read, we return when we get a message. +%% The MaxSize argument basically defines the size of our receive +%% buffer. By setting the size to zero (0), we use the configured +%% size (see setopt). +%% It may be impossible to know what (buffer) size is appropriate +%% "in advance", and in those cases it may be convenient to use the +%% (recv) 'peek' flag. When this flag is provided the message is *not* +%% "consumed" from the underlying buffers, so another recvfrom call +%% is needed, possibly with a then adjusted buffer size. +%% recvfrom(Socket) -> - recvfrom(Socket, ?SOCKET_RECV_FLAGS_DEFAULT). + recvfrom(Socket, 0). + +recvfrom(Socket, BufSz) -> + recvfrom(Socket, BufSz, + ?SOCKET_RECV_FLAGS_DEFAULT, + ?SOCKET_RECV_TIMEOUT_DEFAULT). + + +recvfrom(Socket, Flags, Timeout) when is_list(Flags) -> + recvfrom(Socket, 0, Flags, Timeout); +recvfrom(Socket, BufSz, Flags) when is_list(Flags) -> + recvfrom(Socket, BufSz, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT); +recvfrom(Socket, BufSz, Timeout) -> + recvfrom(Socket, BufSz, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout). + +-spec recvfrom(Socket, BufSz, Flags, Timeout) -> {ok, {SrcDomain, Source, Data}} | {error, Reason} when + Socket :: socket(), + BufSz :: non_neg_integer(), + Flags :: recv_flags(), + Timeout :: timeout(), + SrcDomain :: domain() | undefined, + Source :: {ip_address(), port_number()} | string() | undefined, + Data :: binary(), + Reason :: term(). + +recvfrom(#socket{ref = SockRef}, BufSz, Flags, Timeout) + when (is_integer(BufSz) andalso (BufSz >= 0)) andalso + is_list(Flags) andalso + (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + EFlags = enc_recv_flags(Flags), + do_recvfrom(SockRef, BufSz, EFlags, Timeout). --spec recvfrom(Socket, Flags) -> {ok, Data, SrcAddr, SrcPort} | {error, Reason} when - Socket :: socket(), - Flags :: recv_flags(), - Data :: binary(), - SrcAddr :: ip_address(), - SrcPort :: port_number(), - Reason :: term(). +do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> + TS = timestamp(Timeout), + RecvRef = make_ref(), + case nif_recvfrom(SockRef, RecvRef, BufSz, EFlags) of + {ok, {_Domain, _Source, _NewData}} = OK -> + OK; + + {error, eagain} -> + %% There is nothing just now, but we will be notified when there + %% is something to read (a select message). + NewTimeout = next_timeout(TS, Timeout), + receive + {select, SockRef, RecvRef, ready_input} -> + do_recvfrom(SockRef, BufSz, EFlags, + next_timeout(TS, Timeout)) + after NewTimeout -> + nif_cancel(SockRef, recvfrom, RecvRef), + flush_select_msgs(SockRef, RecvRef), + {error, timeout} + end; + + {error, _} = ERROR -> + ERROR + + end. -recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) -> - EFlags = enc_recv_flags(Flags), - nif_recvfrom(SockRef, EFlags). %% --------------------------------------------------------------------------- @@ -998,7 +1059,6 @@ enc_protocol(Type, Proto) -> throw({error, {invalid_protocol, {Type, Proto}} enc_send_flags(Flags) -> EFlags = [{confirm, ?SOCKET_SEND_FLAG_CONFIRM}, {dontroute, ?SOCKET_SEND_FLAG_DONTROUTE}, - {dontwait, ?SOCKET_SEND_FLAG_DONTWAIT}, {eor, ?SOCKET_SEND_FLAG_EOR}, {more, ?SOCKET_SEND_FLAG_MORE}, {nosignal, ?SOCKET_SEND_FLAG_NOSIGNAL}, @@ -1010,12 +1070,10 @@ enc_send_flags(Flags) -> enc_recv_flags(Flags) -> EFlags = [{cmsg_cloexec, ?SOCKET_RECV_FLAG_CMSG_CLOEXEC}, - {dontwait, ?SOCKET_RECV_FLAG_DONTWAIT}, {errqueue, ?SOCKET_RECV_FLAG_ERRQUEUE}, {oob, ?SOCKET_RECV_FLAG_OOB}, {peek, ?SOCKET_RECV_FLAG_PEEK}, - {trunc, ?SOCKET_RECV_FLAG_TRUNC}, - {waitall, ?SOCKET_RECV_FLAG_WAITALL}], + {trunc, ?SOCKET_RECV_FLAG_TRUNC}], enc_flags(Flags, EFlags). @@ -1159,7 +1217,7 @@ nif_sendto(_SRef, _SendRef, _Data, _Flags, _Dest, _Port) -> nif_recv(_SRef, _RecvRef, _Length, _Flags) -> erlang:error(badarg). -nif_recvfrom(_SRef, _Flags) -> +nif_recvfrom(_SRef, _RecvRef, _Length, _Flags) -> erlang:error(badarg). nif_cancel(_SRef, _Op, _Ref) -> |