From 28611d6e6daab8ae24e5e593c001bcd6442506eb Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 19 Apr 2018 10:57:54 +0200 Subject: [socket-nif] Completed recv Need to fix the use of the request ref (ID) handling in previous functions. --- erts/emulator/nifs/common/socket_nif.c | 526 +++++++++++++++++++++++++-------- erts/preloaded/src/socket.erl | 204 ++++++++++--- 2 files changed, 563 insertions(+), 167 deletions(-) (limited to 'erts') diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 46c5c696e2..d3aa3db2aa 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -223,19 +223,20 @@ typedef unsigned long long llu_t; /* *** Misc macros and defines *** */ -#define MALLOC(SZ) enif_alloc(SZ) -#define FREE(P) enif_free(P) -#define MKA(E,S) enif_make_atom(E, S) -#define MKREF(E) enif_make_ref(E) -#define MKT2(E,E1,E2) enif_make_tuple2(E, E1, E2) -#define MCREATE(N) enif_mutex_create(N) -#define MLOCK(M) enif_mutex_lock(M) -#define MUNLOCK(M) enif_mutex_unlock(M) +#define MALLOC(SZ) enif_alloc(SZ) +#define FREE(P) enif_free(P) +#define MKA(E,S) enif_make_atom(E, S) +#define MKBIN(E,B) enif_make_binary(E, B) +#define MKREF(E) enif_make_ref(E) +#define MKT2(E,E1,E2) enif_make_tuple2(E, E1, E2) +#define MKT3(E,E1,E2,E3) enif_make_tuple3(E, E1, E2, E3) +#define MCREATE(N) enif_mutex_create(N) +#define MLOCK(M) enif_mutex_lock(M) +#define MUNLOCK(M) enif_mutex_unlock(M) #define SELECT(E,FD,M,O,P,R) \ if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \ return enif_make_badarg((E)); - /* *** Socket state defs *** */ #define SOCKET_FLAG_OPEN 0x0001 @@ -275,6 +276,18 @@ typedef unsigned long long llu_t; #define SOCKET_SEND_FLAG_LOW SOCKET_SEND_FLAG_CONFIRM #define SOCKET_SEND_FLAG_HIGH SOCKET_SEND_FLAG_OOB +#define SOCKET_RECV_FLAG_CMSG_CLOEXEC 0 +#define SOCKET_RECV_FLAG_DONTWAIT 1 +#define SOCKET_RECV_FLAG_ERRQUEUE 2 +#define SOCKET_RECV_FLAG_OOB 3 +#define SOCKET_RECV_FLAG_PEEK 4 +#define SOCKET_RECV_FLAG_TRUNC 5 +#define SOCKET_RECV_FLAG_WAITALL 6 +#define SOCKET_RECV_FLAG_LOW SOCKET_RECV_FLAG_CMSG_CLOEXEC +#define SOCKET_RECV_FLAG_HIGH SOCKET_RECV_FLAG_WAITALL + +#define SOCKET_RECV_BUFFER_SIZE_DEFAULT 2048 + typedef union { struct { unsigned int open:1; @@ -343,6 +356,8 @@ typedef union { #define GET_UINT(E, TE, IP) enif_get_uint((E), (TE), (IP)) #define GET_TUPLE(E, TE, TSZ, TA) enif_get_tuple((E), (TE), (TSZ), (TA)) +#define ALLOC_BIN(SZ, BP) enif_alloc_binary((SZ), (BP)) + /* =================================================================== * * * @@ -366,6 +381,7 @@ typedef union { #define sock_name(s, addr, len) getsockname((s), (addr), (len)) #define sock_open(domain, type, proto) \ make_noninheritable_handle(socket((domain), (type), (proto))) +#define sock_recv(s,buf,len,flag) recv((s),(buf),(len),(flag)) #define sock_send(s,buf,len,flag) send((s),(buf),(len),(flag)) #define sock_sendto(s,buf,blen,flag,addr,alen) \ sendto((s),(buf),(blen),(flag),(addr),(alen)) @@ -395,6 +411,7 @@ static unsigned long one_value = 1; #define sock_listen(s, b) listen((s), (b)) #define sock_name(s, addr, len) getsockname((s), (addr), (len)) #define sock_open(domain, type, proto) socket((domain), (type), (proto)) +#define sock_recv(s,buf,len,flag) recv((s),(buf),(len),(flag)) #define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag)) #define sock_sendto(s,buf,blen,flag,addr,alen) \ sendto((s),(buf),(blen),(flag),(addr),(alen)) @@ -437,17 +454,28 @@ typedef struct { typedef struct { - ErlNifPid pid; // PID of the acceptor - ErlNifMonitor mon; // Monitor for the acceptor - ERL_NIF_TERM ref; // The (unique) reference of the (accept) request -} SocketAcceptor; + ErlNifPid pid; // PID of the requesting process + ErlNifMonitor mon; // Monitor to the requesting process + ERL_NIF_TERM ref; // The (unique) reference (ID) of the request +} SocketRequestor; + +typedef struct socket_request_queue_element { + struct socket_request_queue_element* next; + SocketRequestor data; +} SocketRequestQueueElement; typedef struct { - // The actual socket + SocketRequestQueueElement* first; + SocketRequestQueueElement* last; +} SocketRequestQueue; + + +typedef struct { + /* +++ The actual socket +++ */ SOCKET sock; HANDLE event; - /* "Stuff" about the socket */ + /* +++ Stuff "about" the socket +++ */ int domain; int type; int protocol; @@ -456,51 +484,45 @@ typedef struct { SocketAddress remote; - // Controller (owner) process + /* +++ Controller (owner) process +++ */ ErlNifPid ctrlPid; ErlNifMonitor ctrlMon; - // Write - ErlNifMutex* writeMtx; - BOOLEAN_T isWritable; - uint32_t writePkgCnt; - uint32_t writeByteCnt; - uint32_t writeTries; - uint32_t writeWaits; - uint32_t writeFails; - - // Read - ErlNifMutex* readMtx; - BOOLEAN_T isReadable; - ErlNifBinary rbuffer; - uint32_t readCapacity; - uint32_t readPkgCnt; - uint32_t readByteCnt; - uint32_t readTries; - uint32_t readWaits; - - /* Accept - * We also need a queue for waiting acceptors... - * Lets see if this can be a common "request" queue... - */ - ErlNifMutex* accMtx; - SocketAcceptor acceptor; - - - /* We need to keep track of the "request(s)" we have pending. - * If for instance an accept takes to long, the issuer may - * decide to "cancel" the accept (actually the select). This - * is done by calling the *nif_cancel* function with the request - * ref as argument. - * We also need to keep track of requests so that if a new - * request is issued before the current has completed, we - * reply with e.g. ebusy (or something to that effect). - * Or do we? Can the caller actually do that? - */ - - - /* Misc stuff */ + /* +++ Write stuff +++ */ + ErlNifMutex* writeMtx; + SocketRequestor currentWriter; + SocketRequestor* currentWriterP; // NULL or points to currentWriter + SocketRequestQueue writersQ; + BOOLEAN_T isWritable; + uint32_t writePkgCnt; + uint32_t writeByteCnt; + uint32_t writeTries; + uint32_t writeWaits; + uint32_t writeFails; + + /* +++ Read stuff +++ */ + ErlNifMutex* readMtx; + SocketRequestor currentReader; + SocketRequestor* currentReaderP; // NULL or points to currentReader + SocketRequestQueue readersQ; + BOOLEAN_T isReadable; + ErlNifBinary rbuffer; // DO WE NEED THIS + uint32_t readCapacity; // DO WE NEED THIS + uint32_t readPkgCnt; + uint32_t readByteCnt; + uint32_t readTries; + uint32_t readWaits; + + /* +++ Accept stuff +++ */ + ErlNifMutex* accMtx; + SocketRequestor currentAcceptor; + SocketRequestor* currentAcceptorP; // NULL or points to currentReader + SocketRequestQueue acceptorsQ; + + /* +++ Misc stuff +++ */ + BOOLEAN_T iow; // Inform On Wrap BOOLEAN_T dbg; + } SocketDescriptor; @@ -515,39 +537,6 @@ typedef struct { } SocketData; -/* typedef struct socket_queue_element { */ -/* struct socket_queue_element* next; */ -/* /\* */ -/* unsigned int tag; */ -/* union { */ -/* SocketAcceptor acceptor; */ -/* } u; */ -/* *\/ */ -/* SocketAcceptor acceptor; */ -/* } SocketQueueElement; */ - -/* typedef struct socket_queue { */ -/* SocketQueueElement* first; */ -/* SocketQueueElement* last; */ -/* } SocketQueue; */ - -/* Macros for defining the various queues (accept, send receive) */ -#define SOCKET_QUEUE_ELEMENT(QE,QEP) \ - typedef struct socket_queue_element_##QEP { \ - struct socket_queue_element_##QEP* next; \ - QE elem; \ - } QE##Element; - -#define SOCKET_QUEUE(QE,Q) \ - typedef struct { \ - QE* first; \ - QE* last; \ - } Q; - -/* The Acceptor Queue types */ -SOCKET_QUEUE_ELEMENT(SocketAcceptor, acceptor); -SOCKET_QUEUE(SocketAcceptorElement, SocketAcceptQueue); - /* ---------------------------------------------------------------------- * F o r w a r d s * ---------------------------------------------------------------------- @@ -600,12 +589,12 @@ static ERL_NIF_TERM nif_recvfrom(ErlNifEnv* env, static ERL_NIF_TERM nif_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM nif_setopt(ErlNifEnv* env, - int argc, - const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM nif_getopt(ErlNifEnv* env, - int argc, - const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM nif_setsockopt(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM nif_getsockopt(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]); static ERL_NIF_TERM nif_finalize_connection(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); @@ -649,12 +638,23 @@ static ERL_NIF_TERM nsendto(ErlNifEnv* env, ErlNifBinary* dataP, int flags, SocketAddress* toAddrP); +static ERL_NIF_TERM nrecv(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM recvRef, + int len, + int flags); static ERL_NIF_TERM send_check_result(ErlNifEnv* env, SocketDescriptor* descP, ssize_t written, ssize_t dataSize, ERL_NIF_TERM sendRef); +static ERL_NIF_TERM recv_check_result(ErlNifEnv* env, + SocketDescriptor* descP, + int read, + int toRead, + ErlNifBinary* bufP, + ERL_NIF_TERM recvRef); static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, SocketDescriptor* descP); @@ -707,6 +707,7 @@ static BOOLEAN_T edomain2domain(int edomain, int* domain); static BOOLEAN_T etype2type(int etype, int* type); static BOOLEAN_T eproto2proto(int eproto, int* proto); static BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags); +static BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags); #ifdef HAVE_SETNS static BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns); static BOOLEAN_T change_network_namespace(char* netns, int* cns, int* err); @@ -725,7 +726,8 @@ static void socket_down(ErlNifEnv* env, const ErlNifPid* pid, const ErlNifMonitor* mon); -static ERL_NIF_TERM make_ok(ErlNifEnv* env, ERL_NIF_TERM any); +static ERL_NIF_TERM make_ok2(ErlNifEnv* env, ERL_NIF_TERM val); +static ERL_NIF_TERM make_ok3(ErlNifEnv* env, ERL_NIF_TERM val1, ERL_NIF_TERM val2); static ERL_NIF_TERM make_error(ErlNifEnv* env, ERL_NIF_TERM reason); static ERL_NIF_TERM make_error1(ErlNifEnv* env, char* reason); static ERL_NIF_TERM make_error2(ErlNifEnv* env, int err); @@ -767,8 +769,9 @@ static const struct in6_addr in6addr_loopback = /* *** String constants *** */ -static char str_false[] = "false"; +static char str_closed[] = "closed"; static char str_error[] = "error"; +static char str_false[] = "false"; static char str_ok[] = "ok"; static char str_true[] = "true"; static char str_undefined[] = "undefined"; @@ -779,6 +782,7 @@ static char str_eafnosupport[] = "eafnosupport"; static char str_einval[] = "einval"; static char str_eisconn[] = "eisconn"; static char str_enotconn[] = "enotconn"; +static char str_exalloc[] = "exalloc"; static char str_exbadstate[] = "exbadstate"; static char str_exbusy[] = "exbusy"; static char str_exmon[] = "exmonitor"; // failed monitor @@ -786,8 +790,9 @@ static char str_exself[] = "exself"; // failed self /* *** Atoms *** */ -static ERL_NIF_TERM atom_false; +static ERL_NIF_TERM atom_closed; static ERL_NIF_TERM atom_error; +static ERL_NIF_TERM atom_false; static ERL_NIF_TERM atom_ok; static ERL_NIF_TERM atom_true; static ERL_NIF_TERM atom_undefined; @@ -797,6 +802,7 @@ static ERL_NIF_TERM atom_eafnosupport; static ERL_NIF_TERM atom_einval; static ERL_NIF_TERM atom_eisconn; static ERL_NIF_TERM atom_enotconn; +static ERL_NIF_TERM atom_exalloc; static ERL_NIF_TERM atom_exbadstate; static ERL_NIF_TERM atom_exbusy; static ERL_NIF_TERM atom_exmon; @@ -832,16 +838,16 @@ static SocketData socketData; * nif_connect(Sock, Addr, Port) * nif_listen(Sock, Backlog) * nif_accept(LSock, Ref) - * nif_send(Sock, Data, Flags) - * nif_sendto(Sock, Data, Flags, DstAddr, DstPort) - * nif_recv(Sock, Flags) + * nif_send(Sock, SendRef, Data, Flags) + * nif_sendto(Sock, SendRef, Data, Flags, DstAddr, DstPort) + * nif_recv(Sock, RecvRef, Length, Flags) * nif_recvfrom(Sock, Flags) * nif_close(Sock) * * And some functions to manipulate and retrieve socket options: * ------------------------------------------------------------- - * nif_setopt/3 - * nif_getopt/2 + * nif_setsockopt/3 + * nif_getsockopt/2 * * And some socket admin functions: * ------------------------------------------------------------- @@ -1033,7 +1039,7 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, #endif - return make_ok(env, res); + return make_ok2(env, res); } @@ -1213,7 +1219,7 @@ ERL_NIF_TERM nbind(ErlNifEnv* env, port = 0; } - return make_ok(env, enif_make_int(env, port)); + return make_ok2(env, enif_make_int(env, port)); } @@ -1482,7 +1488,7 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, descP->sock, (ERL_NIF_SELECT_WRITE), descP, NULL, ref); - return make_ok(env, ref); + return make_ok2(env, ref); } else if (code == 0) { /* ok we are connected */ descP->state = SOCKET_STATE_CONNECTED; /* Do we need to do somthing for "active" mode? @@ -1738,13 +1744,13 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, /* *** Try again later *** */ - descP->acceptor.pid = caller; + descP->currentAcceptor.pid = caller; if (enif_monitor_process(env, descP, - &descP->acceptor.pid, - &descP->acceptor.mon) > 0) + &descP->currentAcceptor.pid, + &descP->currentAcceptor.mon) > 0) return make_error(env, atom_exmon); - descP->acceptor.ref = ref; + descP->currentAcceptor.ref = ref; SELECT(env, descP->sock, @@ -1830,7 +1836,7 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, accDescP->state = SOCKET_STATE_CONNECTED; - return make_ok(env, accRef); + return make_ok2(env, accRef); } } @@ -1854,7 +1860,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, if (enif_self(env, &caller) == NULL) return make_error(env, atom_exself); - if (compare_pids(env, &descP->acceptor.pid, &caller) != 0) { + if (compare_pids(env, &descP->currentAcceptor.pid, &caller) != 0) { /* This will have to do until we implement the queue. * When we have the queue, we should simply push this request, * and instead return with eagain (the caller will then wait @@ -1938,7 +1944,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, */ descP->state = SOCKET_STATE_LISTENING; - return make_ok(env, accRef); + return make_ok2(env, accRef); } } @@ -2200,6 +2206,101 @@ ERL_NIF_TERM nwritev(ErlNifEnv* env, +/* ---------------------------------------------------------------------- + * nif_recv + * + * Description: + * Receive a message on a socket. + * Normally used only on a connected socket! + * If we are trying to read > 0 bytes, then that is what we do. + * But if we have specified 0 bytes, then we want to read + * whatever is in the buffers (everything it got). + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * RecvRef - A unique id for this (send) request. + * Length - The number of bytes to receive. + * Flags - Receive flags. + */ + +static +ERL_NIF_TERM nif_recv(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + SocketDescriptor* descP; + ERL_NIF_TERM recvRef; + int len; + unsigned int eflags; + int flags; + ERL_NIF_TERM res; + + if ((argc != 4) || + !enif_get_resource(env, argv[0], sockets, (void**) &descP) || + !GET_INT(env, argv[2], &len) || + !GET_UINT(env, argv[3], &eflags)) { + return enif_make_badarg(env); + } + recvRef = argv[1]; + + if (!IS_CONNECTED(descP)) + return make_error(env, atom_enotconn); + + if (!erecvflags2recvflags(eflags, &flags)) + return enif_make_badarg(env); + + MLOCK(descP->readMtx); + + /* We need to handle the case when another process tries + * to receive at the same time. + * If the current recv could not read its entire package + * this time (resulting in an select). The read of the + * other process must be made to wait until current + * is done! + * Basically, we need a read queue! + * + * A 'reading' field (boolean), which is set if we did + * not manage to read the entire message and reset every + * time we do. + */ + + res = nrecv(env, descP, recvRef, len, flags); + + MUNLOCK(descP->readMtx); + + return res; + +} + + +static +ERL_NIF_TERM nrecv(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM recvRef, + int len, + int flags) +{ + ssize_t read; + ErlNifBinary buf; + + if (!descP->isReadable) + return enif_make_badarg(env); + + if (!ALLOC_BIN((len ? len : SOCKET_RECV_BUFFER_SIZE_DEFAULT), &buf)) + return make_error(env, atom_exalloc); + + /* We ignore the wrap for the moment. + * Maybe we should issue a wrap-message to controlling process... + */ + cnt_inc(&descP->readTries, 1); + + read = sock_recv(descP->sock, buf.data, buf.size, flags); + + return recv_check_result(env, descP, + read, len, + &buf, + recvRef); +} @@ -2237,7 +2338,7 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, /* Ok, try again later */ - written = 0; + written = 0; // SHOULD RESULT IN {error, eagain}!!!! } } @@ -2252,12 +2353,109 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), descP, NULL, sendRef); - return make_ok(env, enif_make_int(env, written)); + return make_ok2(env, enif_make_int(env, written)); } -/* The rather odd thing about the 'toAddrP' (the **) is +static +ERL_NIF_TERM recv_check_result(ErlNifEnv* env, + SocketDescriptor* descP, + int read, + int toRead, + ErlNifBinary* bufP, + ERL_NIF_TERM recvRef) +{ + /* There is a special case: If the provided 'to read' value is + * zero (0). That means that if we filled the (default size) + * buffer, we need to continue to read (since there *may* be + * more data), but we cannot loop here. Instead we inform the + * caller that it must call again. + */ + + if (bufP->size == read) { + + /* +++ We filled the buffer +++ */ + + if (toRead == 0) { + + /* +++ Give us everything you have got => needs to continue +++ */ + + /* How do we do this? + * Either: + * 1) Send up each chunk of data for each of the read + * and let the erlang code assemble it: {ok, false, Bin} + * (when complete it should return {ok, true, Bin}). + * We need to read atleast one more time to be sure if its + * done... + * 2) Or put it in a buffer here, and then let the erlang code + * know that it should call again (special return value) + * (continuous binary realloc "here"). + * + * => We choose alt 1 for now. + */ + + return make_ok3(env, atom_false, MKBIN(env, bufP)); + + } else { + + /* +++ We got exactly as much as we requested +++ */ + + /* + * WE NEED TO INFORM ANY WAITING READERS + * + */ + + return make_ok3(env, atom_true, MKBIN(env, bufP)); + + } + + } else if (read < 0) { + + /* +++ Error handling +++ */ + + int save_errno = sock_errno(); + + if (save_errno == ECONNRESET) { + + /* +++ Oups - closed +++ */ + + /* + * IF THE CURRENT PROCESS IS *NOT* THE CONTROLLING + * PROCESS, WE NEED TO INFORM IT!!! + * + * ALL WAITING PROCESSES MUST ALSO GET THE ERROR!! + * + * + */ + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_STOP), + descP, NULL, recvRef); + + return make_error(env, atom_closed); + + } else if ((save_errno == ERRNO_BLOCK) || + (save_errno == EAGAIN)) { + return make_error(env, atom_eagain); + } else { + return make_error2(env, save_errno); + } + + } else { + + /* +++ We got only a part of what was expected - receive more later +++ */ + + return make_ok3(env, atom_false, MKBIN(env, bufP)); + + } +} + + +/* *** decode_send_addr *** + * + * The rather odd thing about the 'toAddrP' (the **) is * because we need to be able to return a NULL pointer, * in the case of the dest address is the atom 'null'. * Its possible to call the sendto function with the @@ -2507,14 +2705,24 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) char buf[64]; /* Buffer used for building the mutex name */ sprintf(buf, "socket[w,%d]", sock); - descP->writeMtx = MCREATE(buf); + descP->writeMtx = MCREATE(buf); + descP->currentWriterP = NULL; // currentWriter not used + descP->writersQ.first = NULL; + descP->writersQ.last = NULL; sprintf(buf, "socket[r,%d]", sock); - descP->readMtx = MCREATE(buf); + descP->readMtx = MCREATE(buf); + descP->currentReaderP = NULL; // currentReader not used + descP->readersQ.first = NULL; + descP->readersQ.last = NULL; sprintf(buf, "socket[acc,%d]", sock); - descP->accMtx = MCREATE(buf); + descP->accMtx = MCREATE(buf); + descP->currentAcceptorP = NULL; // currentAcceptor not used + descP->acceptorsQ.first = NULL; + descP->acceptorsQ.last = NULL; + descP->iow = FALSE; descP->dbg = SOCKET_DEBUG_DEFAULT; descP->isWritable = TRUE; descP->isReadable = TRUE; @@ -2756,17 +2964,81 @@ BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags) +/* erecvflags2recvflags - convert internal (erlang) send flags to (proper) + * send flags. + */ +static +BOOLEAN_T erecvflags2recvflags(unsigned int erecvflags, int* recvflags) +{ + unsigned int ef; + int tmp = 0; + + for (ef = SOCKET_RECV_FLAG_LOW; ef <= SOCKET_RECV_FLAG_HIGH; ef++) { + switch (ef) { + case SOCKET_RECV_FLAG_CMSG_CLOEXEC: + tmp |= MSG_CMSG_CLOEXEC; + break; + + case SOCKET_RECV_FLAG_DONTWAIT: + tmp |= MSG_DONTWAIT; + break; + + case SOCKET_RECV_FLAG_ERRQUEUE: + tmp |= MSG_ERRQUEUE; + break; + + case SOCKET_RECV_FLAG_OOB: + tmp |= MSG_OOB; + break; + + case SOCKET_RECV_FLAG_PEEK: + tmp |= MSG_PEEK; + break; + + case SOCKET_RECV_FLAG_TRUNC: + tmp |= MSG_TRUNC; + break; + + case SOCKET_RECV_FLAG_WAITALL: + tmp |= MSG_WAITALL; + break; + + default: + return FALSE; + } + + } + + *recvflags = tmp; + + return TRUE; +} + + + /* Create an ok two (2) tuple in the form: {ok, Any}. * The second element (Any) is already in the form of an * ERL_NIF_TERM so all we have to do is create the tuple. */ static -ERL_NIF_TERM make_ok(ErlNifEnv* env, ERL_NIF_TERM any) +ERL_NIF_TERM make_ok2(ErlNifEnv* env, ERL_NIF_TERM any) { return MKT2(env, atom_ok, any); } +/* Create an ok three (3) tuple in the form: {ok, Val1, Val2}. + * The second (Val1) and third (Val2) elements are already in + * the form of an ERL_NIF_TERM so all we have to do is create + * the tuple. + */ +static +ERL_NIF_TERM make_ok3(ErlNifEnv* env, ERL_NIF_TERM val1, ERL_NIF_TERM val2) +{ + return MKT3(env, atom_ok, val1, val2); +} + + /* Create an error two (2) tuple in the form: {error, Reason}. * The second element (Reason) is already in the form of an * ERL_NIF_TERM so all we have to do is create the tuple. @@ -2885,13 +3157,13 @@ ErlNifFunc socket_funcs[] = {"nif_connect", 3, nif_connect}, {"nif_listen", 2, nif_listen}, {"nif_accept", 2, nif_accept}, - {"nif_send", 3, nif_send}, - {"nif_sendto", 5, nif_sendto}, - {"nif_recv", 2, nif_recv}, + {"nif_send", 4, nif_send}, + {"nif_sendto", 6, nif_sendto}, + {"nif_recv", 4, nif_recv}, {"nif_recvfrom", 2, nif_recvfrom}, {"nif_close", 1, nif_close}, - {"nif_setopt", 3, nif_setopt}, - {"nif_getopt", 2, nif_getopt}, + {"nif_setsockopt", 3, nif_setsockopt}, + {"nif_getsockopt", 2, nif_getsockopt}, /* "Extra" functions to "complete" the socket interface. * For instance, the function nif_finalize_connection @@ -2966,7 +3238,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) // atom_active_once = MKA(env, str_active_once); // atom_binary = MKA(env, str_binary); // atom_buildDate = MKA(env, str_buildDate); - // atom_closed = MKA(env, str_closed); + atom_closed = MKA(env, str_closed); atom_error = MKA(env, str_error); atom_false = MKA(env, str_false); // atom_list = MKA(env, str_list); @@ -2986,7 +3258,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_einval = MKA(env, str_einval); atom_eisconn = MKA(env, str_eisconn); atom_enotconn = MKA(env, str_enotconn); - // atom_exalloc = MKA(env, str_exalloc); + atom_exalloc = MKA(env, str_exalloc); atom_exbadstate = MKA(env, str_exbadstate); atom_exbusy = MKA(env, str_exbusy); // atom_exnotopen = MKA(env, str_exnotopen); diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 0a78feab4e..6784477123 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -38,7 +38,7 @@ %% sendmsg/4, %% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv) - recv/1, recv/2, + recv/2, recv/3, recv/4, recvfrom/1, recvfrom/2, %% recvmsg/4, %% readv/3, @@ -173,11 +173,7 @@ -define(SOCKET_LISTEN_BACKLOG_DEFAULT, 5). -%% Bit numbers (from right). --define(SOCKET_ACCEPT_FLAG_NONBLOCK, 0). --define(SOCKET_ACCEPT_FLAG_CLOEXEC, 1). - --define(SOCKET_ACCEPT_FLAGS_DEFAULT, []). +-define(SOCKET_ACCEPT_TIMEOUT_DEFAULT, infinity). -define(SOCKET_SEND_FLAG_CONFIRM, 0). -define(SOCKET_SEND_FLAG_DONTROUTE, 1). @@ -187,7 +183,9 @@ -define(SOCKET_SEND_FLAG_NOSIGNAL, 5). -define(SOCKET_SEND_FLAG_OOB, 6). --define(SOCKET_SEND_FLAGS_DEFAULT, []). +-define(SOCKET_SEND_FLAGS_DEFAULT, []). +-define(SOCKET_SEND_TIMEOUT_DEFAULT, infinity). +-define(SOCKET_SENDTO_TIMEOUT_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT). -define(SOCKET_RECV_FLAG_CMSG_CLOEXEC, 0). -define(SOCKET_RECV_FLAG_DONTWAIT, 1). @@ -197,7 +195,8 @@ -define(SOCKET_RECV_FLAG_TRUNC, 5). -define(SOCKET_RECV_FLAG_WAITALL, 6). --define(SOCKET_RECV_FLAGS_DEFAULT, []). +-define(SOCKET_RECV_FLAGS_DEFAULT, []). +-define(SOCKET_RECV_TIMEOUT_DEFAULT, infinity). -define(SOCKET_SETOPT_KEY_DEBUG, 0). @@ -403,7 +402,7 @@ connect({socket, _, SockRef}, Addr, Port, Timeout) {select, SockRef, Ref, ready_output} -> nif_finalize_connection(SockRef) after NewTimeout -> - nif_cancel(SockRef, Ref), + nif_cancel(SockRef, connect, Ref), {error, timeout} end; {error, _} = ERROR -> @@ -444,7 +443,7 @@ listen({socket, _, SockRef}, Backlog) Reason :: term(). accept(Socket) -> - accept(Socket, infinity). + accept(Socket, ?SOCKET_ACCEPT_TIMEOUT_DEFAULT). %% Do we really need this optimization? accept(_, Timeout) when is_integer(Timeout) andalso (Timeout < 0) -> @@ -466,11 +465,12 @@ do_accept(LSockRef, SI, Ref, Timeout) -> Socket = {socket, SocketInfo, SockRef}, {ok, Socket}; {error, eagain} -> + NewTimeout = next_timeout(TS, Timeout), receive {select, LSockRef, Ref, ready_input} -> do_accept(LSockRef, SI, make_ref(), next_timeout(TS, Timeout)) - after Timeout -> - nif_cancel(LSockRef, Ref), + after NewTimeout -> + nif_cancel(LSockRef, accept, Ref), flush_select_msgs(LSockRef, Ref), {error, timeout} end @@ -492,14 +492,13 @@ flush_select_msgs(LSRef, Ref) -> %% send(Socket, Data) -> - send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, infinity). + send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, ?SOCKET_SEND_TIMEOUT_DEFAULT). send(Socket, Data, Flags) when is_list(Flags) -> - send(Socket, Data, Flags, infinity); + send(Socket, Data, Flags, ?SOCKET_SEND_TIMEOUT_DEFAULT); send(Socket, Data, Timeout) -> send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, Timeout). - -spec send(Socket, Data, Flags, Timeout) -> ok | {error, Reason} when Socket :: socket(), Data :: iodata(), @@ -516,15 +515,20 @@ send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags) do_send(SockRef, SendRef, Data, _EFlags, Timeout) when (Timeout =< 0) -> - nif_cancel(SockRef, SendRef), + %% + %% THIS IS THE WRONG SEND REF + %% WE SHOULD NOT HAVE THIS REF AS AN ARGUMENT - SEE RECV + %% + nif_cancel(SockRef, send, SendRef), flush_select_msgs(SockRef, SendRef), {error, {timeout, size(Data)}}; do_send(SockRef, SendRef, Data, EFlags, Timeout) -> TS = timestamp(Timeout), case nif_send(SockRef, SendRef, Data, EFlags) of ok -> - {ok, next_timeout(TS, Timeout)}; + ok; {ok, Written} -> + NewTimeout = next_timeout(TS, Timeout), %% We are partially done, wait for continuation receive {select, SockRef, SendRef, ready_output} when (Written > 0) -> @@ -534,8 +538,8 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) -> {select, SockRef, SendRef, ready_output} -> do_send(SockRef, make_ref(), Data, EFlags, next_timeout(TS, Timeout)) - after Timeout -> - nif_cancel(SockRef, SendRef), + after NewTimeout -> + nif_cancel(SockRef, send, SendRef), flush_select_msgs(SockRef, SendRef), {error, timeout} end; @@ -545,7 +549,7 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) -> do_send(SockRef, SendRef, Data, EFlags, next_timeout(TS, Timeout)) after Timeout -> - nif_cancel(SockRef, SendRef), + nif_cancel(SockRef, send, SendRef), flush_select_msgs(SockRef, SendRef), {error, timeout} end; @@ -563,7 +567,7 @@ do_send(SockRef, SendRef, Data, EFlags, Timeout) -> %% sendto(Socket, Data, Flags, DestAddr, DestPort) -> - sendto(Socket, Data, Flags, DestAddr, DestPort, infinity). + sendto(Socket, Data, Flags, DestAddr, DestPort, ?SOCKET_SENDTO_TIMEOUT_DEFAULT). -spec sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) -> ok | {error, Reason} when @@ -589,12 +593,16 @@ sendto(Socket, Data, Flags, DestAddr, DestPort, Timeout) do_sendto(SockRef, SendRef, Data, _EFlags, _DestAddr, _DestPort, Timeout) when (Timeout =< 0) -> - nif_cancel(SockRef, SendRef), + %% + %% THIS IS THE WRONG SEND REF + %% WE SHOULD NOT HAVE THIS REF AS AN ARGUMENT - SEE RECV + %% + nif_cancel(SockRef, sendto, SendRef), flush_select_msgs(SockRef, SendRef), {error, {timeout, size(Data)}}; do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) -> TS = timestamp(Timeout), - case nif_sendto(SockRef, SendRef, Data, DestAddr, DestPort, EFlags) of + case nif_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort) of ok -> {ok, next_timeout(TS, Timeout)}; {ok, Written} -> @@ -610,7 +618,7 @@ do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) -> DestAddr, DestPort, next_timeout(TS, Timeout)) after Timeout -> - nif_cancel(SockRef, SendRef), + nif_cancel(SockRef, sendto, SendRef), flush_select_msgs(SockRef, SendRef), {error, timeout} end; @@ -623,7 +631,7 @@ do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) -> DestAddr, DestPort, next_timeout(TS, Timeout)) after Timeout -> - nif_cancel(SockRef, SendRef), + nif_cancel(SockRef, sendto, SendRef), flush_select_msgs(SockRef, SendRef), {error, timeout} end; @@ -711,20 +719,136 @@ do_sendto(SockRef, SendRef, Data, EFlags, DestAddr, DestPort, Timeout) -> %% %% recv, recvfrom, recvmsg - receive a message from a socket %% +%% Description: +%% There is a special case for the argument Length. If its set to zero (0), +%% it means "give me everything you have". +%% +%% Returns: {ok, Binary} | {error, Reason} +%% Binary - The received data as a binary +%% Reason - The error reason: +%% timeout | {timeout, AccData} | +%% posix() | {posix(), AccData} | +%% atom() | {atom(), AccData} +%% AccData - The data (as a binary) that we did manage to receive +%% before the timeout. +%% +%% Arguments: +%% Socket - The socket to read from. +%% Length - The number of bytes to read. +%% Flags - A list of "options" for the read. +%% Timeout - Time-out in milliseconds. + +recv(Socket, Length) -> + recv(Socket, Length, + ?SOCKET_RECV_FLAGS_DEFAULT, + ?SOCKET_RECV_TIMEOUT_DEFAULT). + +recv(Socket, Length, Flags) when is_list(Flags) -> + recv(Socket, Length, Flags, ?SOCKET_RECV_TIMEOUT_DEFAULT); +recv(Socket, Length, Timeout) -> + recv(Socket, Length, ?SOCKET_RECV_FLAGS_DEFAULT, Timeout). + +-spec recv(Socket, Length, Flags, Timeout) -> {ok, Data} | {error, Reason} when + Socket :: socket(), + Length :: non_neg_integer(), + Flags :: recv_flags(), + Timeout :: timeout(), + Data :: binary(), + Reason :: term(). --spec recv(Socket, Flags) -> {ok, Data} | {error, Reason} when - Socket :: socket(), - Flags :: recv_flags(), - Data :: binary(), - Reason :: term(). +recv(Socket, Length, Flags, Timeout) + when (is_integer(Length) andalso (Length >= 0)) andalso + is_list(Flags) andalso + (is_integer(Timeout) orelse (Timeout =:= infinity)) -> + EFlags = enc_recv_flags(Flags), + do_recv(Socket, Length, EFlags, <<>>, EFlags). + +do_recv({socket, _, SockRef} = Socket, Length, EFlags, Acc, Timeout) + when (Timeout =:= infinity) orelse + (is_integer(Timeout) andalso (Timeout > 0)) -> + TS = timestamp(Timeout), + RecvRef = make_ref(), + case nif_recv(SockRef, RecvRef, Length, EFlags) of + {ok, true = _Complete, Bin} when (size(Acc) =:= 0) -> + {ok, Bin}; + {ok, true = _Complete, Bin} -> + {ok, <>}; + + %% It depends on the amount of bytes we tried to read: + %% 0 - Read everything available + %% We got something, but there may be more - keep reading. + %% > 0 - We got a part of the message and we will be notified + %% when there is more to read (a select message) + {ok, false = _Complete, Bin} when (Length =:= 0) -> + do_recv(Socket, Length, EFlags, + <>, + next_timeout(TS, Timeout)); + + {ok, false = _Completed, Bin} when (size(Acc) =:= 0) -> + %% We got the first chunk of it. + %% We will be notified (select message) when there + %% is more to read. + NewTimeout = next_timeout(TS, Timeout), + receive + {select, SockRef, RecvRef, ready_input} -> + do_recv(Socket, Length-size(Bin), EFlags, + Bin, + next_timeout(TS, Timeout)) + after NewTimeout -> + nif_cancel(SockRef, recv, RecvRef), + flush_select_msgs(SockRef, RecvRef), + {error, {timeout, Acc}} + end; + + {ok, false = _Completed, Bin} -> + %% We got a chunk of it! + NewTimeout = next_timeout(TS, Timeout), + receive + {select, SockRef, RecvRef, ready_input} -> + do_recv(Socket, Length-size(Bin), EFlags, + <>, + next_timeout(TS, Timeout)) + after NewTimeout -> + nif_cancel(SockRef, recv, RecvRef), + flush_select_msgs(SockRef, RecvRef), + {error, {timeout, Acc}} + end; -recv(Socket) -> - recv(Socket, ?SOCKET_RECV_FLAGS_DEFAULT). + %% We return with the accumulated binary regardless if its empty... + {error, eagain} when (Length =:= 0) -> + {ok, Acc}; -%% WE "may" need a timeout option here... -recv({socket, _, SockRef}, Flags) when is_list(Flags) -> - EFlags = enc_recv_flags(Flags), - nif_recv(SockRef, EFlags). + {error, eagain} -> + %% There is nothing just now, but we will be notified when there + %% is something to read (a select message). + NewTimeout = next_timeout(TS, Timeout), + receive + {select, SockRef, RecvRef, ready_input} -> + do_recv(Socket, Length, EFlags, + Acc, + next_timeout(TS, Timeout)) + after NewTimeout -> + nif_cancel(SockRef, recv, RecvRef), + flush_select_msgs(SockRef, RecvRef), + {error, timeout} + end; + + {error, _} = ERROR when (size(Acc) =:= 0) -> + ERROR; + + {error, Reason} -> + {error, {Reason, Acc}} + + end; + +do_recv({socket, _, SockRef} = _Socket, 0 = _Length, _Eflags, Acc, _Timeout) -> + %% The current recv operation is to be cancelled, so no need for a ref... + %% The cancel will end our 'read everything you have' and "activate" + %% any waiting readers. + nif_cancel(SockRef, recv, undefined), + {ok, Acc}; +do_recv(_Socket, _Length, _EFlags, Acc, _Timeout) -> + {error, {timeout, Acc}}. -spec recvfrom(Socket, Flags) -> {ok, Data, SrcAddr, SrcPort} | {error, Reason} when @@ -980,9 +1104,9 @@ timestamp() -> {A,B,C} = os:timestamp(), A*1000000000+B*1000+(C div 1000). -next_timeout(infinity = Timeout, _) -> +next_timeout(_, infinity = Timeout) -> Timeout; -next_timeout(Timeout, TS) -> +next_timeout(TS, Timeout) -> NewTimeout = Timeout - tdiff(TS, timestamp()), if (NewTimeout > 0) -> @@ -1033,13 +1157,13 @@ nif_send(_SockRef, _SendRef, _Data, _Flags) -> nif_sendto(_SRef, _SendRef, _Data, _Flags, _Dest, _Port) -> erlang:error(badarg). -nif_recv(_SRef, _Flags) -> +nif_recv(_SRef, _RecvRef, _Length, _Flags) -> erlang:error(badarg). nif_recvfrom(_SRef, _Flags) -> erlang:error(badarg). -nif_cancel(_SRef, _Ref) -> +nif_cancel(_SRef, _Op, _Ref) -> erlang:error(badarg). nif_close(_SRef) -> -- cgit v1.2.3