aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-04-12 18:24:00 +0200
committerMicael Karlberg <[email protected]>2018-09-18 13:01:37 +0200
commit2d57ebfc6fb723a476fdcffbb366558a6fa18844 (patch)
tree634705328f415cafd11162f1a3aa653d64c8b180 /erts/emulator
parentc5c8da4ecb985837817e60738811793754c679a0 (diff)
downloadotp-2d57ebfc6fb723a476fdcffbb366558a6fa18844.tar.gz
otp-2d57ebfc6fb723a476fdcffbb366558a6fa18844.tar.bz2
otp-2d57ebfc6fb723a476fdcffbb366558a6fa18844.zip
[socket-nif] Completed send
We still need to handle simultaneous ops. That is, handle the case where two different procs try to send at the same time, or where a recv and a send are issued at the same time. Ops queue?
Diffstat (limited to 'erts/emulator')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c387
1 files changed, 345 insertions, 42 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 3e8fe7061a..bf9179d857 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -231,6 +231,9 @@ typedef unsigned long long llu_t;
#define MCREATE(N) enif_mutex_create(N)
#define MLOCK(M) enif_mutex_lock(M)
#define MUNLOCK(M) enif_mutex_unlock(M)
+#define SELECT(E,FD,M,O,P,R) \
+ if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \
+ return enif_make_badarg((E));
/* *** Socket state defs *** */
@@ -262,6 +265,39 @@ typedef unsigned long long llu_t;
(((d)->state & SOCKET_FLAG_BUSY) == SOCKET_FLAG_BUSY)
+/* Internal (erlang side) identifiers for the send flags. Each one is
+ * mapped to the corresponding MSG_* flag by esendflags2sendflags, which
+ * iterates from SOCKET_SEND_FLAG_LOW up to and including
+ * SOCKET_SEND_FLAG_HIGH.
+ * NOTE(review): presumably these are bit positions in the flags word
+ * handed over from erlang - confirm against the erlang side encoding.
+ */
+#define SOCKET_SEND_FLAG_CONFIRM 0
+#define SOCKET_SEND_FLAG_DONTROUTE 1
+#define SOCKET_SEND_FLAG_DONTWAIT 2
+#define SOCKET_SEND_FLAG_EOR 3
+#define SOCKET_SEND_FLAG_MORE 4
+#define SOCKET_SEND_FLAG_NOSIGNAL 5
+#define SOCKET_SEND_FLAG_OOB 6
+/* Lowest and highest identifier; keep in sync when adding a flag. */
+#define SOCKET_SEND_FLAG_LOW SOCKET_SEND_FLAG_CONFIRM
+#define SOCKET_SEND_FLAG_HIGH SOCKET_SEND_FLAG_OOB
+
+/* SocketState - socket state expressed as a set of bit-fields ('flags'),
+ * overlaid through the union with a plain unsigned int ('field') so that
+ * all flags can be cleared with a single assignment.
+ */
+typedef union {
+ struct {
+ // Single bit: socket open or not
+ unsigned int open:1;
+ // 0 = not conn, 1 = connecting, 2 = connected
+ unsigned int connect:2;
+ // unsigned int connecting:1;
+ // unsigned int connected:1;
+ // 0 = not listen, 1 = listening, 2 = accepting
+ unsigned int listen:2;
+ // unsigned int listening:1;
+ // unsigned int accepting:1;
+ /* Room for more... */
+ } flags;
+ unsigned int field; // Make it easy to reset all flags...
+} SocketState;
+
+/*
+#define IS_OPEN(d) ((d)->state.flags.open)
+#define IS_CONNECTED(d) ((d)->state.flags.connect == SOCKET_STATE_CONNECTED)
+#define IS_CONNECTING(d) ((d)->state.flags.connect == SOCKET_STATE_CONNECTING)
+*/
+
+
/*----------------------------------------------------------------------------
* Interface constants.
*
@@ -329,6 +365,7 @@ typedef unsigned long long llu_t;
#define sock_name(s, addr, len) getsockname((s), (addr), (len))
#define sock_open(domain, type, proto) \
make_noninheritable_handle(socket((domain), (type), (proto)))
+/* sock_send - forwards its four arguments unchanged to send() */
+#define sock_send(s,buf,len,flag) send((s),(buf),(len),(flag))
#define sock_errno() WSAGetLastError()
#define sock_create_event(s) WSACreateEvent()
@@ -355,6 +392,7 @@ static unsigned long one_value = 1;
#define sock_listen(s, b) listen((s), (b))
#define sock_name(s, addr, len) getsockname((s), (addr), (len))
#define sock_open(domain, type, proto) socket((domain), (type), (proto))
+/* sock_send - forwards its four arguments unchanged to send() */
+#define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag))
#define sock_errno() errno
#define sock_create_event(s) (s) /* return file descriptor */
@@ -420,20 +458,21 @@ typedef struct {
// Write
ErlNifMutex* writeMtx;
BOOLEAN_T isWritable;
- unsigned int writePkgCnt;
- unsigned int writeByteCnt;
- unsigned int writeTries;
- unsigned int writeWaits;
+ uint32_t writePkgCnt;
+ uint32_t writeByteCnt;
+ uint32_t writeTries;
+ uint32_t writeWaits;
+ uint32_t writeFails;
// Read
ErlNifMutex* readMtx;
BOOLEAN_T isReadable;
ErlNifBinary rbuffer;
- unsigned int readCapacity;
- unsigned int readPkgCnt;
- unsigned int readByteCnt;
- unsigned int readTries;
- unsigned int readWaits;
+ uint32_t readCapacity;
+ uint32_t readPkgCnt;
+ uint32_t readByteCnt;
+ uint32_t readTries;
+ uint32_t readWaits;
/* Accept
* We also need a queue for waiting acceptors...
@@ -585,6 +624,11 @@ static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
static ERL_NIF_TERM naccept(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM ref);
+static ERL_NIF_TERM nsend(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sendRef,
+ ErlNifBinary* dataP,
+ int flags);
static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
SocketDescriptor* descP);
@@ -599,12 +643,14 @@ static int compare_pids(ErlNifEnv* env,
static BOOLEAN_T edomain2domain(int edomain, int* domain);
static BOOLEAN_T etype2type(int etype, int* type);
static BOOLEAN_T eproto2proto(int eproto, int* proto);
+static BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags);
#ifdef HAVE_SETNS
static BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns);
static BOOLEAN_T change_network_namespace(char* netns, int* cns, int* err);
static BOOLEAN_T restore_network_namespace(int ns, SOCKET sock, int* err);
#endif
+static BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc);
static void socket_dtor(ErlNifEnv* env, void* obj);
static void socket_stop(ErlNifEnv* env,
@@ -669,7 +715,7 @@ static char str_eagain[] = "eagain";
static char str_eafnosupport[] = "eafnosupport";
static char str_einval[] = "einval";
static char str_eisconn[] = "eisconn";
-static char str_eisnconn[] = "eisnconn";
+static char str_enotconn[] = "enotconn";
static char str_exbadstate[] = "exbadstate";
static char str_exbusy[] = "exbusy";
static char str_exmon[] = "exmonitor"; // failed monitor
@@ -687,7 +733,7 @@ static ERL_NIF_TERM atom_eagain;
static ERL_NIF_TERM atom_eafnosupport;
static ERL_NIF_TERM atom_einval;
static ERL_NIF_TERM atom_eisconn;
-static ERL_NIF_TERM atom_eisnconn;
+static ERL_NIF_TERM atom_enotconn;
static ERL_NIF_TERM atom_exbadstate;
static ERL_NIF_TERM atom_exbusy;
static ERL_NIF_TERM atom_exmon;
@@ -917,10 +963,10 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
*
* TODO!
*/
- enif_select(env,
- event,
- ERL_NIF_SELECT_READ,
- descP, NULL, atom_undefined);
+ SELECT(env,
+ event,
+ ERL_NIF_SELECT_READ,
+ descP, NULL, atom_undefined);
#endif
@@ -1516,10 +1562,10 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
(sock_errno() == EINPROGRESS))) { /* Unix & OSE!! */
ERL_NIF_TERM ref = MKREF(env);
descP->state = SOCKET_STATE_CONNECTING;
- enif_select(env,
- descP->sock,
- (ERL_NIF_SELECT_WRITE),
- descP, NULL, ref);
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_WRITE),
+ descP, NULL, ref);
return make_ok(env, ref);
} else if (code == 0) { /* ok we are connected */
descP->state = SOCKET_STATE_CONNECTED;
@@ -1572,7 +1618,7 @@ ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
int error;
if (descP->state != SOCKET_STATE_CONNECTING)
- return make_error(env, atom_eisnconn);
+ return make_error(env, atom_enotconn);
if (!verify_is_connected(descP, &error)) {
descP->state = SOCKET_STATE_OPEN; /* restore state */
@@ -1784,10 +1830,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
descP->acceptor.ref = ref;
- enif_select(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP, NULL, ref);
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, ref);
/* Shall we really change state?
* The ready event is sent directly to the calling
@@ -1860,9 +1906,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
#ifdef __WIN32__
/* See 'What is the point of this?' above */
- enif_select(env,
- (ERL_NIF_SELECT_READ),
- descP, NULL, atom_undefined);
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, atom_undefined);
#endif
accDescP->state = SOCKET_STATE_CONNECTED;
@@ -1911,10 +1958,10 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
* Just try again, no real error, just a ghost trigger from poll,
*/
- enif_select(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP, NULL, ref);
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, ref);
return make_error(env, atom_eagain);
} else {
@@ -1960,9 +2007,10 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
#ifdef __WIN32__
/* See 'What is the point of this?' above */
- enif_select(env,
- (ERL_NIF_SELECT_READ),
- descP, NULL, atom_undefined);
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, atom_undefined);
#endif
accDescP->state = SOCKET_STATE_CONNECTED;
@@ -2008,6 +2056,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->writeByteCnt = 0;
descP->writeTries = 0;
descP->writeWaits = 0;
+ descP->writeFails = 0;
descP->readPkgCnt = 0;
descP->readByteCnt = 0;
descP->readTries = 0;
@@ -2022,6 +2071,190 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
}
+
+/* ----------------------------------------------------------------------
+ * nif_send
+ *
+ * Description:
+ * Send a message on a socket. The socket must be in the connected state.
+ *
+ * Arguments (argv):
+ * [0] Socket (ref) - Points to the socket descriptor (resource).
+ * [1] SendRef      - A unique id for this (send) request; passed on,
+ *                    untouched, to nsend.
+ * [2] Data         - The data to send; an iolist that is inspected as
+ *                    one (flat) binary.
+ * [3] Flags        - Send flags (unsigned int, erlang encoding); these
+ *                    are translated by esendflags2sendflags.
+ *
+ * Returns:
+ * A badarg exception if the argument count is not four, if any argument
+ * cannot be decoded or if the flags cannot be translated; the error term
+ * built from the atom enotconn if the socket is not connected; otherwise
+ * whatever nsend returns. nsend is called with the write mutex held.
+ */
+
+static
+ERL_NIF_TERM nif_send(ErlNifEnv*         env,
+                      int                argc,
+                      const ERL_NIF_TERM argv[])
+{
+    SocketDescriptor* descP;
+    ErlNifBinary      payload;
+    unsigned int      eflags;
+    int               sflags;
+    ERL_NIF_TERM      result;
+
+    /* Decode the arguments, one guard per argument (same order as
+     * they appear in argv, skipping the send ref which needs no decoding).
+     */
+
+    if (argc != 4)
+        return enif_make_badarg(env);
+
+    if (!enif_get_resource(env, argv[0], sockets, (void**) &descP))
+        return enif_make_badarg(env);
+
+    if (!enif_inspect_iolist_as_binary(env, argv[2], &payload))
+        return enif_make_badarg(env);
+
+    if (!enif_get_uint(env, argv[3], &eflags))
+        return enif_make_badarg(env);
+
+    /* Sending only makes sense on a connected socket */
+    if (!IS_CONNECTED(descP))
+        return make_error(env, atom_enotconn);
+
+    if (!esendflags2sendflags(eflags, &sflags))
+        return enif_make_badarg(env);
+
+    /* The actual send is done while holding the write mutex */
+    MLOCK(descP->writeMtx);
+    result = nsend(env, descP, argv[1], &payload, sflags);
+    MUNLOCK(descP->writeMtx);
+
+    return result;
+}
+
+
+/* What do we do when another process tries to write
+ * when the current writer has a select already waiting?
+ * Queue it? And what about simultaneous read and write?
+ * Queue up all operations towards the socket?
+ *
+ * We (may) need a currentOp field and an ops queue field.
+ */
+/* nsend - make one attempt to send the data on the socket.
+ *
+ * The caller (nif_send) holds the write mutex of the descriptor.
+ *
+ * Arguments:
+ * env     - The NIF environment.
+ * descP   - The socket descriptor.
+ * sendRef - Id of this send request; used as the select reference if
+ *           the send could not be completed.
+ * dataP   - The data to send.
+ * flags   - (Proper) send flags, passed as is to sock_send.
+ *
+ * Returns:
+ * ok             - All of the data was sent.
+ * {ok, Written}  - Only Written bytes (0 =< Written < size of data) were
+ *                  sent, or the send was interrupted / would have blocked
+ *                  (Written = 0). A write select, tagged with sendRef,
+ *                  has been issued so the caller can send the rest later.
+ * make_error2(env, errno) - The send failed for any other reason.
+ * badarg         - The socket is not writable, or the select failed
+ *                  (see the SELECT macro).
+ *
+ * Counters: writeTries is bumped on every attempt, writeByteCnt with the
+ * number of bytes actually sent (also for a partial send), writePkgCnt
+ * when the data has been sent completely, writeFails on a hard failure
+ * and writeWaits when we have to wait for the socket.
+ */
+static
+ERL_NIF_TERM nsend(ErlNifEnv*        env,
+                   SocketDescriptor* descP,
+                   ERL_NIF_TERM      sendRef,
+                   ErlNifBinary*     dataP,
+                   int               flags)
+{
+    int     save_errno;
+    ssize_t written;
+
+    if (!descP->isWritable)
+        return enif_make_badarg(env);
+
+    /* We ignore the wrap for the moment.
+     * Maybe we should issue a wrap-message to controlling process...
+     */
+    cnt_inc(&descP->writeTries, 1);
+
+    written = sock_send(descP->sock, dataP->data, dataP->size, flags);
+
+    /* Check for failure *before* comparing with the (unsigned) size,
+     * so that a negative result is never converted to unsigned.
+     */
+    if (written < 0) {
+
+        /* Ouch, check what kind of failure */
+        save_errno = sock_errno();
+        if ((save_errno != EAGAIN) &&
+            (save_errno != EINTR)) {
+
+            cnt_inc(&descP->writeFails, 1);
+
+            return make_error2(env, save_errno);
+
+        }
+
+        /* Ok, try again later (nothing was written) */
+        written = 0;
+
+    } else {
+
+        /* Whatever was accepted by the socket has been sent, so it is
+         * counted even if this was only part of the data.
+         */
+        cnt_inc(&descP->writeByteCnt, (uint32_t) written);
+
+        if ((size_t) written == dataP->size) {
+
+            cnt_inc(&descP->writePkgCnt, 1);
+
+            return atom_ok;
+
+        }
+    }
+
+    /* We failed to write the *entire* packet (anything less than the size
+     * of the packet, that is 0 <= written < size of packet),
+     * so schedule the rest for later.
+     */
+
+    cnt_inc(&descP->writeWaits, 1);
+
+    SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE),
+           descP, NULL, sendRef);
+
+    return make_ok(env, enif_make_int(env, written));
+
+}
+
+
+#ifdef FOBAR
+/* nwritev - sketch of a vectored (iovec based) send using an I/O queue.
+ *
+ * Only compiled when FOBAR is defined.
+ *
+ * Intended flow, as far as the code shows: inspect 'data' as an iovec;
+ * if the descriptor's out queue already holds data, enqueue the new
+ * iovec and write from the head of the queue, otherwise write the iovec
+ * directly; afterwards either enqueue what was not written or dequeue
+ * what was.
+ *
+ * NOTE(review): this is unfinished code and does not look compilable
+ * as written:
+ *  - 'q' and 'fd' are not declared in this function,
+ *  - the local is declared as 'save_errno' but used as 'saved_errno',
+ *  - plain ints (-3, -4, n) are returned from a function declared to
+ *    return ERL_NIF_TERM,
+ *  - 'sendRef' and 'tail' are never used after being received/set.
+ */
+static
+ERL_NIF_TERM nwritev(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sendRef,
+ ERL_NIF_TERM data)
+{
+ ERL_NIF_TERM tail;
+ ErlNifIOVec vec;
+ ErlNifIOVec* iovec = &vec;
+ SysIOVec* sysiovec;
+ int save_errno;
+ int iovcnt, n;
+
+ /* Convert the term into an iovec (at most MAX_VSZ elements) */
+ if (!enif_inspect_iovec(env, MAX_VSZ, data, &tail, &iovec))
+ return enif_make_badarg(env);
+
+ if (enif_ioq_size(descP->outQ) > 0) {
+ /* If the I/O queue contains data we enqueue the iovec
+ * and then peek the data to write out of the queue.
+ */
+ if (!enif_ioq_enqv(q, iovec, 0))
+ return -3;
+
+ sysiovec = enif_ioq_peek(descP->outQ, &iovcnt);
+
+ } else {
+ /* If the I/O queue is empty we skip the trip through it. */
+ iovcnt = iovec->iovcnt;
+ sysiovec = iovec->iov;
+ }
+
+ /* Attempt to write the data */
+ n = writev(fd, sysiovec, iovcnt);
+ saved_errno = errno;
+
+ if (enif_ioq_size(descP->outQ) == 0) {
+ /* If the I/O queue was initially empty we enqueue any
+ remaining data into the queue for writing later. */
+ if (n >= 0 && !enif_ioq_enqv(descP->outQ, iovec, n))
+ return -3;
+ } else {
+ /* Dequeue any data that was written from the queue. */
+ if (n > 0 && !enif_ioq_deq(descP->outQ, n, NULL))
+ return -4;
+ }
+ /* return n, which is either number of bytes written or -1 if
+ some error happened */
+ errno = saved_errno;
+ return n;
+}
+#endif
+
+
+
+/* ----------------------------------------------------------------------
+ * U t i l i t y F u n c t i o n s
+ * ----------------------------------------------------------------------
+ */
+
+/* compare_pids - Test if two pids are equal
+ *
+ */
static
int compare_pids(ErlNifEnv* env,
const ErlNifPid* pid1,
@@ -2034,11 +2267,6 @@ int compare_pids(ErlNifEnv* env,
}
-/* ----------------------------------------------------------------------
- * U t i l i t y F u n c t i o n s
- * ----------------------------------------------------------------------
- */
-
/* edomain2domain - convert internal (erlang) domain to (proper) domain
*
* Note that only a subset is supported.
@@ -2070,7 +2298,6 @@ BOOLEAN_T edomain2domain(int edomain, int* domain)
}
-
/* etype2type - convert internal (erlang) type to (proper) type
*
* Note that only a subset is supported.
@@ -2103,7 +2330,6 @@ BOOLEAN_T etype2type(int etype, int* type)
}
-
/* eproto2proto - convert internal (erlang) protocol to (proper) protocol
*
* Note that only a subset is supported.
@@ -2193,6 +2419,58 @@ BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns)
#endif
+/* esendflags2sendflags - convert internal (erlang) send flags to (proper)
+ * send flags.
+ *
+ * esendflags is treated as a bit mask: if bit number SOCKET_SEND_FLAG_X
+ * is set, the corresponding native flag MSG_X is included in the result.
+ * Only flags that are actually set in esendflags are translated; an
+ * empty mask (0) therefore results in no send flags at all. Bits above
+ * SOCKET_SEND_FLAG_HIGH are ignored.
+ *
+ * Arguments:
+ * esendflags - The (erlang encoded) send flags.
+ * sendflags  - Out: the (proper) send flags. Only written on success.
+ *
+ * Returns TRUE on success, FALSE if a flag could not be translated.
+ */
+static
+BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags)
+{
+    unsigned int ef;
+    int          tmp = 0;
+
+    for (ef = SOCKET_SEND_FLAG_LOW; ef <= SOCKET_SEND_FLAG_HIGH; ef++) {
+
+        /* This flag was not requested; nothing to translate */
+        if ((esendflags & (1U << ef)) == 0)
+            continue;
+
+        switch (ef) {
+        case SOCKET_SEND_FLAG_CONFIRM:
+            tmp |= MSG_CONFIRM;
+            break;
+
+        case SOCKET_SEND_FLAG_DONTROUTE:
+            tmp |= MSG_DONTROUTE;
+            break;
+
+        case SOCKET_SEND_FLAG_DONTWAIT:
+            tmp |= MSG_DONTWAIT;
+            break;
+
+        case SOCKET_SEND_FLAG_EOR:
+            tmp |= MSG_EOR;
+            break;
+
+        case SOCKET_SEND_FLAG_MORE:
+            tmp |= MSG_MORE;
+            break;
+
+        case SOCKET_SEND_FLAG_NOSIGNAL:
+            tmp |= MSG_NOSIGNAL;
+            break;
+
+        case SOCKET_SEND_FLAG_OOB:
+            tmp |= MSG_OOB;
+            break;
+
+        default:
+            /* A flag between LOW and HIGH without a translation */
+            return FALSE;
+        }
+
+    }
+
+    *sendflags = tmp;
+
+    return TRUE;
+}
+
+
+
/* Create an ok two (2) tuple in the form: {ok, Any}.
* The second element (Any) is already in the form of an
* ERL_NIF_TERM so all we have to do is create the tuple.
@@ -2240,6 +2518,31 @@ ERL_NIF_TERM make_error2(ErlNifEnv* env, int err)
/* ----------------------------------------------------------------------
+ * C o u n t e r F u n c t i o n s
+ * ----------------------------------------------------------------------
+ */
+
+/* cnt_inc - add 'inc' to the 32-bit counter '*cnt'.
+ *
+ * The counter wraps around (modulo 2^32) if the sum does not fit.
+ * Returns TRUE if the counter wrapped, otherwise FALSE.
+ */
+static
+BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc)
+{
+    const uint32_t before = *cnt;
+    const uint32_t after  = (uint32_t) (before + inc);
+
+    *cnt = after;
+
+    /* An unsigned sum smaller than its starting value has wrapped */
+    return (after < before) ? TRUE : FALSE;
+}
+
+
+
+/* ----------------------------------------------------------------------
* C a l l b a c k F u n c t i o n s
* ----------------------------------------------------------------------
*/
@@ -2397,7 +2700,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_eafnosupport = MKA(env, str_eafnosupport);
atom_einval = MKA(env, str_einval);
atom_eisconn = MKA(env, str_eisconn);
- atom_eisnconn = MKA(env, str_eisnconn);
+ atom_enotconn = MKA(env, str_enotconn);
// atom_exalloc = MKA(env, str_exalloc);
atom_exbadstate = MKA(env, str_exbadstate);
atom_exbusy = MKA(env, str_exbusy);