diff options
Diffstat (limited to 'erts/emulator')
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 387 |
1 files changed, 345 insertions, 42 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 3e8fe7061a..bf9179d857 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -231,6 +231,9 @@ typedef unsigned long long llu_t; #define MCREATE(N) enif_mutex_create(N) #define MLOCK(M) enif_mutex_lock(M) #define MUNLOCK(M) enif_mutex_unlock(M) +#define SELECT(E,FD,M,O,P,R) \ + if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \ + return enif_make_badarg((E)); /* *** Socket state defs *** */ @@ -262,6 +265,39 @@ typedef unsigned long long llu_t; (((d)->state & SOCKET_FLAG_BUSY) == SOCKET_FLAG_BUSY) +#define SOCKET_SEND_FLAG_CONFIRM 0 +#define SOCKET_SEND_FLAG_DONTROUTE 1 +#define SOCKET_SEND_FLAG_DONTWAIT 2 +#define SOCKET_SEND_FLAG_EOR 3 +#define SOCKET_SEND_FLAG_MORE 4 +#define SOCKET_SEND_FLAG_NOSIGNAL 5 +#define SOCKET_SEND_FLAG_OOB 6 +#define SOCKET_SEND_FLAG_LOW SOCKET_SEND_FLAG_CONFIRM +#define SOCKET_SEND_FLAG_HIGH SOCKET_SEND_FLAG_OOB + +typedef union { + struct { + unsigned int open:1; + // 0 = not conn, 1 = connecting, 2 = connected + unsigned int connect:2; + // unsigned int connecting:1; + // unsigned int connected:1; + // 0 = not listen, 1 = listening, 2 = accepting + unsigned int listen:2; + // unsigned int listening:1; + // unsigned int accepting:1; + /* Room for more... */ + } flags; + unsigned int field; // Make it easy to reset all flags... +} SocketState; + +/* +#define IS_OPEN(d) ((d)->state.flags.open) +#define IS_CONNECTED(d) ((d)->state.flags.connect == SOCKET_STATE_CONNECTED) +#define IS_CONNECTING(d) ((d)->state.flags.connect == SOCKET_STATE_CONNECTING) +*/ + + /*---------------------------------------------------------------------------- * Interface constants. * @@ -329,6 +365,7 @@ typedef unsigned long long llu_t; #define sock_name(s, addr, len) getsockname((s), (addr), (len)) #define sock_open(domain, type, proto) \ make_noninheritable_handle(socket((domain), (type), (proto))) +#define sock_send(s,buf,len,flag) send((s),(buf),(len),(flag)) #define sock_errno() WSAGetLastError() #define sock_create_event(s) WSACreateEvent() @@ -355,6 +392,7 @@ static unsigned long one_value = 1; #define sock_listen(s, b) listen((s), (b)) #define sock_name(s, addr, len) getsockname((s), (addr), (len)) #define sock_open(domain, type, proto) socket((domain), (type), (proto)) +#define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag)) #define sock_errno() errno #define sock_create_event(s) (s) /* return file descriptor */ @@ -420,20 +458,21 @@ typedef struct { // Write ErlNifMutex* writeMtx; BOOLEAN_T isWritable; - unsigned int writePkgCnt; - unsigned int writeByteCnt; - unsigned int writeTries; - unsigned int writeWaits; + uint32_t writePkgCnt; + uint32_t writeByteCnt; + uint32_t writeTries; + uint32_t writeWaits; + uint32_t writeFails; // Read ErlNifMutex* readMtx; BOOLEAN_T isReadable; ErlNifBinary rbuffer; - unsigned int readCapacity; - unsigned int readPkgCnt; - unsigned int readByteCnt; - unsigned int readTries; - unsigned int readWaits; + uint32_t readCapacity; + uint32_t readPkgCnt; + uint32_t readByteCnt; + uint32_t readTries; + uint32_t readWaits; /* Accept * We also need a queue for waiting acceptors... @@ -585,6 +624,11 @@ static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, static ERL_NIF_TERM naccept(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM ref); +static ERL_NIF_TERM nsend(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM sendRef, + ErlNifBinary* dataP, + int flags); static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, SocketDescriptor* descP); @@ -599,12 +643,14 @@ static int compare_pids(ErlNifEnv* env, static BOOLEAN_T edomain2domain(int edomain, int* domain); static BOOLEAN_T etype2type(int etype, int* type); static BOOLEAN_T eproto2proto(int eproto, int* proto); +static BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags); #ifdef HAVE_SETNS static BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns); static BOOLEAN_T change_network_namespace(char* netns, int* cns, int* err); static BOOLEAN_T restore_network_namespace(int ns, SOCKET sock, int* err); #endif +static BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc); static void socket_dtor(ErlNifEnv* env, void* obj); static void socket_stop(ErlNifEnv* env, @@ -669,7 +715,7 @@ static char str_eagain[] = "eagain"; static char str_eafnosupport[] = "eafnosupport"; static char str_einval[] = "einval"; static char str_eisconn[] = "eisconn"; -static char str_eisnconn[] = "eisnconn"; +static char str_enotconn[] = "enotconn"; static char str_exbadstate[] = "exbadstate"; static char str_exbusy[] = "exbusy"; static char str_exmon[] = "exmonitor"; // failed monitor @@ -687,7 +733,7 @@ static ERL_NIF_TERM atom_eagain; static ERL_NIF_TERM atom_eafnosupport; static ERL_NIF_TERM atom_einval; static ERL_NIF_TERM atom_eisconn; -static ERL_NIF_TERM atom_eisnconn; +static ERL_NIF_TERM atom_enotconn; static ERL_NIF_TERM atom_exbadstate; static ERL_NIF_TERM atom_exbusy; static ERL_NIF_TERM atom_exmon; @@ -917,10 +963,10 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, * * TODO! */ - enif_select(env, - event, - ERL_NIF_SELECT_READ, - descP, NULL, atom_undefined); + SELECT(env, + event, + ERL_NIF_SELECT_READ, + descP, NULL, atom_undefined); #endif @@ -1516,10 +1562,10 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, (sock_errno() == EINPROGRESS))) { /* Unix & OSE!! */ ERL_NIF_TERM ref = MKREF(env); descP->state = SOCKET_STATE_CONNECTING; - enif_select(env, - descP->sock, - (ERL_NIF_SELECT_WRITE), - descP, NULL, ref); + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_WRITE), + descP, NULL, ref); return make_ok(env, ref); } else if (code == 0) { /* ok we are connected */ descP->state = SOCKET_STATE_CONNECTED; @@ -1572,7 +1618,7 @@ ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, int error; if (descP->state != SOCKET_STATE_CONNECTING) - return make_error(env, atom_eisnconn); + return make_error(env, atom_enotconn); if (!verify_is_connected(descP, &error)) { descP->state = SOCKET_STATE_OPEN; /* restore state */ @@ -1784,10 +1830,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, descP->acceptor.ref = ref; - enif_select(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, NULL, ref); + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, NULL, ref); /* Shall we really change state? * The ready event is sent directly to the calling @@ -1860,9 +1906,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, #ifdef __WIN32__ /* See 'What is the point of this?' above */ - enif_select(env, - (ERL_NIF_SELECT_READ), - descP, NULL, atom_undefined); + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, NULL, atom_undefined); #endif accDescP->state = SOCKET_STATE_CONNECTED; @@ -1911,10 +1958,10 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, * Just try again, no real error, just a ghost trigger from poll, */ - enif_select(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, NULL, ref); + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, NULL, ref); return make_error(env, atom_eagain); } else { @@ -1960,9 +2007,10 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, #ifdef __WIN32__ /* See 'What is the point of this?' above */ - enif_select(env, - (ERL_NIF_SELECT_READ), - descP, NULL, atom_undefined); + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, NULL, atom_undefined); #endif accDescP->state = SOCKET_STATE_CONNECTED; @@ -2008,6 +2056,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->writeByteCnt = 0; descP->writeTries = 0; descP->writeWaits = 0; + descP->writeFails = 0; descP->readPkgCnt = 0; descP->readByteCnt = 0; descP->readTries = 0; @@ -2022,6 +2071,190 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) } + +/* ---------------------------------------------------------------------- + * nif_send + * + * Description: + * Send a message on a socket + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * Sendref - A unique id for this (send) request. + * Data - The data to send in the form of a IOVec. + * Flags - Send flags. + */ + +static +ERL_NIF_TERM nif_send(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + SocketDescriptor* descP; + ERL_NIF_TERM sendRef; + ErlNifBinary data; + unsigned int eflags; + int flags; + ERL_NIF_TERM res; + + /* Extract arguments and perform preliminary validation */ + + if ((argc != 4) || + !enif_get_resource(env, argv[0], sockets, (void**) &descP) || + !enif_inspect_iolist_as_binary(env, argv[2], &data) || + !enif_get_uint(env, argv[3], &eflags)) { + return enif_make_badarg(env); + } + sendRef = argv[1]; + + if (!IS_CONNECTED(descP)) + return make_error(env, atom_enotconn); + + if (!esendflags2sendflags(eflags, &flags)) + return enif_make_badarg(env); + + MLOCK(descP->writeMtx); + + res = nsend(env, descP, sendRef, &data, flags); + + MUNLOCK(descP->writeMtx); + + return res; +} + + +/* What do we do when another process tries to write + * when the current writer has a select already waiting? + * Queue it? And what about simultaneous read and write? + * Queue up all operations towards the socket? + * + * We (may) need a currentOp field and an ops queue field. + */ +static +ERL_NIF_TERM nsend(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM sendRef, + ErlNifBinary* dataP, + int flags) +{ + int save_errno; + ssize_t written; + + if (!descP->isWritable) + return enif_make_badarg(env); + + /* We ignore the wrap for the moment. + * Maybe we should issue a wrap-message to controlling process... + */ + cnt_inc(&descP->writeTries, 1); + + written = sock_send(descP->sock, dataP->data, dataP->size, flags); + + if (written == dataP->size) { + + cnt_inc(&descP->writePkgCnt, 1); + cnt_inc(&descP->writeByteCnt, written); + + return atom_ok; + + } else if (written < 0) { + + /* Ouch, check what kind of failure */ + save_errno = sock_errno(); + if ((save_errno != EAGAIN) && + (save_errno != EINTR)) { + + cnt_inc(&descP->writeFails, 1); + + return make_error2(env, save_errno); + + } else { + + /* Ok, try again later */ + + written = 0; + + } + } + + /* We failed to write the *entire* packet (anything less then size + * of the packet, which is 0 <= written < sizeof packet), + * so schedule the rest for later. + */ + + cnt_inc(&descP->writeWaits, 1); + + SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), + descP, NULL, sendRef); + + return make_ok(env, enif_make_int(env, written)); + +} + + +#ifdef FOBAR +static +ERL_NIF_TERM nwritev(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM sendRef, + ERL_NIF_TERM data) +{ + ERL_NIF_TERM tail; + ErlNifIOVec vec; + ErlNifIOVec* iovec = &vec; + SysIOVec* sysiovec; + int save_errno; + int iovcnt, n; + + if (!enif_inspect_iovec(env, MAX_VSZ, data, &tail, &iovec)) + return enif_make_badarg(env); + + if (enif_ioq_size(descP->outQ) > 0) { + /* If the I/O queue contains data we enqueue the iovec + * and then peek the data to write out of the queue. + */ + if (!enif_ioq_enqv(q, iovec, 0)) + return -3; + + sysiovec = enif_ioq_peek(descP->outQ, &iovcnt); + + } else { + /* If the I/O queue is empty we skip the trip through it. */ + iovcnt = iovec->iovcnt; + sysiovec = iovec->iov; + } + + /* Attempt to write the data */ + n = writev(fd, sysiovec, iovcnt); + saved_errno = errno; + + if (enif_ioq_size(descP->outQ) == 0) { + /* If the I/O queue was initially empty we enqueue any + remaining data into the queue for writing later. */ + if (n >= 0 && !enif_ioq_enqv(descP->outQ, iovec, n)) + return -3; + } else { + /* Dequeue any data that was written from the queue. */ + if (n > 0 && !enif_ioq_deq(descP->outQ, n, NULL)) + return -4; + } + /* return n, which is either number of bytes written or -1 if + some error happened */ + errno = saved_errno; + return n; +} +#endif + + + +/* ---------------------------------------------------------------------- + * U t i l i t y F u n c t i o n s + * ---------------------------------------------------------------------- + */ + +/* compare_pids - Test if two pids are equal + * + */ static int compare_pids(ErlNifEnv* env, const ErlNifPid* pid1, @@ -2034,11 +2267,6 @@ int compare_pids(ErlNifEnv* env, } -/* ---------------------------------------------------------------------- - * U t i l i t y F u n c t i o n s - * ---------------------------------------------------------------------- - */ - /* edomain2domain - convert internal (erlang) domain to (proper) domain * * Note that only a subset is supported. @@ -2070,7 +2298,6 @@ BOOLEAN_T edomain2domain(int edomain, int* domain) } - /* etype2type - convert internal (erlang) type to (proper) type * * Note that only a subset is supported. @@ -2103,7 +2330,6 @@ BOOLEAN_T etype2type(int etype, int* type) } - /* eproto2proto - convert internal (erlang) protocol to (proper) protocol * * Note that only a subset is supported. @@ -2193,6 +2419,58 @@ BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns) #endif +/* esendflags2sendflags - convert internal (erlang) send flags to (proper) + * send flags. + */ +static +BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags) +{ + unsigned int ef; + int tmp = 0; + + for (ef = SOCKET_SEND_FLAG_LOW; ef <= SOCKET_SEND_FLAG_HIGH; ef++) { + switch (ef) { + case SOCKET_SEND_FLAG_CONFIRM: + tmp |= MSG_CONFIRM; + break; + + case SOCKET_SEND_FLAG_DONTROUTE: + tmp |= MSG_DONTROUTE; + break; + + case SOCKET_SEND_FLAG_DONTWAIT: + tmp |= MSG_DONTWAIT; + break; + + case SOCKET_SEND_FLAG_EOR: + tmp |= MSG_EOR; + break; + + case SOCKET_SEND_FLAG_MORE: + tmp |= MSG_MORE; + break; + + case SOCKET_SEND_FLAG_NOSIGNAL: + tmp |= MSG_NOSIGNAL; + break; + + case SOCKET_SEND_FLAG_OOB: + tmp |= MSG_OOB; + break; + + default: + return FALSE; + } + + } + + *sendflags = tmp; + + return TRUE; +} + + + /* Create an ok two (2) tuple in the form: {ok, Any}. * The second element (Any) is already in the form of an * ERL_NIF_TERM so all we have to do is create the tuple. @@ -2240,6 +2518,31 @@ ERL_NIF_TERM make_error2(ErlNifEnv* env, int err) /* ---------------------------------------------------------------------- + * C o u n t e r F u n c t i o n s + * ---------------------------------------------------------------------- + */ + +static +BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc) +{ + BOOLEAN_T wrap; + uint32_t max = 0xFFFFFFFF; + uint32_t current = *cnt; + + if ((max - inc) >= current) { + *cnt += inc; + wrap = FALSE; + } else { + *cnt = inc - (max - current) - 1; + wrap = TRUE; + } + + return (wrap); +} + + + +/* ---------------------------------------------------------------------- * C a l l b a c k F u n c t i o n s * ---------------------------------------------------------------------- */ @@ -2397,7 +2700,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_eafnosupport = MKA(env, str_eafnosupport); atom_einval = MKA(env, str_einval); atom_eisconn = MKA(env, str_eisconn); - atom_eisnconn = MKA(env, str_eisnconn); + atom_enotconn = MKA(env, str_enotconn); // atom_exalloc = MKA(env, str_exalloc); atom_exbadstate = MKA(env, str_exbadstate); atom_exbusy = MKA(env, str_exbusy); |