aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/nifs/common/socket_nif.c387
-rw-r--r--erts/preloaded/src/socket.erl178
2 files changed, 504 insertions, 61 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 3e8fe7061a..bf9179d857 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -231,6 +231,9 @@ typedef unsigned long long llu_t;
#define MCREATE(N) enif_mutex_create(N)
#define MLOCK(M) enif_mutex_lock(M)
#define MUNLOCK(M) enif_mutex_unlock(M)
+#define SELECT(E,FD,M,O,P,R) \
+ if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \
+ return enif_make_badarg((E));
/* *** Socket state defs *** */
@@ -262,6 +265,39 @@ typedef unsigned long long llu_t;
(((d)->state & SOCKET_FLAG_BUSY) == SOCKET_FLAG_BUSY)
+#define SOCKET_SEND_FLAG_CONFIRM 0
+#define SOCKET_SEND_FLAG_DONTROUTE 1
+#define SOCKET_SEND_FLAG_DONTWAIT 2
+#define SOCKET_SEND_FLAG_EOR 3
+#define SOCKET_SEND_FLAG_MORE 4
+#define SOCKET_SEND_FLAG_NOSIGNAL 5
+#define SOCKET_SEND_FLAG_OOB 6
+#define SOCKET_SEND_FLAG_LOW SOCKET_SEND_FLAG_CONFIRM
+#define SOCKET_SEND_FLAG_HIGH SOCKET_SEND_FLAG_OOB
+
+typedef union {
+ struct {
+ unsigned int open:1;
+ // 0 = not conn, 1 = connecting, 2 = connected
+ unsigned int connect:2;
+ // unsigned int connecting:1;
+ // unsigned int connected:1;
+ // 0 = not listen, 1 = listening, 2 = accepting
+ unsigned int listen:2;
+ // unsigned int listening:1;
+ // unsigned int accepting:1;
+ /* Room for more... */
+ } flags;
+ unsigned int field; // Make it easy to reset all flags...
+} SocketState;
+
+/*
+#define IS_OPEN(d) ((d)->state.flags.open)
+#define IS_CONNECTED(d) ((d)->state.flags.connect == SOCKET_STATE_CONNECTED)
+#define IS_CONNECTING(d) ((d)->state.flags.connect == SOCKET_STATE_CONNECTING)
+*/
+
+
/*----------------------------------------------------------------------------
* Interface constants.
*
@@ -329,6 +365,7 @@ typedef unsigned long long llu_t;
#define sock_name(s, addr, len) getsockname((s), (addr), (len))
#define sock_open(domain, type, proto) \
make_noninheritable_handle(socket((domain), (type), (proto)))
+#define sock_send(s,buf,len,flag) send((s),(buf),(len),(flag))
#define sock_errno() WSAGetLastError()
#define sock_create_event(s) WSACreateEvent()
@@ -355,6 +392,7 @@ static unsigned long one_value = 1;
#define sock_listen(s, b) listen((s), (b))
#define sock_name(s, addr, len) getsockname((s), (addr), (len))
#define sock_open(domain, type, proto) socket((domain), (type), (proto))
+#define sock_send(s,buf,len,flag) send((s), (buf), (len), (flag))
#define sock_errno() errno
#define sock_create_event(s) (s) /* return file descriptor */
@@ -420,20 +458,21 @@ typedef struct {
// Write
ErlNifMutex* writeMtx;
BOOLEAN_T isWritable;
- unsigned int writePkgCnt;
- unsigned int writeByteCnt;
- unsigned int writeTries;
- unsigned int writeWaits;
+ uint32_t writePkgCnt;
+ uint32_t writeByteCnt;
+ uint32_t writeTries;
+ uint32_t writeWaits;
+ uint32_t writeFails;
// Read
ErlNifMutex* readMtx;
BOOLEAN_T isReadable;
ErlNifBinary rbuffer;
- unsigned int readCapacity;
- unsigned int readPkgCnt;
- unsigned int readByteCnt;
- unsigned int readTries;
- unsigned int readWaits;
+ uint32_t readCapacity;
+ uint32_t readPkgCnt;
+ uint32_t readByteCnt;
+ uint32_t readTries;
+ uint32_t readWaits;
/* Accept
* We also need a queue for waiting acceptors...
@@ -585,6 +624,11 @@ static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
static ERL_NIF_TERM naccept(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM ref);
+static ERL_NIF_TERM nsend(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sendRef,
+ ErlNifBinary* dataP,
+ int flags);
static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
SocketDescriptor* descP);
@@ -599,12 +643,14 @@ static int compare_pids(ErlNifEnv* env,
static BOOLEAN_T edomain2domain(int edomain, int* domain);
static BOOLEAN_T etype2type(int etype, int* type);
static BOOLEAN_T eproto2proto(int eproto, int* proto);
+static BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags);
#ifdef HAVE_SETNS
static BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns);
static BOOLEAN_T change_network_namespace(char* netns, int* cns, int* err);
static BOOLEAN_T restore_network_namespace(int ns, SOCKET sock, int* err);
#endif
+static BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc);
static void socket_dtor(ErlNifEnv* env, void* obj);
static void socket_stop(ErlNifEnv* env,
@@ -669,7 +715,7 @@ static char str_eagain[] = "eagain";
static char str_eafnosupport[] = "eafnosupport";
static char str_einval[] = "einval";
static char str_eisconn[] = "eisconn";
-static char str_eisnconn[] = "eisnconn";
+static char str_enotconn[] = "enotconn";
static char str_exbadstate[] = "exbadstate";
static char str_exbusy[] = "exbusy";
static char str_exmon[] = "exmonitor"; // failed monitor
@@ -687,7 +733,7 @@ static ERL_NIF_TERM atom_eagain;
static ERL_NIF_TERM atom_eafnosupport;
static ERL_NIF_TERM atom_einval;
static ERL_NIF_TERM atom_eisconn;
-static ERL_NIF_TERM atom_eisnconn;
+static ERL_NIF_TERM atom_enotconn;
static ERL_NIF_TERM atom_exbadstate;
static ERL_NIF_TERM atom_exbusy;
static ERL_NIF_TERM atom_exmon;
@@ -917,10 +963,10 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
*
* TODO!
*/
- enif_select(env,
- event,
- ERL_NIF_SELECT_READ,
- descP, NULL, atom_undefined);
+ SELECT(env,
+ event,
+ ERL_NIF_SELECT_READ,
+ descP, NULL, atom_undefined);
#endif
@@ -1516,10 +1562,10 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
(sock_errno() == EINPROGRESS))) { /* Unix & OSE!! */
ERL_NIF_TERM ref = MKREF(env);
descP->state = SOCKET_STATE_CONNECTING;
- enif_select(env,
- descP->sock,
- (ERL_NIF_SELECT_WRITE),
- descP, NULL, ref);
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_WRITE),
+ descP, NULL, ref);
return make_ok(env, ref);
} else if (code == 0) { /* ok we are connected */
descP->state = SOCKET_STATE_CONNECTED;
@@ -1572,7 +1618,7 @@ ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
int error;
if (descP->state != SOCKET_STATE_CONNECTING)
- return make_error(env, atom_eisnconn);
+ return make_error(env, atom_enotconn);
if (!verify_is_connected(descP, &error)) {
descP->state = SOCKET_STATE_OPEN; /* restore state */
@@ -1784,10 +1830,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
descP->acceptor.ref = ref;
- enif_select(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP, NULL, ref);
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, ref);
/* Shall we really change state?
* The ready event is sent directly to the calling
@@ -1860,9 +1906,10 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
#ifdef __WIN32__
/* See 'What is the point of this?' above */
- enif_select(env,
- (ERL_NIF_SELECT_READ),
- descP, NULL, atom_undefined);
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, atom_undefined);
#endif
accDescP->state = SOCKET_STATE_CONNECTED;
@@ -1911,10 +1958,10 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
* Just try again, no real error, just a ghost trigger from poll,
*/
- enif_select(env,
- descP->sock,
- (ERL_NIF_SELECT_READ),
- descP, NULL, ref);
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, ref);
return make_error(env, atom_eagain);
} else {
@@ -1960,9 +2007,10 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
#ifdef __WIN32__
/* See 'What is the point of this?' above */
- enif_select(env,
- (ERL_NIF_SELECT_READ),
- descP, NULL, atom_undefined);
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, atom_undefined);
#endif
accDescP->state = SOCKET_STATE_CONNECTED;
@@ -2008,6 +2056,7 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->writeByteCnt = 0;
descP->writeTries = 0;
descP->writeWaits = 0;
+ descP->writeFails = 0;
descP->readPkgCnt = 0;
descP->readByteCnt = 0;
descP->readTries = 0;
@@ -2022,6 +2071,190 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
}
+
+/* ----------------------------------------------------------------------
+ * nif_send
+ *
+ * Description:
+ * Send a message on a socket
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ * Sendref - A unique id for this (send) request.
+ * Data - The data to send in the form of a IOVec.
+ * Flags - Send flags.
+ */
+
+static
+ERL_NIF_TERM nif_send(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+ SocketDescriptor* descP;
+ ERL_NIF_TERM sendRef;
+ ErlNifBinary data;
+ unsigned int eflags;
+ int flags;
+ ERL_NIF_TERM res;
+
+ /* Extract arguments and perform preliminary validation */
+
+ if ((argc != 4) ||
+ !enif_get_resource(env, argv[0], sockets, (void**) &descP) ||
+ !enif_inspect_iolist_as_binary(env, argv[2], &data) ||
+ !enif_get_uint(env, argv[3], &eflags)) {
+ return enif_make_badarg(env);
+ }
+ sendRef = argv[1];
+
+ if (!IS_CONNECTED(descP))
+ return make_error(env, atom_enotconn);
+
+ if (!esendflags2sendflags(eflags, &flags))
+ return enif_make_badarg(env);
+
+ MLOCK(descP->writeMtx);
+
+ res = nsend(env, descP, sendRef, &data, flags);
+
+ MUNLOCK(descP->writeMtx);
+
+ return res;
+}
+
+
+/* What do we do when another process tries to write
+ * when the current writer has a select already waiting?
+ * Queue it? And what about simultaneous read and write?
+ * Queue up all operations towards the socket?
+ *
+ * We (may) need a currentOp field and an ops queue field.
+ */
+static
+ERL_NIF_TERM nsend(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sendRef,
+ ErlNifBinary* dataP,
+ int flags)
+{
+ int save_errno;
+ ssize_t written;
+
+ if (!descP->isWritable)
+ return enif_make_badarg(env);
+
+ /* We ignore the wrap for the moment.
+ * Maybe we should issue a wrap-message to controlling process...
+ */
+ cnt_inc(&descP->writeTries, 1);
+
+ written = sock_send(descP->sock, dataP->data, dataP->size, flags);
+
+ if (written == dataP->size) {
+
+ cnt_inc(&descP->writePkgCnt, 1);
+ cnt_inc(&descP->writeByteCnt, written);
+
+ return atom_ok;
+
+ } else if (written < 0) {
+
+ /* Ouch, check what kind of failure */
+ save_errno = sock_errno();
+ if ((save_errno != EAGAIN) &&
+ (save_errno != EINTR)) {
+
+ cnt_inc(&descP->writeFails, 1);
+
+ return make_error2(env, save_errno);
+
+ } else {
+
+ /* Ok, try again later */
+
+ written = 0;
+
+ }
+ }
+
+ /* We failed to write the *entire* packet (anything less then size
+ * of the packet, which is 0 <= written < sizeof packet),
+ * so schedule the rest for later.
+ */
+
+ cnt_inc(&descP->writeWaits, 1);
+
+ SELECT(env, descP->sock, (ERL_NIF_SELECT_WRITE),
+ descP, NULL, sendRef);
+
+ return make_ok(env, enif_make_int(env, written));
+
+}
+
+
+#ifdef FOBAR
+static
+ERL_NIF_TERM nwritev(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM sendRef,
+ ERL_NIF_TERM data)
+{
+ ERL_NIF_TERM tail;
+ ErlNifIOVec vec;
+ ErlNifIOVec* iovec = &vec;
+ SysIOVec* sysiovec;
+ int save_errno;
+ int iovcnt, n;
+
+ if (!enif_inspect_iovec(env, MAX_VSZ, data, &tail, &iovec))
+ return enif_make_badarg(env);
+
+ if (enif_ioq_size(descP->outQ) > 0) {
+ /* If the I/O queue contains data we enqueue the iovec
+ * and then peek the data to write out of the queue.
+ */
+ if (!enif_ioq_enqv(q, iovec, 0))
+ return -3;
+
+ sysiovec = enif_ioq_peek(descP->outQ, &iovcnt);
+
+ } else {
+ /* If the I/O queue is empty we skip the trip through it. */
+ iovcnt = iovec->iovcnt;
+ sysiovec = iovec->iov;
+ }
+
+ /* Attempt to write the data */
+ n = writev(fd, sysiovec, iovcnt);
+ saved_errno = errno;
+
+ if (enif_ioq_size(descP->outQ) == 0) {
+ /* If the I/O queue was initially empty we enqueue any
+ remaining data into the queue for writing later. */
+ if (n >= 0 && !enif_ioq_enqv(descP->outQ, iovec, n))
+ return -3;
+ } else {
+ /* Dequeue any data that was written from the queue. */
+ if (n > 0 && !enif_ioq_deq(descP->outQ, n, NULL))
+ return -4;
+ }
+ /* return n, which is either number of bytes written or -1 if
+ some error happened */
+ errno = saved_errno;
+ return n;
+}
+#endif
+
+
+
+/* ----------------------------------------------------------------------
+ * U t i l i t y F u n c t i o n s
+ * ----------------------------------------------------------------------
+ */
+
+/* compare_pids - Test if two pids are equal
+ *
+ */
static
int compare_pids(ErlNifEnv* env,
const ErlNifPid* pid1,
@@ -2034,11 +2267,6 @@ int compare_pids(ErlNifEnv* env,
}
-/* ----------------------------------------------------------------------
- * U t i l i t y F u n c t i o n s
- * ----------------------------------------------------------------------
- */
-
/* edomain2domain - convert internal (erlang) domain to (proper) domain
*
* Note that only a subset is supported.
@@ -2070,7 +2298,6 @@ BOOLEAN_T edomain2domain(int edomain, int* domain)
}
-
/* etype2type - convert internal (erlang) type to (proper) type
*
* Note that only a subset is supported.
@@ -2103,7 +2330,6 @@ BOOLEAN_T etype2type(int etype, int* type)
}
-
/* eproto2proto - convert internal (erlang) protocol to (proper) protocol
*
* Note that only a subset is supported.
@@ -2193,6 +2419,58 @@ BOOLEAN_T emap2netns(ErlNifEnv* env, ERL_NIF_TERM map, char** netns)
#endif
+/* esendflags2sendflags - convert internal (erlang) send flags to (proper)
+ * send flags.
+ */
+static
+BOOLEAN_T esendflags2sendflags(unsigned int esendflags, int* sendflags)
+{
+ unsigned int ef;
+ int tmp = 0;
+
+ for (ef = SOCKET_SEND_FLAG_LOW; ef <= SOCKET_SEND_FLAG_HIGH; ef++) {
+ switch (ef) {
+ case SOCKET_SEND_FLAG_CONFIRM:
+ tmp |= MSG_CONFIRM;
+ break;
+
+ case SOCKET_SEND_FLAG_DONTROUTE:
+ tmp |= MSG_DONTROUTE;
+ break;
+
+ case SOCKET_SEND_FLAG_DONTWAIT:
+ tmp |= MSG_DONTWAIT;
+ break;
+
+ case SOCKET_SEND_FLAG_EOR:
+ tmp |= MSG_EOR;
+ break;
+
+ case SOCKET_SEND_FLAG_MORE:
+ tmp |= MSG_MORE;
+ break;
+
+ case SOCKET_SEND_FLAG_NOSIGNAL:
+ tmp |= MSG_NOSIGNAL;
+ break;
+
+ case SOCKET_SEND_FLAG_OOB:
+ tmp |= MSG_OOB;
+ break;
+
+ default:
+ return FALSE;
+ }
+
+ }
+
+ *sendflags = tmp;
+
+ return TRUE;
+}
+
+
+
/* Create an ok two (2) tuple in the form: {ok, Any}.
* The second element (Any) is already in the form of an
* ERL_NIF_TERM so all we have to do is create the tuple.
@@ -2240,6 +2518,31 @@ ERL_NIF_TERM make_error2(ErlNifEnv* env, int err)
/* ----------------------------------------------------------------------
+ * C o u n t e r F u n c t i o n s
+ * ----------------------------------------------------------------------
+ */
+
+static
+BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc)
+{
+ BOOLEAN_T wrap;
+ uint32_t max = 0xFFFFFFFF;
+ uint32_t current = *cnt;
+
+ if ((max - inc) >= current) {
+ *cnt += inc;
+ wrap = FALSE;
+ } else {
+ *cnt = inc - (max - current) - 1;
+ wrap = TRUE;
+ }
+
+ return (wrap);
+}
+
+
+
+/* ----------------------------------------------------------------------
* C a l l b a c k F u n c t i o n s
* ----------------------------------------------------------------------
*/
@@ -2397,7 +2700,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_eafnosupport = MKA(env, str_eafnosupport);
atom_einval = MKA(env, str_einval);
atom_eisconn = MKA(env, str_eisconn);
- atom_eisnconn = MKA(env, str_eisnconn);
+ atom_enotconn = MKA(env, str_enotconn);
// atom_exalloc = MKA(env, str_exalloc);
atom_exbadstate = MKA(env, str_exbadstate);
atom_exbusy = MKA(env, str_exbusy);
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index f3a3d493ac..985b45a956 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -33,13 +33,21 @@
listen/1, listen/2,
accept/1, accept/2,
- send/2, send/3, sendto/5,
- recv/1, recv/2, recvfrom/1, recvfrom/2,
+ send/2, send/3, send/4,
+ sendto/5,
+ %% sendmsg/4,
+ %% writev/4, OR SENDV? It will be strange for recv then: recvv (instead of readv)
+
+ recv/1, recv/2,
+ recvfrom/1, recvfrom/2,
+ %% recvmsg/4,
+ %% readv/3,
close/1,
setopt/3,
getopt/2,
+ %% ?????
formated_timestamp/0
]).
@@ -457,45 +465,103 @@ flush_select_msgs(LSRef, Ref) ->
end.
+
%% ===========================================================================
%%
%% send, sendto, sendmsg - send a message on a socket
%%
--spec send(Socket, Data, Flags) -> ok | {error, Reason} when
- Socket :: socket(),
- Data :: binary(),
- Flags :: send_flags(),
- Reason :: term().
-
send(Socket, Data) ->
- send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT).
+ send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, infinity).
-send({socket, _, SockRef}, Data, Flags)
- when is_binary(Data) andalso is_list(Flags) ->
+send(Socket, Data, Flags) when is_list(Flags) ->
+ send(Socket, Data, Flags, infinity);
+send(Socket, Data, Timeout) ->
+ send(Socket, Data, ?SOCKET_SEND_FLAGS_DEFAULT, Timeout).
+
+
+-spec send(Socket, Data, Flags, Timeout) -> ok | {error, Reason} when
+ Socket :: socket(),
+ Data :: iodata(),
+ Flags :: send_flags(),
+ Timeout :: timeout(),
+ Reason :: term().
+
+send(Socket, Data, Flags, Timeout) when is_list(Data) ->
+ Bin = erlang:list_to_binary(Data),
+ send(Socket, Bin, Flags, Timeout);
+send(Socket, Data, Flags, Timeout) when is_binary(Data) andalso is_list(Flags) ->
EFlags = enc_send_flags(Flags),
- nif_send(SockRef, Data, EFlags).
+ do_send(Socket, make_ref(), Data, EFlags, Timeout).
+
+do_send(SockRef, SendRef, Data, _EFlags, Timeout)
+ when (Timeout =< 0) ->
+ nif_cancel(SockRef, SendRef),
+ flush_select_msgs(SockRef, SendRef),
+ {error, {timeout, size(Data)}};
+do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
+ TS = timestamp(Timeout),
+ case nif_send(SockRef, SendRef, Data, EFlags) of
+ ok ->
+ {ok, next_timeout(TS, Timeout)};
+ {ok, Written} ->
+ %% We are partially done, wait for continuation
+ receive
+ {select, SockRef, SendRef, ready_output} when (Written > 0) ->
+ <<_:Written/binary, Rest/binary>> = Data,
+ do_send(SockRef, make_ref(), Rest, EFlags,
+ next_timeout(TS, Timeout));
+ {select, SockRef, SendRef, ready_output} ->
+ do_send(SockRef, make_ref(), Data, EFlags,
+ next_timeout(TS, Timeout))
+ after Timeout ->
+ nif_cancel(SockRef, SendRef),
+ flush_select_msgs(SockRef, SendRef),
+ {error, timeout}
+ end;
+ {error, eagain} ->
+ receive
+ {select, SockRef, SendRef, ready_output} ->
+ do_send(SockRef, SendRef, Data, EFlags,
+ next_timeout(TS, Timeout))
+ after Timeout ->
+ nif_cancel(SockRef, SendRef),
+ flush_select_msgs(SockRef, SendRef),
+ {error, timeout}
+ end;
+
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+
%% ---------------------------------------------------------------------------
+%%
+%% Do we need a timeout argument here also?
+%%
-spec sendto(Socket, Data, Flags, DestAddr, Port) -> ok | {error, Reason} when
Socket :: socket(),
Data :: binary(),
Flags :: send_flags(),
- DestAddr :: ip_address(),
+ DestAddr :: null | ip_address(),
Port :: port_number(),
Reason :: term().
sendto({socket, _, SockRef}, Data, Flags, DestAddr, DestPort)
when is_binary(Data) andalso
is_list(Flags) andalso
- (is_tuple(DestAddr) andalso
- ((size(DestAddr) =:= 4) orelse
- (size(DestAddr) =:= 8))) andalso
+ ((is_tuple(DestAddr) andalso
+ ((size(DestAddr) =:= 4) orelse
+ (size(DestAddr) =:= 8))) orelse
+ (DestAddr =:= null)) andalso
(is_integer(DestPort) andalso (DestPort >= 0)) ->
+ %% We may need something like send/4 above?
EFlags = enc_send_flags(Flags),
- nif_sendto(SockRef, Data, EFlags, DestAddr, DestPort).
+ nif_sendto(SockRef, make_ref(), Data, EFlags, DestAddr, DestPort).
+
%% ---------------------------------------------------------------------------
@@ -508,7 +574,73 @@ sendto({socket, _, SockRef}, Data, Flags, DestAddr, DestPort)
+%% ===========================================================================
+%%
+%% writev - write data into multiple buffers
+%%
+
+%% send(Socket, Data, Flags, Timeout)
+%% when (is_list(Data) orelse is_binary(Data)) andalso is_list(Flags) ->
+%% IOVec = erlang:iolist_to_iovec(Data),
+%% EFlags = enc_send_flags(Flags),
+%% send_iovec(Socket, IOVec, EFlags, Timeout).
+
+
+%% %% Iterate over the IO-vector (list of binaries).
+
+%% send_iovec(_Socket, [] = _IOVec, _EFlags, _Timeout) ->
+%% ok;
+%% send_iovec({socket, _, SockRef} = Socket, [Bin|IOVec], EFlags, Timeout) ->
+%% case do_send(SockRef, make_ref(), Bin, EFlags, Timeout) of
+%% {ok, NewTimeout} ->
+%% send_iovec(Socket, IOVec, EFlags, NewTimeout);
+%% {error, _} = ERROR ->
+%% ERROR
+%% end.
+
+
+%% do_send(SockRef, SendRef, Data, _EFlags, Timeout)
+%% when (Timeout < 0) ->
+%% nif_cancel(SockRef, SendRef),
+%% flush_select_msgs(SockRef, SendRef),
+%% {error, {timeout, size(Data)}};
+%% do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
+%% TS = timestamp(Timeout),
+%% case nif_send(SockRef, SendRef, Data, EFlags) of
+%% ok ->
+%% {ok, next_timeout(TS, Timeout)};
+%% {ok, Written} ->
+%% %% We are partially done, wait for continuation
+%% receive
+%% {select, SockRef, SendRef, ready_output} ->
+%% <<_:Written/binary, Rest/binary>> = Data,
+%% do_send(SockRef, make_ref(), Rest, EFlags,
+%% next_timeout(TS, Timeout))
+%% after Timeout ->
+%% nif_cancel(SockRef, SendRef),
+%% flush_select_msgs(SockRef, SendRef),
+%% {error, timeout}
+%% end;
+%% {error, eagain} ->
+%% receive
+%% {select, SockRef, SendRef, ready_output} ->
+%% do_send(SockRef, SendRef, Data, EFlags,
+%% next_timeout(TS, Timeout))
+%% after Timeout ->
+%% nif_cancel(SockRef, SendRef),
+%% flush_select_msgs(SockRef, SendRef),
+%% {error, timeout}
+%% end;
+
+%% {error, _} = ERROR ->
+%% ERROR
+%% end.
+
+
+%% ===========================================================================
+%%
%% recv, recvfrom, recvmsg - receive a message from a socket
+%%
-spec recv(Socket, Flags) -> {ok, Data} | {error, Reason} when
Socket :: socket(),
@@ -540,6 +672,7 @@ recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) ->
EFlags = enc_recv_flags(Flags),
nif_recvfrom(SockRef, EFlags).
+
%% -spec recvmsg(Socket, [out] MsgHdr, Flags) -> {ok, Data} | {error, Reason} when
%% Socket :: socket(),
%% MsgHdr :: msg_header(),
@@ -549,6 +682,13 @@ recvfrom({socket, _, SockRef}, Flags) when is_list(Flags) ->
+%% ===========================================================================
+%%
+%% readv - read data into multiple buffers
+%%
+
+
+
%% close - close a file descriptor
-spec close(Socket) -> ok | {error, Reason} when
@@ -817,10 +957,10 @@ nif_listen(_SRef, _Backlog) ->
nif_accept(_SRef, _Ref) ->
erlang:error(badarg).
-nif_send(_SRef, _Data, _Flags) ->
+nif_send(_SockRef, _SendRef, _Data, _Flags) ->
erlang:error(badarg).
-nif_sendto(_SRef, _Data, _Flags, _Dest, _Port) ->
+nif_sendto(_SRef, _SendRef, _Data, _Flags, _Dest, _Port) ->
erlang:error(badarg).
nif_recv(_SRef, _Flags) ->