aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-04-19 10:57:54 +0200
committerMicael Karlberg <[email protected]>2018-09-18 13:01:37 +0200
commit28611d6e6daab8ae24e5e593c001bcd6442506eb (patch)
tree630ea2ff977cac8644a5929673f288b61041f295
parent5920705deb70a44311e1b7552cfa73553f284164 (diff)
downloadotp-28611d6e6daab8ae24e5e593c001bcd6442506eb.tar.gz
otp-28611d6e6daab8ae24e5e593c001bcd6442506eb.tar.bz2
otp-28611d6e6daab8ae24e5e593c001bcd6442506eb.zip
[socket-nif] Completed recv
Need to fix the use of the request ref (ID) handling in previous functions.
-rw-r--r--erts/emulator/nifs/common/socket_nif.c526
-rw-r--r--erts/preloaded/src/socket.erl204
2 files changed, 563 insertions, 167 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 46c5c696e2..d3aa3db2aa 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -223,19 +223,20 @@ typedef unsigned long long llu_t;
/* *** Misc macros and defines *** */
-#define MALLOC(SZ) enif_alloc(SZ)
-#define FREE(P) enif_free(P)
-#define MKA(E,S) enif_make_atom(E, S)
-#define MKREF(E) enif_make_ref(E)
-#define MKT2(E,E1,E2) enif_make_tuple2(E, E1, E2)
-#define MCREATE(N) enif_mutex_create(N)
-#define MLOCK(M) enif_mutex_lock(M)
-#define MUNLOCK(M) enif_mutex_unlock(M)
+#define MALLOC(SZ) enif_alloc(SZ)
+#define FREE(P) enif_free(P)
+#define MKA(E,S) enif_make_atom(E, S)
+#define MKBIN(E,B) enif_make_binary(E, B)
+#define MKREF(E) enif_make_ref(E)
+#define MKT2(E,E1,E2) enif_make_tuple2(E, E1, E2)
+#define MKT3(E,E1,E2,E3) enif_make_tuple3(E, E1, E2, E3)
+#define MCREATE(N) enif_mutex_create(N)
+#define MLOCK(M) enif_mutex_lock(M)
+#define MUNLOCK(M) enif_mutex_unlock(M)
#define SELECT(E,FD,M,O,P,R) \
if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \
return enif_make_badarg((E));
-
/* *** Socket state defs *** */
#define SOCKET_FLAG_OPEN 0x0001
@@ -275,6 +276,18 @@ typedef unsigned long long llu_t;
#define SOCKET_SEND_FLAG_LOW SOCKET_SEND_FLAG_CONFIRM
#define SOCKET_SEND_FLAG_HIGH SOCKET_SEND_FLAG_OOB
+#define SOCKET_RECV_FLAG_CMSG_CLOEXEC 0
+#define SOCKET_RECV_FLAG_DONTWAIT 1
+#define SOCKET_RECV_FLAG_ERRQUEUE 2
+#define SOCKET_RECV_FLAG_OOB 3
+#define SOCKET_RECV_FLAG_PEEK 4
+#define SOCKET_RECV_FLAG_TRUNC 5
+#define SOCKET_RECV_FLAG_WAITALL 6
+#define SOCKET_RECV_FLAG_LOW SOCKET_RECV_FLAG_CMSG_CLOEXEC
+#define SOCKET_RECV_FLAG_HIGH SOCKET_RECV_FLAG_WAITALL
+
+#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048
+
typedef union {
struct {
unsigned int open:1;
@@ -343,6 +356,8 @@ typedef union {
#define GET_UINT(E, TE, IP) enif_get_uint((E), (TE), (IP))
#define GET_TUPLE(E, TE, TSZ, TA) enif_get_tuple((E), (TE), (TSZ), (TA))
+#define ALLOC_BIN(SZ, BP) enif_alloc_binary((SZ), (BP))
+
/* =================================================================== *
* *
@@ -366,6 +381,7 @@ typedef union {
#define sock_name(s, addr, len) getsockname((s), (addr), (len))
#define sock_open(domain, type, proto) \
make_noninheritable_handle(socket((domain), (type), (proto)))
+#define sock_recv(s,buf,len,flag) recv((s),(buf),(len),(flag))
#define sock_send(s,buf,len,flag) send((s),(buf),(len),(flag))
#define sock_sendto(s,buf,blen,flag,addr,alen) \
sendto((s),(buf),(blen),(flag),(addr),(alen))
@@ -395,6 +411,7 @@ static unsigned long one_value = 1;
#define sock_listen(s, b) listen((s), (b))
#define sock_name(s, addr, len) getsockname((s), (addr), (len))
#define sock_open(domain, type, proto) socket((domain), (type), (proto))
+#define sock_recv(s,buf,len,flag) recv((s),(buf),(len),(flag))
#define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag))
#define sock_sendto(s,buf,blen,flag,addr,alen) \
sendto((s),(buf),(blen),(flag),(addr),(alen))
@@ -437,17 +454,28 @@ typedef struct {
typedef struct {
- ErlNifPid pid; // PID of the acceptor
- ErlNifMonitor mon; // Monitor for the acceptor
- ERL_NIF_TERM ref; // The (unique) reference of the (accept) request
-} SocketAcceptor;
+ ErlNifPid pid; // PID of the requesting process
+ ErlNifMonitor mon; // Monitor to the requesting process
+ ERL_NIF_TERM ref; // The (unique) reference (ID) of the request
+} SocketRequestor;
+
+typedef struct socket_request_queue_element {
+ struct socket_request_queue_element* next;
+ SocketRequestor data;
+} SocketRequestQueueElement;
typedef struct {
- // The actual socket
+ SocketRequestQueueElement* first;
+ SocketRequestQueueElement* last;
+} SocketRequestQueue;
+
+
+typedef struct {
+ /* +++ The actual socket +++ */
SOCKET sock;
HANDLE event;
- /* "Stuff" about the socket */
+ /* +++ Stuff "about" the socket +++ */
int domain;
int type;
int protocol;
@@ -456,51 +484,45 @@ typedef struct {
SocketAddress remote;
- // Controller (owner) process
+ /* +++ Controller (owner) process +++ */
ErlNifPid ctrlPid;
ErlNifMonitor ctrlMon;
- // Write
- ErlNifMutex* writeMtx;
- BOOLEAN_T isWritable;
- uint32_t writePkgCnt;
- uint32_t writeByteCnt;
- uint32_t writeTries;
- uint32_t writeWaits;
- uint32_t writeFails;
-
- // Read
- ErlNifMutex* readMtx;
- BOOLEAN_T isReadable;
- ErlNifBinary rbuffer;
- uint32_t readCapacity;
- uint32_t readPkgCnt;
- uint32_t readByteCnt;
- uint32_t readTries;
- uint32_t readWaits;
-
- /* Accept
- * We also need a queue for waiting acceptors...
- * Lets see if this can be a common "request" queue...
- */
- ErlNifMutex* accMtx;
- SocketAcceptor acceptor;
-
-
- /* We need to keep track of the "request(s)" we have pending.
- * If for instance an accept takes to long, the issuer may
- * decide to "cancel" the accept (actually the select). This
- * is done by calling the *nif_cancel* function with the request
- * ref as argument.
- * We also need to keep track of requests so that if a new
- * request is issued before the current has completed, we
- * reply with e.g. ebusy (or something to that effect).
- * Or do we? Can the caller actually do that?
- */
-
-
- /* Misc stuff */
+ /* +++ Write stuff +++ */
+ ErlNifMutex* writeMtx;
+ SocketRequestor currentWriter;
+ SocketRequestor* currentWriterP; // NULL or points to currentWriter
+ SocketRequestQueue writersQ;
+ BOOLEAN_T isWritable;
+ uint32_t writePkgCnt;
+ uint32_t writeByteCnt;
+ uint32_t writeTries;
+ uint32_t writeWaits;
+ uint32_t writeFails;
+
+ /* +++ Read stuff +++ */
+ ErlNifMutex* readMtx;
+ SocketRequestor currentReader;
+ SocketRequestor* currentReaderP; // NULL or points to currentReader
+ SocketRequestQueue readersQ;
+ BOOLEAN_T isReadable;
+ ErlNifBinary rbuffer; // DO WE NEED THIS
+ uint32_t readCapacity; // DO WE NEED THIS
+ uint32_t readPkgCnt;
+ uint32_t readByteCnt;
+ uint32_t readTries;
+ uint32_t readWaits;
+
+ /* +++ Accept stuff +++ */
+ ErlNifMutex* accMtx;
+ SocketRequestor currentAcceptor;
+ SocketRequestor* currentAcceptorP; // NULL or points to currentReader
+ SocketRequestQueue acceptorsQ;
+
+ /* +++ Misc stuff +++ */
+ BOOLEAN_T iow; // Inform On Wrap
BOOLEAN_T dbg;
+
} SocketDescriptor;
@@ -515,39 +537,6 @@ typedef struct {
} SocketData;
-/* typedef struct socket_queue_element { */
-/* struct socket_queue_element* next; */
-/* /\* */
-/* unsigned int tag; */
-/* union { */
-/* SocketAcceptor acceptor; */
-/* } u; */
-/* *\/ */
-/* SocketAcceptor acceptor; */
-/* } SocketQueueElement; */
-
-/* typedef struct socket_queue { */
-/* SocketQueueElement* first; */
-/* SocketQueueElement* last; */
-/* } SocketQueue; */
-
-/* Macros for defining the various queues (accept, send receive) */
-#define SOCKET_QUEUE_ELEMENT(QE,QEP) \
- typedef struct socket_queue_element_##QEP { \
- struct socket_queue_element_##QEP* next; \
- QE elem; \
- } QE##Element;
-
-#define SOCKET_QUEUE(QE,Q) \
- typedef struct { \
- QE* first; \
- QE* last; \
- } Q;
-
-/* The Acceptor Queue types */
-SOCKET_QUEUE_ELEMENT(SocketAcceptor, acceptor);
-SOCKET_QUEUE(SocketAcceptorElement, SocketAcceptQueue);
-
/* ----------------------------------------------------------------------
* F o r w a r d s
* ----------------------------------------------------------------------
@@ -600,12 +589,12 @@ static ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env,
static ERL_NIF_TERM nif_close(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
-static ERL_NIF_TERM nif_setopt(ErlNifEnv* env,
- int argc,
- const ERL_NIF_TERM argv[]);
-static ERL_NIF_TERM nif_getopt(ErlNifEnv* env,
- int argc,
- const ERL_NIF_TERM argv[]);
+static ERL_NIF_TERM nif_setsockopt(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[]);
+static ERL_NIF_TERM nif_getsockopt(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM nif_finalize_connection(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
@@ -649,12 +638,23 @@ static ERL_NIF_TERM nsendto(ErlNifEnv* env,
ErlNifBinary* dataP,
int flags,
SocketAddress* toAddrP);
+static ERL_NIF_TERM nrecv(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM recvRef,
+ int len,
+ int flags);
static ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
ssize_t written,
ssize_t dataSize,
ERL_NIF_TERM sendRef);
+static ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ int read,
+ int toRead,
+ ErlNifBinary* bufP,
+ ERL_NIF_TERM recvRef);
static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
SocketDescriptor* descP);
@@ -707,6 +707,7 @@ static BOOLEAN_T edomain2domain(int edomain, int* domain);
static BOOLEAN_T etype2type(int etype, int* type);
static BOOLEAN_T eproto2proto(int eproto, int* proto);
static BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags);
+static BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags);
#ifdef HAVE_SETNS
static BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns);
static BOOLEAN_T change_network_namespace(char* netns, int* cns, int* err);
@@ -725,7 +726,8 @@ static void socket_down(ErlNifEnv* env,
const ErlNifPid* pid,
const ErlNifMonitor* mon);
-static ERL_NIF_TERM make_ok(ErlNifEnv* env, ERL_NIF_TERM any);
+static ERL_NIF_TERM make_ok2(ErlNifEnv* env, ERL_NIF_TERM val);
+static ERL_NIF_TERM make_ok3(ErlNifEnv* env, ERL_NIF_TERM val1, ERL_NIF_TERM val2);
static ERL_NIF_TERM make_error(ErlNifEnv* env, ERL_NIF_TERM reason);
static ERL_NIF_TERM make_error1(ErlNifEnv* env, char* reason);
static ERL_NIF_TERM make_error2(ErlNifEnv* env, int err);
@@ -767,8 +769,9 @@ static const struct in6_addr in6addr_loopback =
/* *** String constants *** */
-static char str_false[] = "false";
+static char str_closed[] = "closed";
static char str_error[] = "error";
+static char str_false[] = "false";
static char str_ok[] = "ok";
static char str_true[] = "true";
static char str_undefined[] = "undefined";
@@ -779,6 +782,7 @@ static char str_eafnosupport[] = "eafnosupport";
static char str_einval[] = "einval";
static char str_eisconn[] = "eisconn";
static char str_enotconn[] = "enotconn";
+static char str_exalloc[] = "exalloc";
static char str_exbadstate[] = "exbadstate";
static char str_exbusy[] = "exbusy";
static char str_exmon[] = "exmonitor"; // failed monitor
@@ -786,8 +790,9 @@ static char str_exself[] = "exself"; // failed self
/* *** Atoms *** */
-static ERL_NIF_TERM atom_false;
+static ERL_NIF_TERM atom_closed;
static ERL_NIF_TERM atom_error;
+static ERL_NIF_TERM atom_false;
static ERL_NIF_TERM atom_ok;
static ERL_NIF_TERM atom_true;
static ERL_NIF_TERM atom_undefined;
@@ -797,6 +802,7 @@ static ERL_NIF_TERM atom_eafnosupport;
static ERL_NIF_TERM atom_einval;
static ERL_NIF_TERM atom_eisconn;
static ERL_NIF_TERM atom_enotconn;
+static ERL_NIF_TERM atom_exalloc;
static ERL_NIF_TERM atom_exbadstate;
static ERL_NIF_TERM atom_exbusy;
static ERL_NIF_TERM atom_exmon;
@@ -832,16 +838,16 @@ static SocketData socketData;
* nif_connect(Sock, Addr, Port)
* nif_listen(Sock, Backlog)
* nif_accept(LSock, Ref)
- * nif_send(Sock, Data, Flags)
- * nif_sendto(Sock, Data, Flags, DstAddr, DstPort)
- * nif_recv(Sock, Flags)
+ * nif_send(Sock, SendRef, Data, Flags)
+ * nif_sendto(Sock, SendRef, Data, Flags, DstAddr, DstPort)
+ * nif_recv(Sock, RecvRef, Length, Flags)
* nif_recvfrom(Sock, Flags)
* nif_close(Sock)
*
* And some functions to manipulate and retrieve socket options:
* -------------------------------------------------------------
- * nif_setopt/3
- * nif_getopt/2
+ * nif_setsockopt/3
+ * nif_getsockopt/2
*
* And some socket admin functions:
* -------------------------------------------------------------
@@ -1033,7 +1039,7 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
#endif
- return make_ok(env, res);
+ return make_ok2(env, res);
}
@@ -1213,7 +1219,7 @@ ERL_NIF_TERM nbind(ErlNifEnv* env,
port = 0;
}
- return make_ok(env, enif_make_int(env, port));
+ return make_ok2(env, enif_make_int(env, port));
}
@@ -1482,7 +1488,7 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
descP->sock,
(ERL_NIF_SELECT_WRITE),
descP, NULL, ref);
- return make_ok(env, ref);
+ return make_ok2(env, ref);
} else if (code == 0) { /* ok we are connected */
descP->state = SOCKET_STATE_CONNECTED;
/* Do we need to do somthing for "active" mode?
@@ -1738,13 +1744,13 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
/* *** Try again later *** */
- descP->acceptor.pid = caller;
+ descP->currentAcceptor.pid = caller;
if (enif_monitor_process(env, descP,
- &descP->acceptor.pid,
- &descP->acceptor.mon) > 0)
+ &descP->currentAcceptor.pid,
+ &descP->currentAcceptor.mon) > 0)
return make_error(env, atom_exmon);
- descP->acceptor.ref = ref;
+ descP->currentAcceptor.ref = ref;
SELECT(env,
descP->sock,
@@ -1830,7 +1836,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
accDescP->state = SOCKET_STATE_CONNECTED;
- return make_ok(env, accRef);
+ return make_ok2(env, accRef);
}
}
@@ -1854,7 +1860,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
if (enif_self(env, &caller) == NULL)
return make_error(env, atom_exself);
- if (compare_pids(env, &descP->acceptor.pid, &caller) != 0) {
+ if (compare_pids(env, &descP->currentAcceptor.pid, &caller) != 0) {
/* This will have to do until we implement the queue.
* When we have the queue, we should simply push this request,
* and instead return with eagain (the caller will then wait
@@ -1938,7 +1944,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
*/
descP->state = SOCKET_STATE_LISTENING;
- return make_ok(env, accRef);
+ return make_ok2(env, accRef);
}
}
@@ -2200,6 +2206,101 @@ ERL_NIF_TERM nwritev(ErlNifEnv* env,
+/* ----------------------------------------------------------------------
+ * nif_recv
+ *
+ * Description:
+ * Receive a message on a socket.
+ * Normally used only on a connected socket!
+ * If we are trying to read > 0 bytes, then that is what we do.
+ * But if we have specified 0 bytes, then we want to read
+ * whatever is in the buffers (everything it got).
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ * RecvRef - A unique id for this (send) request.
+ * Length - The number of bytes to receive.
+ * Flags - Receive flags.
+ */
+
+static
+ERL_NIF_TERM nif_recv(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+ SocketDescriptor* descP;
+ ERL_NIF_TERM recvRef;
+ int len;
+ unsigned int eflags;
+ int flags;
+ ERL_NIF_TERM res;
+
+ if ((argc != 4) ||
+ !enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
+ !GET_INT(env, argv[2], &len) ||
+ !GET_UINT(env, argv[3], &eflags)) {
+ return enif_make_badarg(env);
+ }
+ recvRef = argv[1];
+
+ if (!IS_CONNECTED(descP))
+ return make_error(env, atom_enotconn);
+
+ if (!erecvflags2recvflags(eflags, &flags))
+ return enif_make_badarg(env);
+
+ MLOCK(descP->readMtx);
+
+ /* We need to handle the case when another process tries
+ * to receive at the same time.
+ * If the current recv could not read its entire package
+ * this time (resulting in an select). The read of the
+ * other process must be made to wait until current
+ * is done!
+ * Basically, we need a read queue!
+ *
+ * A 'reading' field (boolean), which is set if we did
+ * not manage to read the entire message and reset every
+ * time we do.
+ */
+
+ res = nrecv(env, descP, recvRef, len, flags);
+
+ MUNLOCK(descP->readMtx);
+
+ return res;
+
+}
+
+
+static
+ERL_NIF_TERM nrecv(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM recvRef,
+ int len,
+ int flags)
+{
+ ssize_t read;
+ ErlNifBinary buf;
+
+ if (!descP->isReadable)
+ return enif_make_badarg(env);
+
+ if (!ALLOC_BIN((len ? len : SOCKET_RECV_BUFFER_SIZE_DEFAULT), &buf))
+ return make_error(env, atom_exalloc);
+
+ /* We ignore the wrap for the moment.
+ * Maybe we should issue a wrap-message to controlling process...
+ */
+ cnt_inc(&descP->readTries, 1);
+
+ read = sock_recv(descP->sock, buf.data, buf.size, flags);
+
+ return recv_check_result(env, descP,
+ read, len,
+ &buf,
+ recvRef);
+}
@@ -2237,7 +2338,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
/* Ok, try again later */
- written = 0;
+ written = 0; // SHOULD RESULT IN {error, eagain}!!!!
}
}
@@ -2252,12 +2353,109 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE),
descP, NULL, sendRef);
- return make_ok(env, enif_make_int(env, written));
+ return make_ok2(env, enif_make_int(env, written));
}
-/* The rather odd thing about the 'toAddrP' (the **) is
+static
+ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ int read,
+ int toRead,
+ ErlNifBinary* bufP,
+ ERL_NIF_TERM recvRef)
+{
+ /* There is a special case: If the provided 'to read' value is
+ * zero (0). That means that if we filled the (default size)
+ * buffer, we need to continue to read (since there *may* be
+ * more data), but we cannot loop here. Instead we inform the
+ * caller that it must call again.
+ */
+
+ if (bufP->size == read) {
+
+ /* +++ We filled the buffer +++ */
+
+ if (toRead == 0) {
+
+ /* +++ Give us everything you have got => needs to continue +++ */
+
+ /* How do we do this?
+ * Either:
+ * 1) Send up each chunk of data for each of the read
+ * and let the erlang code assemble it: {ok, false, Bin}
+ * (when complete it should return {ok, true, Bin}).
+ * We need to read atleast one more time to be sure if its
+ * done...
+ * 2) Or put it in a buffer here, and then let the erlang code
+ * know that it should call again (special return value)
+ * (continuous binary realloc "here").
+ *
+ * => We choose alt 1 for now.
+ */
+
+ return make_ok3(env, atom_false, MKBIN(env, bufP));
+
+ } else {
+
+ /* +++ We got exactly as much as we requested +++ */
+
+ /* <KOLLA>
+ * WE NEED TO INFORM ANY WAITING READERS
+ * </KOLLA>
+ */
+
+ return make_ok3(env, atom_true, MKBIN(env, bufP));
+
+ }
+
+ } else if (read < 0) {
+
+ /* +++ Error handling +++ */
+
+ int save_errno = sock_errno();
+
+ if (save_errno == ECONNRESET) {
+
+ /* +++ Oups - closed +++ */
+
+ /* <KOLLA>
+ * IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING
+ * PROCESS, WE NEED TO INFORM IT!!!
+ *
+ * ALL WAITING PROCESSES MUST ALSO GET THE ERROR!!
+ *
+ * </KOLLA>
+ */
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_STOP),
+ descP, NULL, recvRef);
+
+ return make_error(env, atom_closed);
+
+ } else if ((save_errno == ERRNO_BLOCK) ||
+ (save_errno == EAGAIN)) {
+ return make_error(env, atom_eagain);
+ } else {
+ return make_error2(env, save_errno);
+ }
+
+ } else {
+
+ /* +++ We got only a part of what was expected - receive more later +++ */
+
+ return make_ok3(env, atom_false, MKBIN(env, bufP));
+
+ }
+}
+
+
+/* *** decode_send_addr ***
+ *
+ * The rather odd thing about the 'toAddrP' (the **) is
* because we need to be able to return a NULL pointer,
* in the case of the dest address is the atom 'null'.
* Its possible to call the sendto function with the
@@ -2507,14 +2705,24 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
char buf[64]; /* Buffer used for building the mutex name */
sprintf(buf, "socket[w,%d]", sock);
- descP->writeMtx = MCREATE(buf);
+ descP->writeMtx = MCREATE(buf);
+ descP->currentWriterP = NULL; // currentWriter not used
+ descP->writersQ.first = NULL;
+ descP->writersQ.last = NULL;
sprintf(buf, "socket[r,%d]", sock);
- descP->readMtx = MCREATE(buf);
+ descP->readMtx = MCREATE(buf);
+ descP->currentReaderP = NULL; // currentReader not used
+ descP->readersQ.first = NULL;
+ descP->readersQ.last = NULL;
sprintf(buf, "socket[acc,%d]", sock);
- descP->accMtx = MCREATE(buf);
+ descP->accMtx = MCREATE(buf);
+ descP->currentAcceptorP = NULL; // currentAcceptor not used
+ descP->acceptorsQ.first = NULL;
+ descP->acceptorsQ.last = NULL;
+ descP->iow = FALSE;
descP->dbg = SOCKET_DEBUG_DEFAULT;
descP->isWritable = TRUE;
descP->isReadable = TRUE;
@@ -2756,17 +2964,81 @@ BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags)
+/* erecvflags2recvflags - convert internal (erlang) send flags to (proper)
+ * send flags.
+ */
+static
+BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags)
+{
+ unsigned int ef;
+ int tmp = 0;
+
+ for (ef = SOCKET_RECV_FLAG_LOW; ef <= SOCKET_RECV_FLAG_HIGH; ef++) {
+ switch (ef) {
+ case SOCKET_RECV_FLAG_CMSG_CLOEXEC:
+ tmp |= MSG_CMSG_CLOEXEC;
+ break;
+
+ case SOCKET_RECV_FLAG_DONTWAIT:
+ tmp |= MSG_DONTWAIT;
+ break;
+
+ case SOCKET_RECV_FLAG_ERRQUEUE:
+ tmp |= MSG_ERRQUEUE;
+ break;
+
+ case SOCKET_RECV_FLAG_OOB:
+ tmp |= MSG_OOB;
+ break;
+
+ case SOCKET_RECV_FLAG_PEEK:
+ tmp |= MSG_PEEK;
+ break;
+
+ case SOCKET_RECV_FLAG_TRUNC:
+ tmp |= MSG_TRUNC;
+ break;
+
+ case SOCKET_RECV_FLAG_WAITALL:
+ tmp |= MSG_WAITALL;
+ break;
+
+ default:
+ return FALSE;
+ }
+
+ }
+
+ *recvflags = tmp;
+
+ return TRUE;
+}
+
+
+
/* Create an ok two (2) tuple in the form: {ok, Any}.
* The second element (Any) is already in the form of an
* ERL_NIF_TERM so all we have to do is create the tuple.
*/
static
-ERL_NIF_TERM make_ok(ErlNifEnv* env, ERL_NIF_TERM any)
+ERL_NIF_TERM make_ok2(ErlNifEnv* env, ERL_NIF_TERM any)
{
return MKT2(env, atom_ok, any);
}
+/* Create an ok three (3) tuple in the form: {ok, Val1, Val2}.
+ * The second (Val1) and third (Val2) elements are already in
+ * the form of an ERL_NIF_TERM so all we have to do is create
+ * the tuple.
+ */
+static
+ERL_NIF_TERM make_ok3(ErlNifEnv* env, ERL_NIF_TERM val1, ERL_NIF_TERM val2)
+{
+ return MKT3(env, atom_ok, val1, val2);
+}
+
+
/* Create an error two (2) tuple in the form: {error, Reason}.
* The second element (Reason) is already in the form of an
* ERL_NIF_TERM so all we have to do is create the tuple.
@@ -2885,13 +3157,13 @@ ErlNifFunc socket_funcs[] =
{"nif_connect", 3, nif_connect},
{"nif_listen", 2, nif_listen},
{"nif_accept", 2, nif_accept},
- {"nif_send", 3, nif_send},
- {"nif_sendto", 5, nif_sendto},
- {"nif_recv", 2, nif_recv},
+ {"nif_send", 4, nif_send},
+ {"nif_sendto", 6, nif_sendto},
+ {"nif_recv", 4, nif_recv},
{"nif_recvfrom", 2, nif_recvfrom},
{"nif_close", 1, nif_close},
- {"nif_setopt", 3, nif_setopt},
- {"nif_getopt", 2, nif_getopt},
+ {"nif_setsockopt", 3, nif_setsockopt},
+ {"nif_getsockopt", 2, nif_getsockopt},
/* "Extra" functions to "complete" the socket interface.
* For instance, the function nif_finalize_connection
@@ -2966,7 +3238,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
// atom_active_once = MKA(env, str_active_once);
// atom_binary = MKA(env, str_binary);
// atom_buildDate = MKA(env, str_buildDate);
- // atom_closed = MKA(env, str_closed);
+ atom_closed = MKA(env, str_closed);
atom_error = MKA(env, str_error);
atom_false = MKA(env, str_false);
// atom_list = MKA(env, str_list);
@@ -2986,7 +3258,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_einval = MKA(env, str_einval);
atom_eisconn = MKA(env, str_eisconn);
atom_enotconn = MKA(env, str_enotconn);
- // atom_exalloc = MKA(env, str_exalloc);
+ atom_exalloc = MKA(env, str_exalloc);
atom_exbadstate = MKA(env, str_exbadstate);
atom_exbusy = MKA(env, str_exbusy);
// atom_exnotopen = MKA(env, str_exnotopen);
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 0a78feab4e..6784477123 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -38,7 +38,7 @@
%% sendmsg/4,
%% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv)
- recv/1, recv/2,
+ recv/2, recv/3, recv/4,
recvfrom/1, recvfrom/2,
%% recvmsg/4,
%% readv/3,
@@ -173,11 +173,7 @@
-define(SOCKET_LISTEN_BACKLOG_DEFAULT, 5).
-%% Bit numbers (from right).
--define(SOCKET_ACCEPT_FLAG_NONBLOCK, 0).
--define(SOCKET_ACCEPT_FLAG_CLOEXEC, 1).
-
--define(SOCKET_ACCEPT_FLAGS_DEFAULT, []).
+-define(SOCKET_ACCEPT_TIMEOUT_DEFAULT, infinity).
-define(SOCKET_SEND_FLAG_CONFIRM, 0).
-define(SOCKET_SEND_FLAG_DONTROUTE, 1).
@@ -187,7 +183,9 @@
-define(SOCKET_SEND_FLAG_NOSIGNAL, 5).
-define(SOCKET_SEND_FLAG_OOB, 6).
--define(SOCKET_SEND_FLAGS_DEFAULT, []).
+-define(SOCKET_SEND_FLAGS_DEFAULT, []).
+-define(SOCKET_SEND_TIMEOUT_DEFAULT, infinity).
+-define(SOCKET_SENDTO_TIMEOUT_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT).
-define(SOCKET_RECV_FLAG_CMSG_CLOEXEC, 0).
-define(SOCKET_RECV_FLAG_DONTWAIT, 1).
@@ -197,7 +195,8 @@
-define(SOCKET_RECV_FLAG_TRUNC, 5).
-define(SOCKET_RECV_FLAG_WAITALL, 6).
--define(SOCKET_RECV_FLAGS_DEFAULT, []).
+-define(SOCKET_RECV_FLAGS_DEFAULT, []).
+-define(SOCKET_RECV_TIMEOUT_DEFAULT, infinity).
-define(SOCKET_SETOPT_KEY_DEBUG, 0).
@@ -403,7 +402,7 @@ connect({socket, _, SockRef}, Addr, Port, Timeout)
{select, SockRef, Ref, ready_output} ->
nif_finalize_connection(SockRef)
after NewTimeout ->
- nif_cancel(SockRef, Ref),
+ nif_cancel(SockRef, connect, Ref),
{error, timeout}
end;
{error, _} = ERROR ->
@@ -444,7 +443,7 @@ listen({socket, _, SockRef}, Backlog)
Reason :: term().
accept(Socket) ->
- accept(Socket, infinity).
+ accept(Socket, ?SOCKET_ACCEPT_TIMEOUT_DEFAULT).
%% Do we really need this optimization?
accept(_, Timeout) when is_integer(Timeout) andalso (Timeout < 0) ->
@@ -466,11 +465,12 @@ do_accept(LSockRef, SI, Ref, Timeout) ->
Socket = {socket, SocketInfo, SockRef},
{ok, Socket};
{error, eagain} ->
+ NewTimeout = next_timeout(TS, Timeout),
receive
{select, LSockRef, Ref, ready_input} ->
do_accept(LSockRef, SI, make_ref(), next_timeout(TS, Timeout))
- after Timeout ->
- nif_cancel(LSockRef, Ref),
+ after NewTimeout ->
+ nif_cancel(LSockRef, accept, Ref),
flush_select_msgs(LSockRef, Ref),
{error, timeout}
end
@@ -492,14 +492,13 @@ flush_select_msgs(LSRef, Ref) ->
%%
send(Socket, Data) ->
- send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, infinity).
+ send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT).
send(Socket, Data, Flags) when is_list(Flags) ->
- send(Socket, Data, Flags, infinity);
+ send(Socket, Data, Flags, ?SOCKET_SEND_TIMEOUT_DEFAULT);
send(Socket, Data, Timeout) ->
send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, Timeout).
-
-spec send(Socket, Data, Flags, Timeout) -> ok | {error, Reason} when
Socket :: socket(),
Data :: iodata(),
@@ -516,15 +515,20 @@ send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags)
do_send(SockRef, SendRef, Data, _EFlags, Timeout)
when (Timeout =< 0) ->
- nif_cancel(SockRef, SendRef),
+ %% <KOLLA>
+ %% THIS IS THE WRONG SEND REF
+ %% WE SHOULD NOT HAVE THIS REF AS AN ARGUMENT - SEE RECV
+ %% </KOLLA>
+ nif_cancel(SockRef, send, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, {timeout, size(Data)}};
do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
TS = timestamp(Timeout),
case nif_send(SockRef, SendRef, Data, EFlags) of
ok ->
- {ok, next_timeout(TS, Timeout)};
+ ok;
{ok, Written} ->
+ NewTimeout = next_timeout(TS, Timeout),
%% We are partially done, wait for continuation
receive
{select, SockRef, SendRef, ready_output} when (Written > 0) ->
@@ -534,8 +538,8 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
{select, SockRef, SendRef, ready_output} ->
do_send(SockRef, make_ref(), Data, EFlags,
next_timeout(TS, Timeout))
- after Timeout ->
- nif_cancel(SockRef, SendRef),
+ after NewTimeout ->
+ nif_cancel(SockRef, send, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, timeout}
end;
@@ -545,7 +549,7 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
do_send(SockRef, SendRef, Data, EFlags,
next_timeout(TS, Timeout))
after Timeout ->
- nif_cancel(SockRef, SendRef),
+ nif_cancel(SockRef, send, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, timeout}
end;
@@ -563,7 +567,7 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
%%
sendto(Socket, Data, Flags, DestAddr, DestPort) ->
- sendto(Socket, Data, Flags, DestAddr, DestPort, infinity).
+ sendto(Socket, Data, Flags, DestAddr, DestPort, ?SOCKET_SENDTO_TIMEOUT_DEFAULT).
-spec sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) ->
ok | {error, Reason} when
@@ -589,12 +593,16 @@ sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout)
do_sendto(SockRef, SendRef, Data, _EFlags, _DestAddr, _DestPort, Timeout)
when (Timeout =< 0) ->
- nif_cancel(SockRef, SendRef),
+ %% <KOLLA>
+ %% THIS IS THE WRONG SEND REF
+ %% WE SHOULD NOT HAVE THIS REF AS AN ARGUMENT - SEE RECV
+ %% </KOLLA>
+ nif_cancel(SockRef, sendto, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, {timeout, size(Data)}};
do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
TS = timestamp(Timeout),
- case nif_sendto(SockRef, SendRef, Data, DestAddr, DestPort, EFlags) of
+ case nif_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort) of
ok ->
{ok, next_timeout(TS, Timeout)};
{ok, Written} ->
@@ -610,7 +618,7 @@ do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
DestAddr, DestPort,
next_timeout(TS, Timeout))
after Timeout ->
- nif_cancel(SockRef, SendRef),
+ nif_cancel(SockRef, sendto, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, timeout}
end;
@@ -623,7 +631,7 @@ do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
DestAddr, DestPort,
next_timeout(TS, Timeout))
after Timeout ->
- nif_cancel(SockRef, SendRef),
+ nif_cancel(SockRef, sendto, SendRef),
flush_select_msgs(SockRef, SendRef),
{error, timeout}
end;
@@ -711,20 +719,136 @@ do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
%%
%% recv, recvfrom, recvmsg - receive a message from a socket
%%
+%% Description:
+%% There is a special case for the argument Length. If its set to zero (0),
+%% it means "give me everything you have".
+%%
+%% Returns: {ok, Binary} | {error, Reason}
+%% Binary - The received data as a binary
+%% Reason - The error reason:
+%% timeout | {timeout, AccData} |
+%% posix() | {posix(), AccData} |
+%% atom() | {atom(), AccData}
+%% AccData - The data (as a binary) that we did manage to receive
+%% before the timeout.
+%%
+%% Arguments:
+%% Socket - The socket to read from.
+%% Length - The number of bytes to read.
+%% Flags - A list of "options" for the read.
+%% Timeout - Time-out in milliseconds.
+
+recv(Socket, Length) ->
+ recv(Socket, Length,
+ ?SOCKET_RECV_FLAGS_DEFAULT,
+ ?SOCKET_RECV_TIMEOUT_DEFAULT).
+
+recv(Socket, Length, Flags) when is_list(Flags) ->
+ recv(Socket, Length, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT);
+recv(Socket, Length, Timeout) ->
+ recv(Socket, Length, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout).
+
+-spec recv(Socket, Length, Flags, Timeout) -> {ok, Data} | {error, Reason} when
+ Socket :: socket(),
+ Length :: non_neg_integer(),
+ Flags :: recv_flags(),
+ Timeout :: timeout(),
+ Data :: binary(),
+ Reason :: term().
--spec recv(Socket, Flags) -> {ok, Data} | {error, Reason} when
- Socket :: socket(),
- Flags :: recv_flags(),
- Data :: binary(),
- Reason :: term().
+recv(Socket, Length, Flags, Timeout)
+ when (is_integer(Length) andalso (Length >= 0)) andalso
+ is_list(Flags) andalso
+ (is_integer(Timeout) orelse (Timeout =:= infinity)) ->
+ EFlags = enc_recv_flags(Flags),
+ do_recv(Socket, Length, EFlags, <<>>, EFlags).
+
+do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout)
+ when (Timeout =:= infinity) orelse
+ (is_integer(Timeout) andalso (Timeout > 0)) ->
+ TS = timestamp(Timeout),
+ RecvRef = make_ref(),
+ case nif_recv(SockRef, RecvRef, Length, EFlags) of
+ {ok, true = _Complete, Bin} when (size(Acc) =:= 0) ->
+ {ok, Bin};
+ {ok, true = _Complete, Bin} ->
+ {ok, <<Acc/binary, Bin/binary>>};
+
+ %% It depends on the amount of bytes we tried to read:
+ %% 0 - Read everything available
+ %% We got something, but there may be more - keep reading.
+ %% > 0 - We got a part of the message and we will be notified
+ %% when there is more to read (a select message)
+ {ok, false = _Complete, Bin} when (Length =:= 0) ->
+ do_recv(Socket, Length, EFlags,
+ <<Acc/binary, Bin/binary>>,
+ next_timeout(TS, Timeout));
+
+ {ok, false = _Completed, Bin} when (size(Acc) =:= 0) ->
+ %% We got the first chunk of it.
+ %% We will be notified (select message) when there
+ %% is more to read.
+ NewTimeout = next_timeout(TS, Timeout),
+ receive
+ {select, SockRef, RecvRef, ready_input} ->
+ do_recv(Socket, Length-size(Bin), EFlags,
+ Bin,
+ next_timeout(TS, Timeout))
+ after NewTimeout ->
+ nif_cancel(SockRef, recv, RecvRef),
+ flush_select_msgs(SockRef, RecvRef),
+ {error, {timeout, Acc}}
+ end;
+
+ {ok, false = _Completed, Bin} ->
+ %% We got a chunk of it!
+ NewTimeout = next_timeout(TS, Timeout),
+ receive
+ {select, SockRef, RecvRef, ready_input} ->
+ do_recv(Socket, Length-size(Bin), EFlags,
+ <<Acc/binary, Bin/binary>>,
+ next_timeout(TS, Timeout))
+ after NewTimeout ->
+ nif_cancel(SockRef, recv, RecvRef),
+ flush_select_msgs(SockRef, RecvRef),
+ {error, {timeout, Acc}}
+ end;
-recv(Socket) ->
- recv(Socket, ?SOCKET_RECV_FLAGS_DEFAULT).
+ %% We return with the accumulated binary regardless if its empty...
+ {error, eagain} when (Length =:= 0) ->
+ {ok, Acc};
-%% WE "may" need a timeout option here...
-recv({socket, _, SockRef}, Flags) when is_list(Flags) ->
- EFlags = enc_recv_flags(Flags),
- nif_recv(SockRef, EFlags).
+ {error, eagain} ->
+ %% There is nothing just now, but we will be notified when there
+ %% is something to read (a select message).
+ NewTimeout = next_timeout(TS, Timeout),
+ receive
+ {select, SockRef, RecvRef, ready_input} ->
+ do_recv(Socket, Length, EFlags,
+ Acc,
+ next_timeout(TS, Timeout))
+ after NewTimeout ->
+ nif_cancel(SockRef, recv, RecvRef),
+ flush_select_msgs(SockRef, RecvRef),
+ {error, timeout}
+ end;
+
+ {error, _} = ERROR when (size(Acc) =:= 0) ->
+ ERROR;
+
+ {error, Reason} ->
+ {error, {Reason, Acc}}
+
+ end;
+
+do_recv({socket, _, SockRef} = _Socket, 0 = _Length, _Eflags, Acc, _Timeout) ->
+ %% The current recv operation is to be cancelled, so no need for a ref...
+ %% The cancel will end our 'read everything you have' and "activate"
+ %% any waiting readers.
+ nif_cancel(SockRef, recv, undefined),
+ {ok, Acc};
+do_recv(_Socket, _Length, _EFlags, Acc, _Timeout) ->
+ {error, {timeout, Acc}}.
-spec recvfrom(Socket, Flags) -> {ok, Data, SrcAddr, SrcPort} | {error, Reason} when
@@ -980,9 +1104,9 @@ timestamp() ->
{A,B,C} = os:timestamp(),
A*1000000000+B*1000+(C div 1000).
-next_timeout(infinity = Timeout, _) ->
+next_timeout(_, infinity = Timeout) ->
Timeout;
-next_timeout(Timeout, TS) ->
+next_timeout(TS, Timeout) ->
NewTimeout = Timeout - tdiff(TS, timestamp()),
if
(NewTimeout > 0) ->
@@ -1033,13 +1157,13 @@ nif_send(_SockRef, _SendRef, _Data, _Flags) ->
nif_sendto(_SRef, _SendRef, _Data, _Flags, _Dest, _Port) ->
erlang:error(badarg).
-nif_recv(_SRef, _Flags) ->
+nif_recv(_SRef, _RecvRef, _Length, _Flags) ->
erlang:error(badarg).
nif_recvfrom(_SRef, _Flags) ->
erlang:error(badarg).
-nif_cancel(_SRef, _Ref) ->
+nif_cancel(_SRef, _Op, _Ref) ->
erlang:error(badarg).
nif_close(_SRef) ->