aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-04-23 10:40:28 +0200
committerMicael Karlberg <[email protected]>2018-09-18 13:01:37 +0200
commit04335ca6aedfc5ad9f0d6a8d193dfd76a222291c (patch)
treecef0cfbfb688200f73f5690e545c23fbe1c319d1
parentd5aecb115070de76cb42b44edee6bbcb5f4a3724 (diff)
downloadotp-04335ca6aedfc5ad9f0d6a8d193dfd76a222291c.tar.gz
otp-04335ca6aedfc5ad9f0d6a8d193dfd76a222291c.tar.bz2
otp-04335ca6aedfc5ad9f0d6a8d193dfd76a222291c.zip
[socket-nif] Completed the recv and recvfrom functions
Also updated the socket type (now a record for easy use).
-rw-r--r--erts/emulator/nifs/common/socket_nif.c716
-rw-r--r--erts/preloaded/src/socket.erl172
2 files changed, 682 insertions, 206 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index d3aa3db2aa..6e6851a608 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -181,6 +181,9 @@ typedef unsigned int BOOLEAN_T;
#define BOOL2STR(__B__) ((__B__) ? "true" : "false")
#define BOOL2ATOM(__B__) ((__B__) ? atom_true : atom_false)
+/* Two byte integer decoding */
+#define get_int16(s) ((((unsigned char*) (s))[0] << 8) | \
+ (((unsigned char*) (s))[1]))
/* Debug stuff... */
#define SOCKET_NIF_DEBUG_DEFAULT TRUE
@@ -223,19 +226,6 @@ typedef unsigned long long llu_t;
/* *** Misc macros and defines *** */
-#define MALLOC(SZ) enif_alloc(SZ)
-#define FREE(P) enif_free(P)
-#define MKA(E,S) enif_make_atom(E, S)
-#define MKBIN(E,B) enif_make_binary(E, B)
-#define MKREF(E) enif_make_ref(E)
-#define MKT2(E,E1,E2) enif_make_tuple2(E, E1, E2)
-#define MKT3(E,E1,E2,E3) enif_make_tuple3(E, E1, E2, E3)
-#define MCREATE(N) enif_mutex_create(N)
-#define MLOCK(M) enif_mutex_lock(M)
-#define MUNLOCK(M) enif_mutex_unlock(M)
-#define SELECT(E,FD,M,O,P,R) \
- if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \
- return enif_make_badarg((E));
/* *** Socket state defs *** */
@@ -268,23 +258,20 @@ typedef unsigned long long llu_t;
#define SOCKET_SEND_FLAG_CONFIRM 0
#define SOCKET_SEND_FLAG_DONTROUTE 1
-#define SOCKET_SEND_FLAG_DONTWAIT 2
-#define SOCKET_SEND_FLAG_EOR 3
-#define SOCKET_SEND_FLAG_MORE 4
-#define SOCKET_SEND_FLAG_NOSIGNAL 5
-#define SOCKET_SEND_FLAG_OOB 6
+#define SOCKET_SEND_FLAG_EOR 2
+#define SOCKET_SEND_FLAG_MORE 3
+#define SOCKET_SEND_FLAG_NOSIGNAL 4
+#define SOCKET_SEND_FLAG_OOB 5
#define SOCKET_SEND_FLAG_LOW SOCKET_SEND_FLAG_CONFIRM
#define SOCKET_SEND_FLAG_HIGH SOCKET_SEND_FLAG_OOB
#define SOCKET_RECV_FLAG_CMSG_CLOEXEC 0
-#define SOCKET_RECV_FLAG_DONTWAIT 1
-#define SOCKET_RECV_FLAG_ERRQUEUE 2
-#define SOCKET_RECV_FLAG_OOB 3
-#define SOCKET_RECV_FLAG_PEEK 4
-#define SOCKET_RECV_FLAG_TRUNC 5
-#define SOCKET_RECV_FLAG_WAITALL 6
+#define SOCKET_RECV_FLAG_ERRQUEUE 1
+#define SOCKET_RECV_FLAG_OOB 2
+#define SOCKET_RECV_FLAG_PEEK 3
+#define SOCKET_RECV_FLAG_TRUNC 4
#define SOCKET_RECV_FLAG_LOW SOCKET_RECV_FLAG_CMSG_CLOEXEC
-#define SOCKET_RECV_FLAG_HIGH SOCKET_RECV_FLAG_WAITALL
+#define SOCKET_RECV_FLAG_HIGH SOCKET_RECV_FLAG_TRUNC
#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048
@@ -342,6 +329,27 @@ typedef union {
* *
* =================================================================== */
+#define MALLOC(SZ) enif_alloc((SZ))
+#define FREE(P) enif_free((P))
+#define MKA(E,S) enif_make_atom((E), (S))
+#define MKBIN(E,B) enif_make_binary((E), (B))
+#define MKI(E,I) enif_make_int((E), (I))
+#define MKREF(E) enif_make_ref((E))
+#define MKS(E,S) enif_make_string((E), (S), ERL_NIF_LATIN1)
+#define MKSL(E,S,L) enif_make_string_len((E), (S), (L), ERL_NIF_LATIN1)
+#define MKSBIN(E,B,ST,SZ) enif_make_sub_binary((E), (B), (ST), (SZ))
+#define MKT2(E,E1,E2) enif_make_tuple2((E), (E1), (E2))
+#define MKT3(E,E1,E2,E3) enif_make_tuple3((E), (E1), (E2), (E3))
+#define MKT4(E,E1,E2,E3,E4) enif_make_tuple4((E), (E1), (E2), (E3), (E4))
+#define MKT8(E,E1,E2,E3,E4,E5,E6,E7,E8) \
+ enif_make_tuple8((E), (E1), (E2), (E3), (E4), (E5), (E6), (E7), (E8))
+#define MCREATE(N) enif_mutex_create((N))
+#define MLOCK(M) enif_mutex_lock((M))
+#define MUNLOCK(M) enif_mutex_unlock((M))
+#define SELECT(E,FD,M,O,P,R) \
+ if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \
+ return enif_make_badarg((E));
+
#define IS_ATOM(E, TE) enif_is_atom((E), (TE))
#define IS_BIN(E, TE) enif_is_binary((E), (TE))
#define IS_NUM(E, TE) enif_is_number((E), (TE))
@@ -379,12 +387,15 @@ typedef union {
#define sock_htonl(x) htonl((x))
#define sock_listen(s, b) listen((s), (b))
#define sock_name(s, addr, len) getsockname((s), (addr), (len))
+#define sock_ntohs(x) ntohs((x))
#define sock_open(domain, type, proto) \
make_noninheritable_handle(socket((domain), (type), (proto)))
#define sock_recv(s,buf,len,flag) recv((s),(buf),(len),(flag))
+#define sock_recvfrom(s,buf,blen,flag,addr,alen) \
+ recvfrom((s),(buf),(blen),(flag),(addr),(alen))
#define sock_send(s,buf,len,flag) send((s),(buf),(len),(flag))
#define sock_sendto(s,buf,blen,flag,addr,alen) \
- sendto((s),(buf),(blen),(flag),(addr),(alen))
+ sendto((s),(buf),(blen),(flag),(addr),(alen))
#define sock_errno() WSAGetLastError()
#define sock_create_event(s) WSACreateEvent()
@@ -410,8 +421,11 @@ static unsigned long one_value = 1;
#define sock_htonl(x) htonl((x))
#define sock_listen(s, b) listen((s), (b))
#define sock_name(s, addr, len) getsockname((s), (addr), (len))
+#define sock_ntohs(x) ntohs((x))
#define sock_open(domain, type, proto) socket((domain), (type), (proto))
#define sock_recv(s,buf,len,flag) recv((s),(buf),(len),(flag))
+#define sock_recvfrom(s,buf,blen,flag,addr,alen) \
+ recvfrom((s),(buf),(blen),(flag),(addr),(alen))
#define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag))
#define sock_sendto(s,buf,blen,flag,addr,alen) \
sendto((s),(buf),(blen),(flag),(addr),(alen))
@@ -430,27 +444,23 @@ static unsigned long one_value = 1;
/* The general purpose sockaddr */
-typedef struct {
- union {
- struct sockaddr sa;
- struct sockaddr_in sai;
+typedef union {
+ struct sockaddr sa;
+ struct sockaddr_in sai;
#ifdef HAVE_IN6
- struct sockaddr_in6 sai6;
+ struct sockaddr_in6 sai6;
#endif
#ifdef HAVE_SYS_UN_H
- struct sockaddr_un sal;
+ struct sockaddr_un sal;
#endif
-
- } u;
- unsigned int len;
} SocketAddress;
#define which_address_port(sap) \
- ((((sap)->u.sai.sin_family == AF_INET) || \
- ((sap)->u.sai.sin_family == AF_INET6)) ? \
- ((sap)->u.sai.sin_port) : -1)
+ ((((sap)->sai.sin_family == AF_INET) || \
+ ((sap)->sai.sin_family == AF_INET6)) ? \
+ ((sap)->sai.sin_port) : -1)
typedef struct {
@@ -519,9 +529,10 @@ typedef struct {
SocketRequestor* currentAcceptorP; // NULL or points to currentReader
SocketRequestQueue acceptorsQ;
- /* +++ Misc stuff +++ */
- BOOLEAN_T iow; // Inform On Wrap
- BOOLEAN_T dbg;
+ /* +++ Config & Misc stuff +++ */
+ size_t rBufSz; // Read buffer size (when data length = 0 is specified)
+ BOOLEAN_T iow; // Inform On Wrap
+ BOOLEAN_T dbg;
} SocketDescriptor;
@@ -637,12 +648,18 @@ static ERL_NIF_TERM nsendto(ErlNifEnv* env,
ERL_NIF_TERM sendRef,
ErlNifBinary* dataP,
int flags,
- SocketAddress* toAddrP);
+ SocketAddress* toAddrP,
+ unsigned int toAddrLen);
static ERL_NIF_TERM nrecv(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM recvRef,
int len,
int flags);
+static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM recvRef,
+ uint16_t bufSz,
+ int flags);
static ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -655,6 +672,13 @@ static ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
int toRead,
ErlNifBinary* bufP,
ERL_NIF_TERM recvRef);
+static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ int read,
+ ErlNifBinary* bufP,
+ SocketAddress* fromAddrP,
+ unsigned int fromAddrLen,
+ ERL_NIF_TERM recvRef);
static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
SocketDescriptor* descP);
@@ -663,36 +687,49 @@ static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
static char* decode_laddress(ErlNifEnv* env,
int domain,
ERL_NIF_TERM localAddr,
- SocketAddress* localP);
+ SocketAddress* localP,
+ unsigned int* addrLenP);
static char* decode_laddress_binary(ErlNifEnv* env,
int domain,
ERL_NIF_TERM localAddr,
- SocketAddress* localP);
+ SocketAddress* localP,
+ unsigned int* addrLenP);
static char* decode_laddress_tuple(ErlNifEnv* env,
int domain,
ERL_NIF_TERM laddr,
- SocketAddress* localP);
+ SocketAddress* localP,
+ unsigned int* addrLenP);
static char* decode_address_tuple(ErlNifEnv* env,
int domain,
const ERL_NIF_TERM* addrt,
int port,
- SocketAddress* localP);
+ SocketAddress* localP,
+ unsigned int* addrLenP);
static char* decode_address_atom(ErlNifEnv* env,
int domain,
char* addr,
int addrLen,
int port,
- SocketAddress* localP);
+ SocketAddress* localP,
+ unsigned int* addrLenP);
static char* decode_send_addr(ErlNifEnv* env,
int domain,
ERL_NIF_TERM addr,
int port,
- SocketAddress** toAddrP);
+ SocketAddress** toAddrP,
+ unsigned int* addrLenP);
static char* decode_send_addr_tuple(ErlNifEnv* env,
int domain,
ERL_NIF_TERM addr,
int port,
- SocketAddress* toAddrP);
+ SocketAddress* toAddrP,
+ unsigned int* addrLenP);
+static void encode_address(ErlNifEnv* env,
+ SocketAddress* fromAddrP,
+ unsigned int fromAddrLen,
+ ERL_NIF_TERM* fromDomainT,
+ ERL_NIF_TERM* fromSourceT);
+
static BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err);
static SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event);
@@ -716,6 +753,10 @@ static BOOLEAN_T restore_network_namespace(int ns, SOCKET sock, int* err);
static BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc);
+#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE)
+static size_t my_strnlen(const char *s, size_t maxlen);
+#endif
+
static void socket_dtor(ErlNifEnv* env, void* obj);
static void socket_stop(ErlNifEnv* env,
void* obj,
@@ -1198,22 +1239,23 @@ ERL_NIF_TERM nbind(ErlNifEnv* env,
ERL_NIF_TERM addr)
{
SocketAddress local;
+ unsigned int addrLen;
char* err;
int port;
- if ((err = decode_laddress(env, descP->domain, addr, &local)) != NULL)
+ if ((err = decode_laddress(env, descP->domain, addr, &local, &addrLen)) != NULL)
return make_error1(env, err);
if (IS_SOCKET_ERROR(sock_bind(descP->sock,
- (struct sockaddr*) &local.u, local.len))) {
+ (struct sockaddr*) &local, addrLen))) {
return make_error2(env, sock_errno());
}
port = which_address_port(&local);
if (port == 0) {
- SOCKLEN_T addrLen = sizeof(local.u);
- sys_memzero((char *) &local.u, addrLen);
- sock_name(descP->sock, &local.u.sa, &addrLen);
+ SOCKLEN_T len = sizeof(local);
+ sys_memzero((char *) &local, len);
+ sock_name(descP->sock, &local.sa, &len);
port = which_address_port(&local);
} else if (port == -1) {
port = 0;
@@ -1233,12 +1275,13 @@ static
char* decode_laddress(ErlNifEnv* env,
int domain,
ERL_NIF_TERM localAddr,
- SocketAddress* localP)
+ SocketAddress* localP,
+ unsigned int* addrLenP)
{
if (IS_BIN(env, localAddr)) {
- return decode_laddress_binary(env, domain, localAddr, localP);
+ return decode_laddress_binary(env, domain, localAddr, localP, addrLenP);
} else if (IS_TUPLE(env, localAddr)) {
- return decode_laddress_tuple(env, domain, localAddr, localP);
+ return decode_laddress_tuple(env, domain, localAddr, localP, addrLenP);
} else {
return str_einval;
}
@@ -1254,8 +1297,11 @@ static
char* decode_laddress_binary(ErlNifEnv* env,
int domain,
ERL_NIF_TERM localAddr,
- SocketAddress* localP)
+ SocketAddress* localP,
+ unsigned int* addrLenP)
{
+ unsigned int addrLen;
+
#ifdef HAVE_SYS_UN_H
ErlNifBinary bin;
@@ -1278,16 +1324,17 @@ char* decode_laddress_binary(ErlNifEnv* env,
#else
1
#endif
- ) > sizeof(localP->u.sal.sun_path))
+ ) > sizeof(localP->sal.sun_path))
return str_einval;
- sys_memzero((char*)&localP->u, sizeof(struct sockaddr_un));
- localP->u.sal.sun_family = domain;
- sys_memcpy(localP->u.sal.sun_path, bin.data, bin.size);
- localP->len = offsetof(struct sockaddr_un, sun_path) + bin.size;
+ sys_memzero((char*)localP, sizeof(struct sockaddr_un));
+ localP->sal.sun_family = domain;
+ sys_memcpy(localP->sal.sun_path, bin.data, bin.size);
+ addrLen = offsetof(struct sockaddr_un, sun_path) + bin.size;
#ifndef NO_SA_LEN
- localP->u.sal.sun_len = localP->len;
+ localP->u.sal.sun_len = addrLen;
#endif
+ *addrLenP = addrLen;
return NULL;
#else // HAVE_SYS_UN_H
@@ -1312,7 +1359,8 @@ static
char* decode_laddress_tuple(ErlNifEnv* env,
int domain,
ERL_NIF_TERM laddr,
- SocketAddress* localP)
+ SocketAddress* localP,
+ unsigned int* addrLenP)
{
const ERL_NIF_TERM* laddrt;
int laddrtSz;
@@ -1365,7 +1413,7 @@ char* decode_laddress_tuple(ErlNifEnv* env,
return decode_address_tuple(env,
domain, addrt, port,
- localP);
+ localP, addrLenP);
} else if (IS_ATOM(env, laddrt[0]) &&
IS_NUM(env, laddrt[1])) {
@@ -1388,7 +1436,7 @@ char* decode_laddress_tuple(ErlNifEnv* env,
return decode_address_atom(env,
domain, a, len, port,
- localP);
+ localP, addrLenP);
} else {
return str_einval;
@@ -1457,8 +1505,9 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
const ERL_NIF_TERM* addr,
int port)
{
- int code;
- char* xerr;
+ unsigned int addrLen;
+ int code;
+ char* xerr;
/* Verify that we are where in the proper state */
@@ -1473,11 +1522,11 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
if ((xerr = decode_address_tuple(env,
descP->domain, addr, port,
- &descP->remote)) != NULL)
+ &descP->remote, &addrLen)) != NULL)
return make_error1(env, xerr);
code = sock_connect(descP->sock,
- (struct sockaddr*) &descP->remote.u, descP->remote.len);
+ (struct sockaddr*) &descP->remote, addrLen);
if (IS_SOCKET_ERROR(code) &&
((sock_errno() == ERRNO_BLOCK) || /* Winsock2 */
@@ -1572,8 +1621,8 @@ BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err)
#ifndef SO_ERROR
int sz, code;
- sz = sizeof(descP->inet.remote);
- sys_memzero((char *) &descP->inet.remote, sz);
+ sz = sizeof(descP->remote);
+ sys_memzero((char *) &descP->remote, sz);
code = sock_peer(desc->sock,
(struct sockaddr*) &descP->remote, &sz);
@@ -1735,7 +1784,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
if (enif_self(env, &caller) == NULL)
return make_error(env, atom_exself);
- n = sizeof(descP->remote.u);
+ n = sizeof(remote);
sys_memzero((char *) &remote, n);
accSock = sock_accept(descP->sock, (struct sockaddr*) &remote, &n);
if (accSock == INVALID_SOCKET) {
@@ -1869,7 +1918,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
return make_error(env, atom_exbusy);
}
- n = sizeof(descP->remote.u);
+ n = sizeof(descP->remote);
sys_memzero((char *) &remote, n);
accSock = sock_accept(descP->sock, (struct sockaddr*) &remote, &n);
if (accSock == INVALID_SOCKET) {
@@ -2074,6 +2123,7 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
int port;
SocketAddress remoteAddr;
SocketAddress* remoteAddrP = &remoteAddr;
+ unsigned int remoteAddrLen;
char* xerr;
// ERL_NIF_TERM res;
@@ -2098,10 +2148,11 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
if ((xerr = decode_send_addr(env, descP->domain,
addr, port,
- &remoteAddrP)) != NULL)
+ &remoteAddrP,
+ &remoteAddrLen)) != NULL)
return make_error1(env, xerr);
- return nsendto(env, descP, sendRef, &data, flags, remoteAddrP);
+ return nsendto(env, descP, sendRef, &data, flags, remoteAddrP, remoteAddrLen);
}
@@ -2111,7 +2162,8 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env,
ERL_NIF_TERM sendRef,
ErlNifBinary* dataP,
int flags,
- SocketAddress* toAddrP)
+ SocketAddress* toAddrP,
+ unsigned int toAddrLen)
{
ssize_t written;
@@ -2126,7 +2178,7 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env,
if (toAddrP != NULL) {
written = sock_sendto(descP->sock,
dataP->data, dataP->size, flags,
- &toAddrP->u.sa, toAddrP->len);
+ &toAddrP->sa, toAddrLen);
} else {
written = sock_sendto(descP->sock,
dataP->data, dataP->size, flags,
@@ -2273,6 +2325,11 @@ ERL_NIF_TERM nif_recv(ErlNifEnv* env,
}
+/* The (read) buffer handling *must* be optimized!
+ * But for now we make it easy for ourselves by
+ * allocating a binary (of the specified or default
+ * size) and then throwing it away...
+ */
static
ERL_NIF_TERM nrecv(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2286,7 +2343,11 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
if (!descP->isReadable)
return enif_make_badarg(env);
- if (!ALLOC_BIN((len ? len : SOCKET_RECV_BUFFER_SIZE_DEFAULT), &buf))
+ /* Allocate a buffer:
+ * Either as much as we want to read or (if zero (0)) use the "default"
+ * size (what has been configured).
+ */
+ if (!ALLOC_BIN((len ? len : descP->rBufSz), &buf))
return make_error(env, atom_exalloc);
/* We ignore the wrap for the moment.
@@ -2305,6 +2366,121 @@ ERL_NIF_TERM nrecv(ErlNifEnv* env,
/* ----------------------------------------------------------------------
+ * nif_recvfrom
+ *
+ * Description:
+ * Receive a message on a socket.
+ * Normally used only on a (un-) connected socket!
+ * If a buffer size = 0 is specified, then the we will use the default
+ * buffer size for this socket (whatever has been configured).
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ * RecvRef - A unique id for this (send) request.
+ * BufSz - Size of the buffer into which we put the received message.
+ * Flags - Receive flags.
+ */
+
+static
+ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+ SocketDescriptor* descP;
+ ERL_NIF_TERM recvRef;
+ unsigned int bufSz;
+ unsigned int eflags;
+ int flags;
+ ERL_NIF_TERM res;
+
+ if ((argc != 4) ||
+ !enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
+ !GET_UINT(env, argv[2], &bufSz) ||
+ !GET_UINT(env, argv[3], &eflags)) {
+ return enif_make_badarg(env);
+ }
+ recvRef = argv[1];
+
+ /* if (IS_OPEN(descP)) */
+ /* return make_error(env, atom_enotconn); */
+
+ if (!erecvflags2recvflags(eflags, &flags))
+ return enif_make_badarg(env);
+
+ MLOCK(descP->readMtx);
+
+ /* <KOLLA>
+ * We need to handle the case when another process tries
+ * to receive at the same time.
+ * If the current recv could not read its entire package
+ * this time (resulting in an select). The read of the
+ * other process must be made to wait until current
+ * is done!
+ * Basically, we need a read queue!
+ *
+ * A 'reading' field (boolean), which is set if we did
+ * not manage to read the entire message and reset every
+ * time we do.
+ * </KOLLA>
+ */
+
+ res = nrecvfrom(env, descP, recvRef, bufSz, flags);
+
+ MUNLOCK(descP->readMtx);
+
+ return res;
+
+}
+
+
+/* The (read) buffer handling *must* be optimized!
+ * But for now we make it easy for ourselves by
+ * allocating a binary (of the specified or default
+ * size) and then throwing it away...
+ */
+static
+ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM recvRef,
+ uint16_t bufSz,
+ int flags)
+{
+ SocketAddress fromAddr;
+ unsigned int addrLen;
+ ssize_t read;
+ ErlNifBinary buf;
+
+ if (!descP->isReadable)
+ return enif_make_badarg(env);
+
+ /* Allocate a buffer:
+ * Either as much as we want to read or (if zero (0)) use the "default"
+ * size (what has been configured).
+ */
+ if (!ALLOC_BIN((bufSz ? bufSz : descP->rBufSz), &buf))
+ return make_error(env, atom_exalloc);
+
+ /* We ignore the wrap for the moment.
+ * Maybe we should issue a wrap-message to controlling process...
+ */
+ cnt_inc(&descP->readTries, 1);
+
+ addrLen = sizeof(fromAddr);
+ sys_memzero((char*) &fromAddr, addrLen);
+
+ read = sock_recvfrom(descP->sock, buf.data, buf.size, flags,
+ &fromAddr.sa, &addrLen);
+
+ return recvfrom_check_result(env, descP,
+ read,
+ &buf,
+ &fromAddr, addrLen,
+ recvRef);
+}
+
+
+
+/* ----------------------------------------------------------------------
* U t i l i t y F u n c t i o n s
* ----------------------------------------------------------------------
*/
@@ -2338,7 +2514,11 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
/* Ok, try again later */
- written = 0; // SHOULD RESULT IN {error, eagain}!!!!
+ /* <KOLLA>
+ * SHOULD RESULT IN {error, eagain}!!!!
+ * </KOLLA>
+ */
+ written = 0;
}
}
@@ -2366,11 +2546,11 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
ErlNifBinary* bufP,
ERL_NIF_TERM recvRef)
{
+ ERL_NIF_TERM data;
+
/* There is a special case: If the provided 'to read' value is
- * zero (0). That means that if we filled the (default size)
- * buffer, we need to continue to read (since there *may* be
- * more data), but we cannot loop here. Instead we inform the
- * caller that it must call again.
+ * zero (0). That means that we reads as much as we can, using
+ * the default read buffer size.
*/
if (bufP->size == read) {
@@ -2395,7 +2575,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
* => We choose alt 1 for now.
*/
- return make_ok3(env, atom_false, MKBIN(env, bufP));
+ data = MKBIN(env, bufP);
+
+ return make_ok3(env, atom_false, data);
} else {
@@ -2406,7 +2588,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
* </KOLLA>
*/
- return make_ok3(env, atom_true, MKBIN(env, bufP));
+ data = MKBIN(env, bufP);
+
+ return make_ok3(env, atom_true, data);
}
@@ -2421,6 +2605,89 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
/* +++ Oups - closed +++ */
/* <KOLLA>
+ *
+ * IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING
+ * PROCESS, WE NEED TO INFORM IT!!!
+ *
+ * ALL WAITING PROCESSES MUST ALSO GET THE ERROR!!
+ *
+ * </KOLLA>
+ */
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_STOP),
+ descP, NULL, recvRef);
+
+ return make_error(env, atom_closed);
+
+ } else if ((save_errno == ERRNO_BLOCK) ||
+ (save_errno == EAGAIN)) {
+ return make_error(env, atom_eagain);
+ } else {
+ return make_error2(env, save_errno);
+ }
+
+ } else {
+
+ /* +++ We did not fill the buffer +++ */
+
+ if (toRead == 0) {
+
+ /* +++ We got a chunk of data but +++
+ * +++ since we did not fill the +++
+ * +++ buffer, we must split it +++
+ * +++ into a sub-binary. +++
+ */
+
+ data = MKBIN(env, bufP);
+ data = MKSBIN(env, data, 0, read);
+
+ return make_ok3(env, atom_true, data);
+
+ } else {
+
+ /* +++ We got only a part of what was expected +++
+ * +++ => receive more later. +++ */
+
+ return make_ok3(env, atom_false, MKBIN(env, bufP));
+ }
+ }
+}
+
+
+/* The recvfrom function delivers one (1) message. If our buffer
+ * is to small, the message will be truncated. So, regardless
+ * if we filled the buffer or not, we have got what we are going
+ * to get regarding this message.
+ */
+static
+ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ int read,
+ ErlNifBinary* bufP,
+ SocketAddress* fromAddrP,
+ unsigned int fromAddrLen,
+ ERL_NIF_TERM recvRef)
+{
+ ERL_NIF_TERM data;
+
+ /* There is a special case: If the provided 'to read' value is
+ * zero (0). That means that we reads as much as we can, using
+ * the default read buffer size.
+ */
+
+ if (read < 0) {
+
+ /* +++ Error handling +++ */
+
+ int save_errno = sock_errno();
+
+ if (save_errno == ECONNRESET) {
+
+ /* +++ Oups - closed +++ */
+
+ /* <KOLLA>
* IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING
* PROCESS, WE NEED TO INFORM IT!!!
*
@@ -2445,9 +2712,29 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
} else {
- /* +++ We got only a part of what was expected - receive more later +++ */
+ /* +++ We sucessfully got a message - time to encode the address +++ */
+
+ ERL_NIF_TERM fromDomainT, fromSourceT;
- return make_ok3(env, atom_false, MKBIN(env, bufP));
+ encode_address(env,
+ fromAddrP, fromAddrLen,
+ &fromDomainT, &fromSourceT);
+
+ if (read == bufP->size) {
+ data = MKBIN(env, bufP);
+ } else {
+
+ /* +++ We got a chunk of data but +++
+ * +++ since we did not fill the +++
+ * +++ buffer, we must split it +++
+ * +++ into a sub-binary. +++
+ */
+
+ data = MKBIN(env, bufP);
+ data = MKSBIN(env, data, 0, read);
+ }
+
+ return make_ok2(env, MKT3(env, fromDomainT, fromSourceT, data));
}
}
@@ -2469,7 +2756,8 @@ char* decode_send_addr(ErlNifEnv* env,
int domain,
ERL_NIF_TERM addr,
int port,
- SocketAddress** toAddrP)
+ SocketAddress** toAddrP,
+ unsigned int* toAddrLenP)
{
if (IS_ATOM(env, addr)) {
unsigned int len;
@@ -2493,7 +2781,8 @@ char* decode_send_addr(ErlNifEnv* env,
} else if (IS_TUPLE(env, addr)) {
/* We now know that the we have a proper address. */
- return decode_send_addr_tuple(env, domain, addr, port, *toAddrP);
+ return decode_send_addr_tuple(env, domain, addr, port,
+ *toAddrP, toAddrLenP);
} else {
return str_einval;
}
@@ -2505,7 +2794,8 @@ char* decode_send_addr_tuple(ErlNifEnv* env,
int domain,
ERL_NIF_TERM addr,
int port,
- SocketAddress* toAddrP)
+ SocketAddress* toAddrP,
+ unsigned int* toAddrLenP)
{
/* We handle two different tuples:
* - size 4 (INET)
@@ -2538,7 +2828,7 @@ char* decode_send_addr_tuple(ErlNifEnv* env,
return decode_address_tuple(env, domain,
addrt, port,
- toAddrP);
+ toAddrP, toAddrLenP);
}
@@ -2551,7 +2841,8 @@ char* decode_address_tuple(ErlNifEnv* env,
int domain,
const ERL_NIF_TERM* addrt,
int port,
- SocketAddress* addrP)
+ SocketAddress* addrP,
+ unsigned int* addrLenP)
{
/* We now *know* that the size of the tuple is correct,
@@ -2564,19 +2855,19 @@ char* decode_address_tuple(ErlNifEnv* env,
int a, v;
char laddr[4];
- sys_memzero((char*)&addrP->u, sizeof(struct sockaddr_in));
+ sys_memzero((char*)addrP, sizeof(struct sockaddr_in));
#ifndef NO_SA_LEN
- addrP->u.sai.sin_len = sizeof(struct sockaddr_in);
+ addrP->sai.sin_len = sizeof(struct sockaddr_in);
#endif
- addrP->u.sai.sin_family = domain;
- addrP->u.sai.sin_port = sock_htons(port);
+ addrP->sai.sin_family = domain;
+ addrP->sai.sin_port = sock_htons(port);
for (a = 0; a < 4; a++) {
if (!GET_INT(env, addrt[a], &v))
return str_einval;
laddr[a] = v;
}
- sys_memcpy(&addrP->u.sai.sin_addr, &laddr, sizeof(laddr));
- addrP->len = sizeof(struct sockaddr_in);
+ sys_memcpy(&addrP->sai.sin_addr, &laddr, sizeof(laddr));
+ *addrLenP = sizeof(struct sockaddr_in);
return NULL;
}
break;
@@ -2587,13 +2878,13 @@ char* decode_address_tuple(ErlNifEnv* env,
int a, v;
char laddr[16];
- sys_memzero((char*)&addrP->u, sizeof(struct sockaddr_in6));
+ sys_memzero((char*)addrP, sizeof(struct sockaddr_in6));
#ifndef NO_SA_LEN
- addrP->u.sai6.sin6_len = sizeof(struct sockaddr_in6);
+ addrP->sai6.sin6_len = sizeof(struct sockaddr_in6);
#endif
- addrP->u.sai6.sin6_family = domain;
- addrP->u.sai6.sin6_port = sock_htons(port);
- addrP->u.sai6.sin6_flowinfo = 0;
+ addrP->sai6.sin6_family = domain;
+ addrP->sai6.sin6_port = sock_htons(port);
+ addrP->sai6.sin6_flowinfo = 0;
/* The address tuple is of size 8
* and each element is a two byte integer
*/
@@ -2603,8 +2894,8 @@ char* decode_address_tuple(ErlNifEnv* env,
laddr[a*2 ] = ((v >> 8) & 0xFF);
laddr[a*2+1] = (v & 0xFF);
}
- sys_memcpy(&addrP->u.sai6.sin6_addr, &laddr, sizeof(laddr));
- addrP->len = sizeof(struct sockaddr_in6);
+ sys_memcpy(&addrP->sai6.sin6_addr, &laddr, sizeof(laddr));
+ *addrLenP = sizeof(struct sockaddr_in6);
return NULL;
}
break;
@@ -2616,6 +2907,131 @@ char* decode_address_tuple(ErlNifEnv* env,
}
+/* Encode the 4- or 8-element address tuple from the socket address structure.
+ *
+ * This function is called when we have received a message. So, if we for some
+ * reason fail to decode the address or parts of it, it makes more sense to
+ * return with "undefined" for the values rather then fail completely (and not
+ * deliver the received message).
+ *
+ * Returns two things (assuming the encode works):
+ *
+ * Domain: inet | inet6 | local
+ * Source: {Address, Port} | string()
+ *
+ */
+static
+void encode_address(ErlNifEnv* env,
+ SocketAddress* addrP,
+ unsigned int addrLen,
+ ERL_NIF_TERM* domainT,
+ ERL_NIF_TERM* sourceT)
+{
+ short port;
+
+ switch (addrP->sa.sa_family) {
+
+ /* +++ inet (IPv4) +++ */
+
+ case AF_INET:
+ if (addrLen >= sizeof(struct sockaddr_in)) {
+ ERL_NIF_TERM addrT, portT;
+ unsigned int i;
+ ERL_NIF_TERM at4[4];
+ char* a4 = (char*) &addrP->sai.sin_addr;
+
+ port = sock_ntohs(addrP->sai.sin_port);
+ for (i = 0; i < 4; i++) {
+ at4[i] = MKI(env, a4[i]);
+ }
+
+ *domainT = MKA(env, "inet"); // Shall we encode these? See decode
+ addrT = MKT4(env, at4[0], at4[1], at4[2], at4[3]);
+ portT = MKI(env, port);
+ *sourceT = MKT2(env, addrT, portT);
+ } else {
+ *domainT = atom_undefined;
+ *sourceT = atom_undefined;
+ }
+ break;
+
+
+ /* +++ inet6 (IPv6) +++ */
+
+#if defined(HAVE_IN6) && defined(AF_INET6)
+ case AF_INET6:
+ if (addrLen >= sizeof(struct sockaddr_in6)) {
+ ERL_NIF_TERM addrT, portT;
+ unsigned int i;
+ ERL_NIF_TERM at6[8];
+ char* a16 = (char*) &addrP->sai6.sin6_addr;
+
+ port = sock_ntohs(addrP->sai6.sin6_port);
+ /* The address tuple is of size 8
+ * and each element is a two byte integer
+ */
+ for (i = 0; i < 8; i++) {
+ // at6[i] = MKI(env, get_int16(a16[i*2]));
+ at6[i] = MKI(env, get_int16(a16 + i*2));
+ }
+
+ *domainT = MKA(env, "inet6"); // Shall we encode these? See decode
+ addrT = MKT8(env,
+ at6[0], at6[1], at6[2], at6[3],
+ at6[4], at6[5], at6[6], at6[7]);
+ portT = MKI(env, port);
+ *sourceT = MKT2(env, addrT, portT);
+ } else {
+ *domainT = atom_undefined;
+ *sourceT = atom_undefined;
+ }
+ break;
+#endif
+
+ /* +++ local (Unix Domain Sockets) +++ */
+
+#ifdef HAVE_SYS_UN_H
+ case AF_UNIX:
+ {
+ size_t n, m;
+
+ *domainT = MKA(env, "local");
+ if (addrLen < offsetof(struct sockaddr_un, sun_path)) {
+ *sourceT = atom_undefined;
+ } else {
+ n = addrLen - offsetof(struct sockaddr_un, sun_path);
+ if (255 < n) {
+ *sourceT = atom_undefined;
+ } else {
+ m = my_strnlen(addrP->sal.sun_path, n);
+#ifdef __linux__
+ /* Assume that the address is a zero terminated string,
+ * except when the first byte is \0 i.e the string length is 0,
+ * then use the reported length instead.
+ * This fix handles Linux's nonportable
+ * abstract socket address extension.
+ */
+ if (m == 0) {
+ m = n;
+ }
+#endif
+
+ *sourceT = MKSL(env, addrP->sal.sun_path, m);
+ }
+ }
+ }
+ break;
+#endif
+
+ default:
+ *domainT = atom_undefined;
+ *sourceT = atom_undefined;
+ break;
+
+ } /* switch (addrP->sa.sa_family) */
+
+}
+
/* Decode the address when its an atom.
* Currently we only accept two atoms: 'any' and 'loopback'
@@ -2626,7 +3042,8 @@ char* decode_address_atom(ErlNifEnv* env,
char* addr,
int addrLen,
int port,
- SocketAddress* localP)
+ SocketAddress* addrP,
+ unsigned int* addrLenP)
{
BOOLEAN_T any;
@@ -2649,14 +3066,14 @@ char* decode_address_atom(ErlNifEnv* env,
} else {
addr.s_addr = sock_htonl(INADDR_LOOPBACK);
}
- sys_memzero((char*) localP, sizeof(struct sockaddr_in));
+ sys_memzero((char*) addrP, sizeof(struct sockaddr_in));
#ifndef NO_SA_LEN
- localP->u.sai.sin_len = sizeof(struct sockaddr_in6);
+ addrP->sai.sin_len = sizeof(struct sockaddr_in6);
#endif
- localP->u.sai.sin_family = domain;
- localP->u.sai.sin_port = sock_htons(port);
- localP->u.sai.sin_addr.s_addr = addr.s_addr;
- localP->len = sizeof(struct sockaddr_in);
+ addrP->sai.sin_family = domain;
+ addrP->sai.sin_port = sock_htons(port);
+ addrP->sai.sin_addr.s_addr = addr.s_addr;
+ *addrLenP = sizeof(struct sockaddr_in);
}
break;
@@ -2669,15 +3086,15 @@ char* decode_address_atom(ErlNifEnv* env,
} else {
paddr = &in6addr_loopback;
}
- sys_memzero((char*)localP, sizeof(struct sockaddr_in6));
+ sys_memzero((char*)addrP, sizeof(struct sockaddr_in6));
#ifndef NO_SA_LEN
- localP->u.sai6.sin6_len = sizeof(struct sockaddr_in6);
+ addrP->sai6.sin6_len = sizeof(struct sockaddr_in6);
#endif
- localP->u.sai6.sin6_family = domain;
- localP->u.sai6.sin6_port = sock_htons(port);
- localP->u.sai6.sin6_flowinfo = 0;
- localP->u.sai6.sin6_addr = *paddr;
- localP->len = sizeof(struct sockaddr_in6);
+ addrP->sai6.sin6_family = domain;
+ addrP->sai6.sin6_port = sock_htons(port);
+ addrP->sai6.sin6_flowinfo = 0;
+ addrP->sai6.sin6_addr = *paddr;
+ *addrLenP = sizeof(struct sockaddr_in6);
}
break;
#endif
@@ -2709,12 +3126,23 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->currentWriterP = NULL; // currentWriter not used
descP->writersQ.first = NULL;
descP->writersQ.last = NULL;
+ descP->isWritable = TRUE;
+ descP->writePkgCnt = 0;
+ descP->writeByteCnt = 0;
+ descP->writeTries = 0;
+ descP->writeWaits = 0;
+ descP->writeFails = 0;
sprintf(buf, "socket[r,%d]", sock);
descP->readMtx = MCREATE(buf);
descP->currentReaderP = NULL; // currentReader not used
descP->readersQ.first = NULL;
descP->readersQ.last = NULL;
+ descP->isReadable = TRUE;
+ descP->readPkgCnt = 0;
+ descP->readByteCnt = 0;
+ descP->readTries = 0;
+ descP->readWaits = 0;
sprintf(buf, "socket[acc,%d]", sock);
descP->accMtx = MCREATE(buf);
@@ -2722,22 +3150,12 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->acceptorsQ.first = NULL;
descP->acceptorsQ.last = NULL;
- descP->iow = FALSE;
- descP->dbg = SOCKET_DEBUG_DEFAULT;
- descP->isWritable = TRUE;
- descP->isReadable = TRUE;
- descP->writePkgCnt = 0;
- descP->writeByteCnt = 0;
- descP->writeTries = 0;
- descP->writeWaits = 0;
- descP->writeFails = 0;
- descP->readPkgCnt = 0;
- descP->readByteCnt = 0;
- descP->readTries = 0;
- descP->readWaits = 0;
-
- descP->sock = sock;
- descP->event = event;
+ descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT;
+ descP->iow = FALSE;
+ descP->dbg = SOCKET_DEBUG_DEFAULT;
+
+ descP->sock = sock;
+ descP->event = event;
}
@@ -2931,10 +3349,6 @@ BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags)
tmp |= MSG_DONTROUTE;
break;
- case SOCKET_SEND_FLAG_DONTWAIT:
- tmp |= MSG_DONTWAIT;
- break;
-
case SOCKET_SEND_FLAG_EOR:
tmp |= MSG_EOR;
break;
@@ -2979,10 +3393,6 @@ BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags)
tmp |= MSG_CMSG_CLOEXEC;
break;
- case SOCKET_RECV_FLAG_DONTWAIT:
- tmp |= MSG_DONTWAIT;
- break;
-
case SOCKET_RECV_FLAG_ERRQUEUE:
tmp |= MSG_ERRQUEUE;
break;
@@ -2999,10 +3409,6 @@ BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags)
tmp |= MSG_TRUNC;
break;
- case SOCKET_RECV_FLAG_WAITALL:
- tmp |= MSG_WAITALL;
- break;
-
default:
return FALSE;
}
@@ -3015,6 +3421,18 @@ BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags)
}
+#if defined(HAVE_SYS_UN_H) || defined(SO_BINDTODEVICE)
+/* strnlen doesn't exist everywhere */
+static
+size_t my_strnlen(const char *s, size_t maxlen)
+{
+ size_t i = 0;
+ while (i < maxlen && s[i] != '\0')
+ i++;
+ return i;
+}
+#endif
+
/* Create an ok two (2) tuple in the form: {ok, Any}.
* The second element (Any) is already in the form of an
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 0dadcecaa0..bae561cd51 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -39,7 +39,7 @@
%% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv)
recv/2, recv/3, recv/4,
- recvfrom/1, recvfrom/2,
+ recvfrom/1, recvfrom/2, recvfrom/3, recvfrom/4,
%% recvmsg/4,
%% readv/3,
@@ -106,10 +106,10 @@
-type port_number() :: 0..65535.
-type socket_info() :: map().
-%% -record(socket, {info :: socket_info,
-%% ref :: reference()}).
--opaque socket() :: {socket, socket_info(), reference()}.
-%% -opaque socket() :: #socket{}.
+-record(socket, {info :: socket_info(),
+ ref :: reference()}).
+%% -opaque socket() :: {socket, socket_info(), reference()}.
+-opaque socket() :: #socket{}.
-type accept_flags() :: [accept_flag()].
-type accept_flag() :: nonblock | cloexec.
@@ -117,20 +117,26 @@
-type send_flags() :: [send_flag()].
-type send_flag() :: confirm |
dontroute |
- dontwait |
eor |
more |
nosignal |
oob.
+%% Extend with OWN flags for other usage:
+%% - adapt-buffer-sz:
+%% This will have the effect that the nif recvfrom will use
+%% MSG_PEEK to ensure no part of the message is lost, but if
+%% necessary adapt (increase) the buffer size until all of
+%% it fits.
+%%
+%% Note that not all of these flags is useful for every recv function!
+%%
-type recv_flags() :: [recv_flag()].
-type recv_flag() :: cmsg_cloexec |
- dontwait |
errqueue |
oob |
peek |
- trunc |
- waitall.
+ trunc.
-type setopt_key() :: foo.
-type getopt_key() :: foo.
@@ -177,23 +183,20 @@
-define(SOCKET_SEND_FLAG_CONFIRM, 0).
-define(SOCKET_SEND_FLAG_DONTROUTE, 1).
--define(SOCKET_SEND_FLAG_DONTWAIT, 2).
--define(SOCKET_SEND_FLAG_EOR, 3).
--define(SOCKET_SEND_FLAG_MORE, 4).
--define(SOCKET_SEND_FLAG_NOSIGNAL, 5).
--define(SOCKET_SEND_FLAG_OOB, 6).
+-define(SOCKET_SEND_FLAG_EOR, 2).
+-define(SOCKET_SEND_FLAG_MORE, 3).
+-define(SOCKET_SEND_FLAG_NOSIGNAL, 4).
+-define(SOCKET_SEND_FLAG_OOB, 5).
-define(SOCKET_SEND_FLAGS_DEFAULT, []).
-define(SOCKET_SEND_TIMEOUT_DEFAULT, infinity).
-define(SOCKET_SENDTO_TIMEOUT_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT).
-define(SOCKET_RECV_FLAG_CMSG_CLOEXEC, 0).
--define(SOCKET_RECV_FLAG_DONTWAIT, 1).
--define(SOCKET_RECV_FLAG_ERRQUEUE, 2).
--define(SOCKET_RECV_FLAG_OOB, 3).
--define(SOCKET_RECV_FLAG_PEEK, 4).
--define(SOCKET_RECV_FLAG_TRUNC, 5).
--define(SOCKET_RECV_FLAG_WAITALL, 6).
+-define(SOCKET_RECV_FLAG_ERRQUEUE, 1).
+-define(SOCKET_RECV_FLAG_OOB, 2).
+-define(SOCKET_RECV_FLAG_PEEK, 3).
+-define(SOCKET_RECV_FLAG_TRUNC, 4).
-define(SOCKET_RECV_FLAGS_DEFAULT, []).
-define(SOCKET_RECV_TIMEOUT_DEFAULT, infinity).
@@ -287,7 +290,8 @@ open(Domain, Type, Protocol0, Extra) when is_map(Extra) ->
SocketInfo = #{domain => Domain,
type => Type,
protocol => Protocol},
- Socket = {socket, SocketInfo, SockRef},
+ Socket = #socket{info = SocketInfo,
+ ref = SockRef},
{ok, Socket};
{error, _} = ERROR ->
ERROR
@@ -352,7 +356,7 @@ bind(Socket, Addr) when is_tuple(Addr) orelse
Reason :: term().
%% Shall we keep info about domain so that we can verify address?
-bind({socket, _, SockRef}, Addr, Port)
+bind(#socket{ref = SockRef}, Addr, Port)
when (is_tuple(Addr) andalso
((size(Addr) =:= 4) orelse (size(Addr) =:= 8))) orelse
((Addr =:= any) orelse (Addr =:= loopback)) andalso
@@ -385,7 +389,7 @@ connect(Socket, Addr, Port) ->
connect(_Socket, _Addr, _Port, Timeout)
when (is_integer(Timeout) andalso (Timeout =< 0)) ->
{error, timeout};
-connect({socket, _, SockRef}, Addr, Port, Timeout)
+connect(#socket{ref = SockRef}, Addr, Port, Timeout)
when (is_tuple(Addr) andalso
((size(Addr) =:= 4) orelse (size(Addr) =:= 8))) andalso
(is_integer(Port) andalso (Port >= 0)) andalso
@@ -424,7 +428,7 @@ connect({socket, _, SockRef}, Addr, Port, Timeout)
listen(Socket) ->
listen(Socket, ?SOCKET_LISTEN_BACKLOG_DEFAULT).
-listen({socket, _, SockRef}, Backlog)
+listen(#socket{ref = SockRef}, Backlog)
when (is_integer(Backlog) andalso (Backlog >= 0)) ->
nif_listen(SockRef, Backlog).
@@ -448,7 +452,7 @@ accept(Socket) ->
%% Do we really need this optimization?
accept(_, Timeout) when is_integer(Timeout) andalso (Timeout =< 0) ->
{error, timeout};
-accept({socket, SI, LSockRef}, Timeout)
+accept(#socket{info = SI, ref = LSockRef}, Timeout)
when is_integer(Timeout) orelse (Timeout =:= infinity) ->
do_accept(LSockRef, SI, Timeout).
@@ -460,7 +464,8 @@ do_accept(LSockRef, SI, Timeout) ->
SocketInfo = #{domain => maps:get(domain, SI),
type => maps:get(type, SI),
protocol => maps:get(protocol, SI)},
- Socket = {socket, SocketInfo, SockRef},
+ Socket = #socket{info = SocketInfo,
+ ref = SockRef},
{ok, Socket};
{error, eagain} ->
NewTimeout = next_timeout(TS, Timeout),
@@ -499,9 +504,10 @@ send(Socket, Data, Timeout) ->
send(Socket, Data, Flags, Timeout) when is_list(Data) ->
Bin = erlang:list_to_binary(Data),
send(Socket, Bin, Flags, Timeout);
-send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags) ->
+send(#socket{ref = SockRef}, Data, Flags, Timeout)
+ when is_binary(Data) andalso is_list(Flags) ->
EFlags = enc_send_flags(Flags),
- do_send(Socket, Data, EFlags, Timeout).
+ do_send(SockRef, Data, EFlags, Timeout).
do_send(SockRef, Data, EFlags, Timeout) ->
TS = timestamp(Timeout),
@@ -562,14 +568,14 @@ sendto(Socket, Data, Flags, DestAddr, DestPort) ->
sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) when is_list(Data) ->
Bin = erlang:list_to_binary(Data),
sendto(Socket, Bin, Flags, DestAddr, DestPort, Timeout);
-sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout)
+sendto(#socket{ref = SockRef}, Data, Flags, DestAddr, DestPort, Timeout)
when is_binary(Data) andalso
is_list(Flags) andalso
(is_tuple(DestAddr) orelse (DestAddr =:= null)) andalso
is_integer(DestPort) andalso
(is_integer(Timeout) orelse (Timeout =:= infinity)) ->
EFlags = enc_send_flags(Flags),
- do_sendto(Socket, Data, EFlags, DestAddr, DestPort, Timeout).
+ do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, Timeout).
do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
TS = timestamp(Timeout),
@@ -726,18 +732,18 @@ recv(Socket, Length, Timeout) ->
Data :: binary(),
Reason :: term().
-recv(Socket, Length, Flags, Timeout)
+recv(#socket{ref = SockRef}, Length, Flags, Timeout)
when (is_integer(Length) andalso (Length >= 0)) andalso
is_list(Flags) andalso
(is_integer(Timeout) orelse (Timeout =:= infinity)) ->
EFlags = enc_recv_flags(Flags),
- do_recv(Socket, undefined, Length, EFlags, <<>>, EFlags).
+ do_recv(SockRef, undefined, Length, EFlags, <<>>, Timeout).
%% We need to pass the "old recv ref" around because of the special case
%% with Length = 0. This case makes it neccessary to have a timeout function
%% clause since we may never wait for anything (no receive select), and so the
%% the only timeout check will be the function clause.
-do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
+do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
when (Timeout =:= infinity) orelse
(is_integer(Timeout) andalso (Timeout > 0)) ->
TS = timestamp(Timeout),
@@ -754,7 +760,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
%% > 0 - We got a part of the message and we will be notified
%% when there is more to read (a select message)
{ok, false = _Complete, Bin} when (Length =:= 0) ->
- do_recv(Socket, RecvRef,
+ do_recv(SockRef, RecvRef,
Length, EFlags,
<<Acc/binary, Bin/binary>>,
next_timeout(TS, Timeout));
@@ -766,7 +772,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
NewTimeout = next_timeout(TS, Timeout),
receive
{select, SockRef, RecvRef, ready_input} ->
- do_recv(Socket, RecvRef,
+ do_recv(SockRef, RecvRef,
Length-size(Bin), EFlags,
Bin,
next_timeout(TS, Timeout))
@@ -781,7 +787,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
NewTimeout = next_timeout(TS, Timeout),
receive
{select, SockRef, RecvRef, ready_input} ->
- do_recv(Socket, RecvRef,
+ do_recv(SockRef, RecvRef,
Length-size(Bin), EFlags,
<<Acc/binary, Bin/binary>>,
next_timeout(TS, Timeout))
@@ -801,7 +807,7 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
NewTimeout = next_timeout(TS, Timeout),
receive
{select, SockRef, RecvRef, ready_input} ->
- do_recv(Socket, RecvRef,
+ do_recv(SockRef, RecvRef,
Length, EFlags,
Acc,
next_timeout(TS, Timeout))
@@ -819,37 +825,92 @@ do_recv({socket, _, SockRef} = Socket, _OldRef, Length, EFlags, Acc, Timeout)
end;
-do_recv({socket, _, SockRef} = _Socket, RecvRef,
- 0 = _Length, _Eflags, Acc, _Timeout) ->
+do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) ->
%% The current recv operation is to be cancelled, so no need for a ref...
%% The cancel will end our 'read everything you have' and "activate"
%% any waiting reader.
nif_cancel(SockRef, recv, RecvRef),
{ok, Acc};
-do_recv(_Socket, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) ->
+do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) ->
{error, {timeout, Acc}};
-do_recv(_Socket, _RecvRef, _Length, _EFlags, _Acc, _Timeout) ->
+do_recv(_SockRef, _RecvRef, _Length, _EFlags, _Acc, _Timeout) ->
{error, timeout}.
%% ---------------------------------------------------------------------------
%%
+%% With recvfrom we get messages, which means that regardless of how
+%% much we want to read, we return when we get a message.
+%% The MaxSize argument basically defines the size of our receive
+%% buffer. By setting the size to zero (0), we use the configured
+%% size (see setopt).
+%% It may be impossible to know what (buffer) size is appropriate
+%% "in advance", and in those cases it may be convenient to use the
+%% (recv) 'peek' flag. When this flag is provided the message is *not*
+%% "consumed" from the underlying buffers, so another recvfrom call
+%% is needed, possibly with a then adjusted buffer size.
+%%
recvfrom(Socket) ->
- recvfrom(Socket, ?SOCKET_RECV_FLAGS_DEFAULT).
+ recvfrom(Socket, 0).
+
+recvfrom(Socket, BufSz) ->
+ recvfrom(Socket, BufSz,
+ ?SOCKET_RECV_FLAGS_DEFAULT,
+ ?SOCKET_RECV_TIMEOUT_DEFAULT).
+
+
+recvfrom(Socket, Flags, Timeout) when is_list(Flags) ->
+ recvfrom(Socket, 0, Flags, Timeout);
+recvfrom(Socket, BufSz, Flags) when is_list(Flags) ->
+ recvfrom(Socket, BufSz, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT);
+recvfrom(Socket, BufSz, Timeout) ->
+ recvfrom(Socket, BufSz, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout).
+
+-spec recvfrom(Socket, BufSz, Flags, Timeout) -> {ok, {SrcDomain, Source, Data}} | {error, Reason} when
+ Socket :: socket(),
+ BufSz :: non_neg_integer(),
+ Flags :: recv_flags(),
+ Timeout :: timeout(),
+ SrcDomain :: domain() | undefined,
+ Source :: {ip_address(), port_number()} | string() | undefined,
+ Data :: binary(),
+ Reason :: term().
+
+recvfrom(#socket{ref = SockRef}, BufSz, Flags, Timeout)
+ when (is_integer(BufSz) andalso (BufSz >= 0)) andalso
+ is_list(Flags) andalso
+ (is_integer(Timeout) orelse (Timeout =:= infinity)) ->
+ EFlags = enc_recv_flags(Flags),
+ do_recvfrom(SockRef, BufSz, EFlags, Timeout).
--spec recvfrom(Socket, Flags) -> {ok, Data, SrcAddr, SrcPort} | {error, Reason} when
- Socket :: socket(),
- Flags :: recv_flags(),
- Data :: binary(),
- SrcAddr :: ip_address(),
- SrcPort :: port_number(),
- Reason :: term().
+do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
+ TS = timestamp(Timeout),
+ RecvRef = make_ref(),
+ case nif_recvfrom(SockRef, RecvRef, BufSz, EFlags) of
+ {ok, {_Domain, _Source, _NewData}} = OK ->
+ OK;
+
+ {error, eagain} ->
+ %% There is nothing just now, but we will be notified when there
+ %% is something to read (a select message).
+ NewTimeout = next_timeout(TS, Timeout),
+ receive
+ {select, SockRef, RecvRef, ready_input} ->
+ do_recvfrom(SockRef, BufSz, EFlags,
+ next_timeout(TS, Timeout))
+ after NewTimeout ->
+ nif_cancel(SockRef, recvfrom, RecvRef),
+ flush_select_msgs(SockRef, RecvRef),
+ {error, timeout}
+ end;
+
+ {error, _} = ERROR ->
+ ERROR
+
+ end.
-recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) ->
- EFlags = enc_recv_flags(Flags),
- nif_recvfrom(SockRef, EFlags).
%% ---------------------------------------------------------------------------
@@ -998,7 +1059,6 @@ enc_protocol(Type, Proto) -> throw({error, {invalid_protocol, {Type, Proto}}
enc_send_flags(Flags) ->
EFlags = [{confirm, ?SOCKET_SEND_FLAG_CONFIRM},
{dontroute, ?SOCKET_SEND_FLAG_DONTROUTE},
- {dontwait, ?SOCKET_SEND_FLAG_DONTWAIT},
{eor, ?SOCKET_SEND_FLAG_EOR},
{more, ?SOCKET_SEND_FLAG_MORE},
{nosignal, ?SOCKET_SEND_FLAG_NOSIGNAL},
@@ -1010,12 +1070,10 @@ enc_send_flags(Flags) ->
enc_recv_flags(Flags) ->
EFlags = [{cmsg_cloexec, ?SOCKET_RECV_FLAG_CMSG_CLOEXEC},
- {dontwait, ?SOCKET_RECV_FLAG_DONTWAIT},
{errqueue, ?SOCKET_RECV_FLAG_ERRQUEUE},
{oob, ?SOCKET_RECV_FLAG_OOB},
{peek, ?SOCKET_RECV_FLAG_PEEK},
- {trunc, ?SOCKET_RECV_FLAG_TRUNC},
- {waitall, ?SOCKET_RECV_FLAG_WAITALL}],
+ {trunc, ?SOCKET_RECV_FLAG_TRUNC}],
enc_flags(Flags, EFlags).
@@ -1159,7 +1217,7 @@ nif_sendto(_SRef, _SendRef, _Data, _Flags, _Dest, _Port) ->
nif_recv(_SRef, _RecvRef, _Length, _Flags) ->
erlang:error(badarg).
-nif_recvfrom(_SRef, _Flags) ->
+nif_recvfrom(_SRef, _RecvRef, _Length, _Flags) ->
erlang:error(badarg).
nif_cancel(_SRef, _Op, _Ref) ->