From 2d57ebfc6fb723a476fdcffbb366558a6fa18844 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Thu, 12 Apr 2018 18:24:00 +0200 Subject: [socket-nif] Completed send We still need to handle simultaneous ops. That is, handle if two different procs tries to send at the same time. Or a recv and send at the same time. Ops queue? --- erts/emulator/nifs/common/socket_nif.c | 387 +++++++++++++++++++++++++++++---- erts/preloaded/src/socket.erl | 178 +++++++++++++-- 2 files changed, 504 insertions(+), 61 deletions(-) (limited to 'erts') diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 3e8fe7061a..bf9179d857 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -231,6 +231,9 @@ typedef unsigned long long llu_t; #define MCREATE(N) enif_mutex_create(N) #define MLOCK(M) enif_mutex_lock(M) #define MUNLOCK(M) enif_mutex_unlock(M) +#define SELECT(E,FD,M,O,P,R) \ + if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \ + return enif_make_badarg((E)); /* *** Socket state defs *** */ @@ -262,6 +265,39 @@ typedef unsigned long long llu_t; (((d)->state & SOCKET_FLAG_BUSY) == SOCKET_FLAG_BUSY) +#define SOCKET_SEND_FLAG_CONFIRM 0 +#define SOCKET_SEND_FLAG_DONTROUTE 1 +#define SOCKET_SEND_FLAG_DONTWAIT 2 +#define SOCKET_SEND_FLAG_EOR 3 +#define SOCKET_SEND_FLAG_MORE 4 +#define SOCKET_SEND_FLAG_NOSIGNAL 5 +#define SOCKET_SEND_FLAG_OOB 6 +#define SOCKET_SEND_FLAG_LOW SOCKET_SEND_FLAG_CONFIRM +#define SOCKET_SEND_FLAG_HIGH SOCKET_SEND_FLAG_OOB + +typedef union { + struct { + unsigned int open:1; + // 0 = not conn, 1 = connecting, 2 = connected + unsigned int connect:2; + // unsigned int connecting:1; + // unsigned int connected:1; + // 0 = not listen, 1 = listening, 2 = accepting + unsigned int listen:2; + // unsigned int listening:1; + // unsigned int accepting:1; + /* Room for more... */ + } flags; + unsigned int field; // Make it easy to reset all flags... +} SocketState; + +/* +#define IS_OPEN(d) ((d)->state.flags.open) +#define IS_CONNECTED(d) ((d)->state.flags.connect == SOCKET_STATE_CONNECTED) +#define IS_CONNECTING(d) ((d)->state.flags.connect == SOCKET_STATE_CONNECTING) +*/ + + /*---------------------------------------------------------------------------- * Interface constants. * @@ -329,6 +365,7 @@ typedef unsigned long long llu_t; #define sock_name(s, addr, len) getsockname((s), (addr), (len)) #define sock_open(domain, type, proto) \ make_noninheritable_handle(socket((domain), (type), (proto))) +#define sock_send(s,buf,len,flag) send((s),(buf),(len),(flag)) #define sock_errno() WSAGetLastError() #define sock_create_event(s) WSACreateEvent() @@ -355,6 +392,7 @@ static unsigned long one_value = 1; #define sock_listen(s, b) listen((s), (b)) #define sock_name(s, addr, len) getsockname((s), (addr), (len)) #define sock_open(domain, type, proto) socket((domain), (type), (proto)) +#define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag)) #define sock_errno() errno #define sock_create_event(s) (s) /* return file descriptor */ @@ -420,20 +458,21 @@ typedef struct { // Write ErlNifMutex* writeMtx; BOOLEAN_T isWritable; - unsigned int writePkgCnt; - unsigned int writeByteCnt; - unsigned int writeTries; - unsigned int writeWaits; + uint32_t writePkgCnt; + uint32_t writeByteCnt; + uint32_t writeTries; + uint32_t writeWaits; + uint32_t writeFails; // Read ErlNifMutex* readMtx; BOOLEAN_T isReadable; ErlNifBinary rbuffer; - unsigned int readCapacity; - unsigned int readPkgCnt; - unsigned int readByteCnt; - unsigned int readTries; - unsigned int readWaits; + uint32_t readCapacity; + uint32_t readPkgCnt; + uint32_t readByteCnt; + uint32_t readTries; + uint32_t readWaits; /* Accept * We also need a queue for waiting acceptors... @@ -585,6 +624,11 @@ static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, static ERL_NIF_TERM naccept(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM ref); +static ERL_NIF_TERM nsend(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM sendRef, + ErlNifBinary* dataP, + int flags); static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, SocketDescriptor* descP); @@ -599,12 +643,14 @@ static int compare_pids(ErlNifEnv* env, static BOOLEAN_T edomain2domain(int edomain, int* domain); static BOOLEAN_T etype2type(int etype, int* type); static BOOLEAN_T eproto2proto(int eproto, int* proto); +static BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags); #ifdef HAVE_SETNS static BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns); static BOOLEAN_T change_network_namespace(char* netns, int* cns, int* err); static BOOLEAN_T restore_network_namespace(int ns, SOCKET sock, int* err); #endif +static BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc); static void socket_dtor(ErlNifEnv* env, void* obj); static void socket_stop(ErlNifEnv* env, @@ -669,7 +715,7 @@ static char str_eagain[] = "eagain"; static char str_eafnosupport[] = "eafnosupport"; static char str_einval[] = "einval"; static char str_eisconn[] = "eisconn"; -static char str_eisnconn[] = "eisnconn"; +static char str_enotconn[] = "enotconn"; static char str_exbadstate[] = "exbadstate"; static char str_exbusy[] = "exbusy"; static char str_exmon[] = "exmonitor"; // failed monitor @@ -687,7 +733,7 @@ static ERL_NIF_TERM atom_eagain; static ERL_NIF_TERM atom_eafnosupport; static ERL_NIF_TERM atom_einval; static ERL_NIF_TERM atom_eisconn; -static ERL_NIF_TERM atom_eisnconn; +static ERL_NIF_TERM atom_enotconn; static ERL_NIF_TERM atom_exbadstate; static ERL_NIF_TERM atom_exbusy; static ERL_NIF_TERM atom_exmon; @@ -917,10 +963,10 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, * * TODO! */ - enif_select(env, - event, - ERL_NIF_SELECT_READ, - descP, NULL, atom_undefined); + SELECT(env, + event, + ERL_NIF_SELECT_READ, + descP, NULL, atom_undefined); #endif @@ -1516,10 +1562,10 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, (sock_errno() == EINPROGRESS))) { /* Unix & OSE!! */ ERL_NIF_TERM ref = MKREF(env); descP->state = SOCKET_STATE_CONNECTING; - enif_select(env, - descP->sock, - (ERL_NIF_SELECT_WRITE), - descP, NULL, ref); + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_WRITE), + descP, NULL, ref); return make_ok(env, ref); } else if (code == 0) { /* ok we are connected */ descP->state = SOCKET_STATE_CONNECTED; @@ -1572,7 +1618,7 @@ ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, int error; if (descP->state != SOCKET_STATE_CONNECTING) - return make_error(env, atom_eisnconn); + return make_error(env, atom_enotconn); if (!verify_is_connected(descP, &error)) { descP->state = SOCKET_STATE_OPEN; /* restore state */ @@ -1784,10 +1830,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, descP->acceptor.ref = ref; - enif_select(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, NULL, ref); + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, NULL, ref); /* Shall we really change state? * The ready event is sent directly to the calling @@ -1860,9 +1906,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, #ifdef __WIN32__ /* See 'What is the point of this?' above */ - enif_select(env, - (ERL_NIF_SELECT_READ), - descP, NULL, atom_undefined); + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, NULL, atom_undefined); #endif accDescP->state = SOCKET_STATE_CONNECTED; @@ -1911,10 +1958,10 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, * Just try again, no real error, just a ghost trigger from poll, */ - enif_select(env, - descP->sock, - (ERL_NIF_SELECT_READ), - descP, NULL, ref); + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, NULL, ref); return make_error(env, atom_eagain); } else { @@ -1960,9 +2007,10 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, #ifdef __WIN32__ /* See 'What is the point of this?' above */ - enif_select(env, - (ERL_NIF_SELECT_READ), - descP, NULL, atom_undefined); + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, NULL, atom_undefined); #endif accDescP->state = SOCKET_STATE_CONNECTED; @@ -2008,6 +2056,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->writeByteCnt = 0; descP->writeTries = 0; descP->writeWaits = 0; + descP->writeFails = 0; descP->readPkgCnt = 0; descP->readByteCnt = 0; descP->readTries = 0; @@ -2022,6 +2071,190 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) } + +/* ---------------------------------------------------------------------- + * nif_send + * + * Description: + * Send a message on a socket + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * Sendref - A unique id for this (send) request. + * Data - The data to send in the form of a IOVec. + * Flags - Send flags. + */ + +static +ERL_NIF_TERM nif_send(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + SocketDescriptor* descP; + ERL_NIF_TERM sendRef; + ErlNifBinary data; + unsigned int eflags; + int flags; + ERL_NIF_TERM res; + + /* Extract arguments and perform preliminary validation */ + + if ((argc != 4) || + !enif_get_resource(env, argv[0], sockets, (void**) &descP) || + !enif_inspect_iolist_as_binary(env, argv[2], &data) || + !enif_get_uint(env, argv[3], &eflags)) { + return enif_make_badarg(env); + } + sendRef = argv[1]; + + if (!IS_CONNECTED(descP)) + return make_error(env, atom_enotconn); + + if (!esendflags2sendflags(eflags, &flags)) + return enif_make_badarg(env); + + MLOCK(descP->writeMtx); + + res = nsend(env, descP, sendRef, &data, flags); + + MUNLOCK(descP->writeMtx); + + return res; +} + + +/* What do we do when another process tries to write + * when the current writer has a select already waiting? + * Queue it? And what about simultaneous read and write? + * Queue up all operations towards the socket? + * + * We (may) need a currentOp field and an ops queue field. + */ +static +ERL_NIF_TERM nsend(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM sendRef, + ErlNifBinary* dataP, + int flags) +{ + int save_errno; + ssize_t written; + + if (!descP->isWritable) + return enif_make_badarg(env); + + /* We ignore the wrap for the moment. + * Maybe we should issue a wrap-message to controlling process... + */ + cnt_inc(&descP->writeTries, 1); + + written = sock_send(descP->sock, dataP->data, dataP->size, flags); + + if (written == dataP->size) { + + cnt_inc(&descP->writePkgCnt, 1); + cnt_inc(&descP->writeByteCnt, written); + + return atom_ok; + + } else if (written < 0) { + + /* Ouch, check what kind of failure */ + save_errno = sock_errno(); + if ((save_errno != EAGAIN) && + (save_errno != EINTR)) { + + cnt_inc(&descP->writeFails, 1); + + return make_error2(env, save_errno); + + } else { + + /* Ok, try again later */ + + written = 0; + + } + } + + /* We failed to write the *entire* packet (anything less then size + * of the packet, which is 0 <= written < sizeof packet), + * so schedule the rest for later. + */ + + cnt_inc(&descP->writeWaits, 1); + + SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE), + descP, NULL, sendRef); + + return make_ok(env, enif_make_int(env, written)); + +} + + +#ifdef FOBAR +static +ERL_NIF_TERM nwritev(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM sendRef, + ERL_NIF_TERM data) +{ + ERL_NIF_TERM tail; + ErlNifIOVec vec; + ErlNifIOVec* iovec = &vec; + SysIOVec* sysiovec; + int save_errno; + int iovcnt, n; + + if (!enif_inspect_iovec(env, MAX_VSZ, data, &tail, &iovec)) + return enif_make_badarg(env); + + if (enif_ioq_size(descP->outQ) > 0) { + /* If the I/O queue contains data we enqueue the iovec + * and then peek the data to write out of the queue. + */ + if (!enif_ioq_enqv(q, iovec, 0)) + return -3; + + sysiovec = enif_ioq_peek(descP->outQ, &iovcnt); + + } else { + /* If the I/O queue is empty we skip the trip through it. */ + iovcnt = iovec->iovcnt; + sysiovec = iovec->iov; + } + + /* Attempt to write the data */ + n = writev(fd, sysiovec, iovcnt); + saved_errno = errno; + + if (enif_ioq_size(descP->outQ) == 0) { + /* If the I/O queue was initially empty we enqueue any + remaining data into the queue for writing later. */ + if (n >= 0 && !enif_ioq_enqv(descP->outQ, iovec, n)) + return -3; + } else { + /* Dequeue any data that was written from the queue. */ + if (n > 0 && !enif_ioq_deq(descP->outQ, n, NULL)) + return -4; + } + /* return n, which is either number of bytes written or -1 if + some error happened */ + errno = saved_errno; + return n; +} +#endif + + + +/* ---------------------------------------------------------------------- + * U t i l i t y F u n c t i o n s + * ---------------------------------------------------------------------- + */ + +/* compare_pids - Test if two pids are equal + * + */ static int compare_pids(ErlNifEnv* env, const ErlNifPid* pid1, @@ -2034,11 +2267,6 @@ int compare_pids(ErlNifEnv* env, } -/* ---------------------------------------------------------------------- - * U t i l i t y F u n c t i o n s - * ---------------------------------------------------------------------- - */ - /* edomain2domain - convert internal (erlang) domain to (proper) domain * * Note that only a subset is supported. @@ -2070,7 +2298,6 @@ BOOLEAN_T edomain2domain(int edomain, int* domain) } - /* etype2type - convert internal (erlang) type to (proper) type * * Note that only a subset is supported. @@ -2103,7 +2330,6 @@ BOOLEAN_T etype2type(int etype, int* type) } - /* eproto2proto - convert internal (erlang) protocol to (proper) protocol * * Note that only a subset is supported. @@ -2193,6 +2419,58 @@ BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns) #endif +/* esendflags2sendflags - convert internal (erlang) send flags to (proper) + * send flags. + */ +static +BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags) +{ + unsigned int ef; + int tmp = 0; + + for (ef = SOCKET_SEND_FLAG_LOW; ef <= SOCKET_SEND_FLAG_HIGH; ef++) { + switch (ef) { + case SOCKET_SEND_FLAG_CONFIRM: + tmp |= MSG_CONFIRM; + break; + + case SOCKET_SEND_FLAG_DONTROUTE: + tmp |= MSG_DONTROUTE; + break; + + case SOCKET_SEND_FLAG_DONTWAIT: + tmp |= MSG_DONTWAIT; + break; + + case SOCKET_SEND_FLAG_EOR: + tmp |= MSG_EOR; + break; + + case SOCKET_SEND_FLAG_MORE: + tmp |= MSG_MORE; + break; + + case SOCKET_SEND_FLAG_NOSIGNAL: + tmp |= MSG_NOSIGNAL; + break; + + case SOCKET_SEND_FLAG_OOB: + tmp |= MSG_OOB; + break; + + default: + return FALSE; + } + + } + + *sendflags = tmp; + + return TRUE; +} + + + /* Create an ok two (2) tuple in the form: {ok, Any}. * The second element (Any) is already in the form of an * ERL_NIF_TERM so all we have to do is create the tuple. @@ -2239,6 +2517,31 @@ ERL_NIF_TERM make_error2(ErlNifEnv* env, int err) +/* ---------------------------------------------------------------------- + * C o u n t e r F u n c t i o n s + * ---------------------------------------------------------------------- + */ + +static +BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc) +{ + BOOLEAN_T wrap; + uint32_t max = 0xFFFFFFFF; + uint32_t current = *cnt; + + if ((max - inc) >= current) { + *cnt += inc; + wrap = FALSE; + } else { + *cnt = inc - (max - current) - 1; + wrap = TRUE; + } + + return (wrap); +} + + + /* ---------------------------------------------------------------------- * C a l l b a c k F u n c t i o n s * ---------------------------------------------------------------------- @@ -2397,7 +2700,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_eafnosupport = MKA(env, str_eafnosupport); atom_einval = MKA(env, str_einval); atom_eisconn = MKA(env, str_eisconn); - atom_eisnconn = MKA(env, str_eisnconn); + atom_enotconn = MKA(env, str_enotconn); // atom_exalloc = MKA(env, str_exalloc); atom_exbadstate = MKA(env, str_exbadstate); atom_exbusy = MKA(env, str_exbusy); diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index f3a3d493ac..985b45a956 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -33,13 +33,21 @@ listen/1, listen/2, accept/1, accept/2, - send/2, send/3, sendto/5, - recv/1, recv/2, recvfrom/1, recvfrom/2, + send/2, send/3, send/4, + sendto/5, + %% sendmsg/4, + %% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv) + + recv/1, recv/2, + recvfrom/1, recvfrom/2, + %% recvmsg/4, + %% readv/3, close/1, setopt/3, getopt/2, + %% ????? formated_timestamp/0 ]). @@ -457,45 +465,103 @@ flush_select_msgs(LSRef, Ref) -> end. + %% =========================================================================== %% %% send, sendto, sendmsg - send a message on a socket %% --spec send(Socket, Data, Flags) -> ok | {error, Reason} when - Socket :: socket(), - Data :: binary(), - Flags :: send_flags(), - Reason :: term(). - send(Socket, Data) -> - send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT). + send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, infinity). -send({socket, _, SockRef}, Data, Flags) - when is_binary(Data) andalso is_list(Flags) -> +send(Socket, Data, Flags) when is_list(Flags) -> + send(Socket, Data, Flags, infinity); +send(Socket, Data, Timeout) -> + send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, Timeout). + + +-spec send(Socket, Data, Flags, Timeout) -> ok | {error, Reason} when + Socket :: socket(), + Data :: iodata(), + Flags :: send_flags(), + Timeout :: timeout(), + Reason :: term(). + +send(Socket, Data, Flags, Timeout) when is_list(Data) -> + Bin = erlang:list_to_binary(Data), + send(Socket, Bin, Flags, Timeout); +send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags) -> EFlags = enc_send_flags(Flags), - nif_send(SockRef, Data, EFlags). + do_send(Socket, make_ref(), Data, EFlags, Timeout). + +do_send(SockRef, SendRef, Data, _EFlags, Timeout) + when (Timeout =< 0) -> + nif_cancel(SockRef, SendRef), + flush_select_msgs(SockRef, SendRef), + {error, {timeout, size(Data)}}; +do_send(SockRef, SendRef, Data, EFlags, Timeout) -> + TS = timestamp(Timeout), + case nif_send(SockRef, SendRef, Data, EFlags) of + ok -> + {ok, next_timeout(TS, Timeout)}; + {ok, Written} -> + %% We are partially done, wait for continuation + receive + {select, SockRef, SendRef, ready_output} when (Written > 0) -> + <<_:Written/binary, Rest/binary>> = Data, + do_send(SockRef, make_ref(), Rest, EFlags, + next_timeout(TS, Timeout)); + {select, SockRef, SendRef, ready_output} -> + do_send(SockRef, make_ref(), Data, EFlags, + next_timeout(TS, Timeout)) + after Timeout -> + nif_cancel(SockRef, SendRef), + flush_select_msgs(SockRef, SendRef), + {error, timeout} + end; + {error, eagain} -> + receive + {select, SockRef, SendRef, ready_output} -> + do_send(SockRef, SendRef, Data, EFlags, + next_timeout(TS, Timeout)) + after Timeout -> + nif_cancel(SockRef, SendRef), + flush_select_msgs(SockRef, SendRef), + {error, timeout} + end; + + {error, _} = ERROR -> + ERROR + end. + + %% --------------------------------------------------------------------------- +%% +%% Do we need a timeout argument here also? +%% -spec sendto(Socket, Data, Flags, DestAddr, Port) -> ok | {error, Reason} when Socket :: socket(), Data :: binary(), Flags :: send_flags(), - DestAddr :: ip_address(), + DestAddr :: null | ip_address(), Port :: port_number(), Reason :: term(). sendto({socket, _, SockRef}, Data, Flags, DestAddr, DestPort) when is_binary(Data) andalso is_list(Flags) andalso - (is_tuple(DestAddr) andalso - ((size(DestAddr) =:= 4) orelse - (size(DestAddr) =:= 8))) andalso + ((is_tuple(DestAddr) andalso + ((size(DestAddr) =:= 4) orelse + (size(DestAddr) =:= 8))) orelse + (DestAddr =:= null)) andalso (is_integer(DestPort) andalso (DestPort >= 0)) -> + %% We may need something like send/4 above? EFlags = enc_send_flags(Flags), - nif_sendto(SockRef, Data, EFlags, DestAddr, DestPort). + nif_sendto(SockRef, make_ref(), Data, EFlags, DestAddr, DestPort). + %% --------------------------------------------------------------------------- @@ -508,7 +574,73 @@ sendto({socket, _, SockRef}, Data, Flags, DestAddr, DestPort) +%% =========================================================================== +%% +%% writev - write data into multiple buffers +%% + +%% send(Socket, Data, Flags, Timeout) +%% when (is_list(Data) orelse is_binary(Data)) andalso is_list(Flags) -> +%% IOVec = erlang:iolist_to_iovec(Data), +%% EFlags = enc_send_flags(Flags), +%% send_iovec(Socket, IOVec, EFlags, Timeout). + + +%% %% Iterate over the IO-vector (list of binaries). + +%% send_iovec(_Socket, [] = _IOVec, _EFlags, _Timeout) -> +%% ok; +%% send_iovec({socket, _, SockRef} = Socket, [Bin|IOVec], EFlags, Timeout) -> +%% case do_send(SockRef, make_ref(), Bin, EFlags, Timeout) of +%% {ok, NewTimeout} -> +%% send_iovec(Socket, IOVec, EFlags, NewTimeout); +%% {error, _} = ERROR -> +%% ERROR +%% end. + + +%% do_send(SockRef, SendRef, Data, _EFlags, Timeout) +%% when (Timeout < 0) -> +%% nif_cancel(SockRef, SendRef), +%% flush_select_msgs(SockRef, SendRef), +%% {error, {timeout, size(Data)}}; +%% do_send(SockRef, SendRef, Data, EFlags, Timeout) -> +%% TS = timestamp(Timeout), +%% case nif_send(SockRef, SendRef, Data, EFlags) of +%% ok -> +%% {ok, next_timeout(TS, Timeout)}; +%% {ok, Written} -> +%% %% We are partially done, wait for continuation +%% receive +%% {select, SockRef, SendRef, ready_output} -> +%% <<_:Written/binary, Rest/binary>> = Data, +%% do_send(SockRef, make_ref(), Rest, EFlags, +%% next_timeout(TS, Timeout)) +%% after Timeout -> +%% nif_cancel(SockRef, SendRef), +%% flush_select_msgs(SockRef, SendRef), +%% {error, timeout} +%% end; +%% {error, eagain} -> +%% receive +%% {select, SockRef, SendRef, ready_output} -> +%% do_send(SockRef, SendRef, Data, EFlags, +%% next_timeout(TS, Timeout)) +%% after Timeout -> +%% nif_cancel(SockRef, SendRef), +%% flush_select_msgs(SockRef, SendRef), +%% {error, timeout} +%% end; + +%% {error, _} = ERROR -> +%% ERROR +%% end. + + +%% =========================================================================== +%% %% recv, recvfrom, recvmsg - receive a message from a socket +%% -spec recv(Socket, Flags) -> {ok, Data} | {error, Reason} when Socket :: socket(), @@ -540,6 +672,7 @@ recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) -> EFlags = enc_recv_flags(Flags), nif_recvfrom(SockRef, EFlags). + %% -spec recvmsg(Socket, [out] MsgHdr, Flags) -> {ok, Data} | {error, Reason} when %% Socket :: socket(), %% MsgHdr :: msg_header(), @@ -549,6 +682,13 @@ recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) -> +%% =========================================================================== +%% +%% readv - read data into multiple buffers +%% + + + %% close - close a file descriptor -spec close(Socket) -> ok | {error, Reason} when @@ -817,10 +957,10 @@ nif_listen(_SRef, _Backlog) -> nif_accept(_SRef, _Ref) -> erlang:error(badarg). -nif_send(_SRef, _Data, _Flags) -> +nif_send(_SockRef, _SendRef, _Data, _Flags) -> erlang:error(badarg). -nif_sendto(_SRef, _Data, _Flags, _Dest, _Port) -> +nif_sendto(_SRef, _SendRef, _Data, _Flags, _Dest, _Port) -> erlang:error(badarg). nif_recv(_SRef, _Flags) -> -- cgit v1.2.3