aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/emulator/nifs/common/socket_nif.c798
-rw-r--r--erts/preloaded/ebin/socket.beambin66152 -> 66040 bytes
-rw-r--r--erts/preloaded/src/socket.erl126
-rw-r--r--lib/kernel/test/socket_client.erl365
-rw-r--r--lib/kernel/test/socket_server.erl32
5 files changed, 1065 insertions, 256 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 6564c3c82f..c48d6eab00 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -104,6 +104,14 @@
#undef WANT_NONBLOCKING
#include "sys.h"
+
+
+
+/* AND HERE WE MAY HAVE A BUNCH OF DEFINES....SEE INET DRIVER.... */
+
+
+
+
#else /* !__WIN32__ */
#include <sys/time.h>
@@ -297,7 +305,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL;
#endif
#include "sys.h"
-#endif
+#endif /* !__WIN32__ */
#include <erl_nif.h>
@@ -731,6 +739,7 @@ typedef struct {
SocketAddress remote;
unsigned int addrLen;
+ ErlNifEnv* env;
/* +++ Controller (owner) process +++ */
ErlNifPid ctrlPid;
@@ -771,7 +780,7 @@ typedef struct {
size_t rBufSz; // Read buffer size (when data length = 0 is specified)
size_t rCtrlSz; // Read control buffer size
size_t wCtrlSz; // Write control buffer size
- BOOLEAN_T iow; // Inform On Wrap
+ BOOLEAN_T iow; // Inform On (counter) Wrap
BOOLEAN_T dbg;
/* +++ Close stuff +++ */
@@ -909,11 +918,9 @@ static ERL_NIF_TERM nif_finalize_connection(ErlNifEnv* env,
static ERL_NIF_TERM nif_finalize_close(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
-/*
static ERL_NIF_TERM nif_cancel(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
-*/
static ERL_NIF_TERM nopen(ErlNifEnv* env,
@@ -977,7 +984,6 @@ static ERL_NIF_TERM nclose(ErlNifEnv* env,
static ERL_NIF_TERM nshutdown(ErlNifEnv* env,
SocketDescriptor* descP,
int how);
-
static ERL_NIF_TERM nsetopt(ErlNifEnv* env,
SocketDescriptor* descP,
BOOLEAN_T isEncoded,
@@ -1817,6 +1823,43 @@ static ERL_NIF_TERM nsockname(ErlNifEnv* env,
SocketDescriptor* descP);
static ERL_NIF_TERM npeername(ErlNifEnv* env,
SocketDescriptor* descP);
+static ERL_NIF_TERM ncancel(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM op,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_connect(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_accept(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
+ SocketDescriptor* descP);
+static ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_send(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env,
+ SocketDescriptor* descP);
+static ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_read_select(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_write_select(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef,
+ int smode,
+ int rmode);
static ERL_NIF_TERM nsetopt_str_opt(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -1858,6 +1901,10 @@ static ERL_NIF_TERM ngetopt_timeval_opt(ErlNifEnv* env,
int level,
int opt);
+static BOOLEAN_T send_check_writer(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ ERL_NIF_TERM* checkResult);
static ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
ssize_t written,
@@ -2083,6 +2130,25 @@ static BOOLEAN_T acceptor_pop(ErlNifEnv* env,
ErlNifPid* pid,
ErlNifMonitor* mon,
ERL_NIF_TERM* ref);
+static BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid);
+
+static BOOLEAN_T writer_search4pid(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid);
+static ERL_NIF_TERM writer_push(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid pid,
+ ERL_NIF_TERM ref);
+static BOOLEAN_T writer_pop(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid,
+ ErlNifMonitor* mon,
+ ERL_NIF_TERM* ref);
+static BOOLEAN_T writer_unqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid);
static BOOLEAN_T qsearch4pid(ErlNifEnv* env,
SocketRequestQueue* q,
@@ -2235,8 +2301,10 @@ static char str_exsend[] = "exsend"; // failed send
/* *** "Global" Atoms *** */
+ERL_NIF_TERM esock_atom_accept;
ERL_NIF_TERM esock_atom_addr;
ERL_NIF_TERM esock_atom_any;
+ERL_NIF_TERM esock_atom_connect;
ERL_NIF_TERM esock_atom_credentials;
ERL_NIF_TERM esock_atom_ctrl;
ERL_NIF_TERM esock_atom_ctrunc;
@@ -2261,6 +2329,7 @@ ERL_NIF_TERM esock_atom_local;
ERL_NIF_TERM esock_atom_loopback;
ERL_NIF_TERM esock_atom_lowdelay;
ERL_NIF_TERM esock_atom_mincost;
+ERL_NIF_TERM esock_atom_not_found;
ERL_NIF_TERM esock_atom_ok;
ERL_NIF_TERM esock_atom_oob;
ERL_NIF_TERM esock_atom_origdstaddr;
@@ -2270,11 +2339,18 @@ ERL_NIF_TERM esock_atom_port;
ERL_NIF_TERM esock_atom_protocol;
ERL_NIF_TERM esock_atom_raw;
ERL_NIF_TERM esock_atom_rdm;
-ERL_NIF_TERM esock_atom_rights;
+ERL_NIF_TERM esock_atom_recv;
+ERL_NIF_TERM esock_atom_recvfrom;
+ERL_NIF_TERM esock_atom_recvmsg;
ERL_NIF_TERM esock_atom_reliability;
+ERL_NIF_TERM esock_atom_rights;
ERL_NIF_TERM esock_atom_scope_id;
ERL_NIF_TERM esock_atom_sctp;
ERL_NIF_TERM esock_atom_sec;
+ERL_NIF_TERM esock_atom_select_sent;
+ERL_NIF_TERM esock_atom_send;
+ERL_NIF_TERM esock_atom_sendmsg;
+ERL_NIF_TERM esock_atom_sendto;
ERL_NIF_TERM esock_atom_seqpacket;
ERL_NIF_TERM esock_atom_socket;
ERL_NIF_TERM esock_atom_spec_dst;
@@ -3222,7 +3298,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
&descP->currentAcceptor.mon) > 0)
return esock_make_error(env, atom_exmon);
- descP->currentAcceptor.ref = ref;
+ descP->currentAcceptor.ref = enif_make_copy(descP->env, ref);
+ descP->currentAcceptorP = &descP->currentAcceptor;
SELECT(env,
descP->sock,
@@ -3320,7 +3397,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
/* *** naccept_accepting ***
* We have an active acceptor and possibly acceptors waiting in queue.
- * At the moment the queue is *not* implemented.
+ * If the pid of the calling process is not the pid of the "current process",
+ * push the requester onto the queue.
*/
static
ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
@@ -3347,11 +3425,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
"\r\n", caller, descP->currentAcceptor.pid) );
if (!compare_pids(env, &descP->currentAcceptor.pid, &caller)) {
- /* This will have to do until we implement the queue.
- * When we have the queue, we should simply push this request,
- * and instead return with eagain (the caller will then wait
- * for the select message).
- */
+
+ /* Not the "current acceptor", so (maybe) push onto queue */
SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") );
@@ -3411,6 +3486,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
SSDBG( descP, ("SOCKET", "naccept_accepting -> accept success\r\n") );
+ DEMONP(env, descP, &descP->currentAcceptor.mon);
+
if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) {
save_errno = sock_errno();
while ((sock_close(accSock) == INVALID_SOCKET) &&
@@ -3451,10 +3528,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
accDescP->state = SOCKET_STATE_CONNECTED;
- /* Here we should have the test if we have something in the queue.
- * And if so, pop it and copy the (waiting) acceptor, and then
- * make a new select with that info).
- */
+ /* Check if there are waiting acceptors (popping the acceptor queue) */
if (acceptor_pop(env, descP,
&descP->currentAcceptor.pid,
@@ -3559,12 +3633,12 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env,
}
-/* What do we do when another process tries to write
- * when the current writer has a select already waiting?
- * Queue it? And what about simultaneous read and write?
- * Queue up all operations towards the socket?
+/* *** nsend ***
*
- * We (may) need a currentOp field and an ops queue field.
+ * Do the actual send.
+ * Do some initial writer checks, do the actual send and then
+ * analyze the result. If we are done, another writer may be
+ * scheduled (if there is one in the writer queue).
*/
static
ERL_NIF_TERM nsend(ErlNifEnv* env,
@@ -3573,12 +3647,17 @@ ERL_NIF_TERM nsend(ErlNifEnv* env,
ErlNifBinary* sndDataP,
int flags)
{
- int save_errno;
- ssize_t written;
+ int save_errno;
+ ssize_t written;
+ ERL_NIF_TERM writerCheck;
if (!descP->isWritable)
return enif_make_badarg(env);
+ /* Check if there is already a current writer and if its us */
+ if (!send_check_writer(env, descP, sendRef, &writerCheck))
+ return writerCheck;
+
/* We ignore the wrap for the moment.
* Maybe we should issue a wrap-message to controlling process...
*/
@@ -3597,6 +3676,7 @@ ERL_NIF_TERM nsend(ErlNifEnv* env,
}
+
/* ----------------------------------------------------------------------
* nif_sendto
*
@@ -3662,9 +3742,13 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env,
&remoteAddrLen)) != NULL)
return esock_make_error_str(env, xres);
+ MLOCK(descP->writeMtx);
+
res = nsendto(env, descP, sendRef, &sndData, flags,
&remoteAddr, remoteAddrLen);
+ MUNLOCK(descP->writeMtx);
+
SGDBG( ("SOCKET", "nif_sendto -> done with result: "
"\r\n %T"
"\r\n", res) );
@@ -3682,12 +3766,17 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env,
SocketAddress* toAddrP,
unsigned int toAddrLen)
{
- int save_errno;
- ssize_t written;
+ int save_errno;
+ ssize_t written;
+ ERL_NIF_TERM writerCheck;
if (!descP->isWritable)
return enif_make_badarg(env);
+ /* Check if there is already a current writer and if its us */
+ if (!send_check_writer(env, descP, sendRef, &writerCheck))
+ return writerCheck;
+
/* We ignore the wrap for the moment.
* Maybe we should issue a wrap-message to controlling process...
*/
@@ -3763,8 +3852,12 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env,
if (!esendflags2sendflags(eflags, &flags))
return esock_make_error(env, esock_atom_einval);
+ MLOCK(descP->writeMtx);
+
res = nsendmsg(env, descP, sendRef, eMsgHdr, flags);
+ MUNLOCK(descP->writeMtx);
+
SSDBG( descP,
("SOCKET", "nif_sendmsg -> done with result: "
"\r\n %T"
@@ -3791,12 +3884,16 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env,
size_t ctrlBufLen, ctrlBufUsed;
int save_errno;
ssize_t written, dataSize;
+ ERL_NIF_TERM writerCheck;
char* xres;
if (!descP->isWritable)
return enif_make_badarg(env);
-
+ /* Check if there is already a current writer and if its us */
+ if (!send_check_writer(env, descP, sendRef, &writerCheck))
+ return writerCheck;
+
/* Depending on if we are *connected* or not, we require
* different things in the msghdr map.
*/
@@ -7175,6 +7272,7 @@ ERL_NIF_TERM nsetopt_lvl_sctp_associnfo(ErlNifEnv* env,
int res;
size_t sz;
unsigned int tmp;
+ int32_t tmpAssocId;
SSDBG( descP,
("SOCKET", "nsetopt_lvl_sctp_associnfo -> entry with"
@@ -7213,8 +7311,12 @@ ERL_NIF_TERM nsetopt_lvl_sctp_associnfo(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "nsetopt_lvl_sctp_associnfo -> decode attributes\r\n") );
- if (!GET_UINT(env, eAssocId, &assocParams.sasoc_assoc_id))
+ /* On some platforms the assoc id is typed as an unsigned integer (uint32)
+ * So, to avoid warnings there, we always make an explicit cast...
+ */
+ if (!GET_INT(env, eAssocId, &tmpAssocId))
return esock_make_error(env, esock_atom_einval);
+ assocParams.sasoc_assoc_id = (typeof(assocParams.sasoc_assoc_id)) tmpAssocId;
/*
* We should really make sure this is ok in erlang (to ensure that
@@ -7509,6 +7611,7 @@ ERL_NIF_TERM nsetopt_lvl_sctp_rtoinfo(ErlNifEnv* env,
struct sctp_rtoinfo rtoInfo;
int res;
size_t sz;
+ int32_t tmpAssocId;
SSDBG( descP,
("SOCKET", "nsetopt_lvl_sctp_rtoinfo -> entry with"
@@ -7541,8 +7644,12 @@ ERL_NIF_TERM nsetopt_lvl_sctp_rtoinfo(ErlNifEnv* env,
SSDBG( descP,
("SOCKET", "nsetopt_lvl_sctp_rtoinfo -> decode attributes\r\n") );
- if (!GET_UINT(env, eAssocId, &rtoInfo.srto_assoc_id))
+ /* On some platforms the assoc id is typed as an unsigned integer (uint32)
+ * So, to avoid warnings there, we always make an explicit cast...
+ */
+ if (!GET_INT(env, eAssocId, &tmpAssocId))
return esock_make_error(env, esock_atom_einval);
+ rtoInfo.srto_assoc_id = (typeof(rtoInfo.srto_assoc_id)) tmpAssocId;
if (!GET_UINT(env, eInitial, &rtoInfo.srto_initial))
return esock_make_error(env, esock_atom_einval);
@@ -10414,10 +10521,471 @@ ERL_NIF_TERM npeername(ErlNifEnv* env,
/* ----------------------------------------------------------------------
+ * nif_cancel
+ *
+ * Description:
+ * Cancel a previous select!
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ * Operation (atom) - What kind of operation (accept, send, ...) is to be cancelled
+ * Ref (ref) - Unique id for the operation
+ */
+static
+ERL_NIF_TERM nif_cancel(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+ SocketDescriptor* descP;
+ ERL_NIF_TERM op, opRef, result;
+
+ SGDBG( ("SOCKET", "nif_cancel -> entry with argc: %d\r\n", argc) );
+
+ /* Extract arguments and perform preliminary validation */
+
+ if ((argc != 3) ||
+ !enif_get_resource(env, argv[0], sockets, (void**) &descP)) {
+ return enif_make_badarg(env);
+ }
+ op = argv[1];
+ opRef = argv[2];
+
+ SSDBG( descP,
+ ("SOCKET", "nif_cancel -> args when sock = %d:"
+ "\r\n op: %T"
+ "\r\n opRef: %T"
+ "\r\n", descP->sock, op, opRef) );
+
+ result = ncancel(env, descP, op, opRef);
+
+ SSDBG( descP,
+ ("SOCKET", "nif_cancel -> done with result: "
+ "\r\n %T"
+ "\r\n", result) );
+
+ return result;
+
+}
+
+
+static
+ERL_NIF_TERM ncancel(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM op,
+ ERL_NIF_TERM opRef)
+{
+ /* <KOLLA>
+ *
+ * Do we really need all these variants? Should it not be enough with:
+ *
+ * connect | accept | send | recv
+ *
+ * </KOLLA>
+ */
+ if (COMPARE(op, esock_atom_connect) == 0) {
+ return ncancel_connect(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_accept) == 0) {
+ return ncancel_accept(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_send) == 0) {
+ return ncancel_send(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_sendto) == 0) {
+ return ncancel_send(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_sendmsg) == 0) {
+ return ncancel_send(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_recv) == 0) {
+ return ncancel_recv(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_recvfrom) == 0) {
+ return ncancel_recv(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_recvmsg) == 0) {
+ return ncancel_recv(env, descP, opRef);
+ } else {
+ return esock_make_error(env, esock_atom_einval);
+ }
+}
+
+
+
+/* *** ncancel_connect ***
+ *
+ *
+ */
+static
+ERL_NIF_TERM ncancel_connect(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ return ncancel_write_select(env, descP, opRef);
+}
+
+
+/* *** ncancel_accept ***
+ *
+ * We have two different cases:
+ * *) Its the current acceptor
+ * Cancel the select!
+ * We need to activate one of the waiting acceptors.
+ * *) Its one of the acceptors ("waiting") in the queue
+ * Simply remove the acceptor from the queue.
+ *
+ */
+static
+ERL_NIF_TERM ncancel_accept(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ ERL_NIF_TERM res;
+
+ SSDBG( descP,
+ ("SOCKET", "ncancel_accept -> entry with"
+ "\r\n opRef: %T"
+ "\r\n %s"
+ "\r\n", opRef,
+ ((descP->currentAcceptorP == NULL) ? "without acceptor" : "with acceptor")) );
+
+ MLOCK(descP->accMtx);
+
+ if (descP->currentAcceptorP != NULL) {
+ if (COMPARE(opRef, descP->currentAcceptor.ref) == 0) {
+ res = ncancel_accept_current(env, descP);
+ } else {
+ res = ncancel_accept_waiting(env, descP, opRef);
+ }
+ } else {
+ /* Or badarg? */
+ res = esock_make_error(env, esock_atom_einval);
+ }
+
+ MUNLOCK(descP->accMtx);
+
+ SSDBG( descP,
+ ("SOCKET", "ncancel_accept -> done with result:"
+ "\r\n %T"
+ "\r\n", res) );
+
+ return res;
+}
+
+
+/* The current acceptor process has an ongoing select we first must
+ * cancel. Then we must re-activate the "first" (the first
+ * in the acceptor queue).
+ */
+static
+ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
+ SocketDescriptor* descP)
+{
+ ERL_NIF_TERM res;
+
+ SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") );
+
+ res = ncancel_read_select(env, descP, descP->currentAcceptor.ref);
+
+ SSDBG( descP, ("SOCKET", "ncancel_accept_current -> cancel res: %T\r\n", res) );
+
+ if (acceptor_pop(env, descP,
+ &descP->currentAcceptor.pid,
+ &descP->currentAcceptor.mon,
+ &descP->currentAcceptor.ref)) {
+
+ /* There was another one */
+
+ SSDBG( descP, ("SOCKET", "ncancel_accept_current -> new (active) acceptor: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref) );
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref);
+
+ } else {
+ SSDBG( descP, ("SOCKET", "ncancel_accept_current -> no more acceptors\r\n") );
+ descP->currentAcceptorP = NULL;
+ descP->state = SOCKET_STATE_LISTENING;
+ }
+
+ SSDBG( descP, ("SOCKET", "ncancel_accept_current -> done with result:"
+ "\r\n %T"
+ "\r\n", res) );
+
+ return res;
+}
+
+
+/* These processes have not performed a select, so we can simply
+ * remove them from the acceptor queue.
+ */
+static
+ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ ErlNifPid caller;
+
+ if (enif_self(env, &caller) == NULL)
+ return esock_make_error(env, atom_exself);
+
+ /* unqueue request from (acceptor) queue */
+
+ if (acceptor_unqueue(env, descP, &caller)) {
+ return esock_atom_ok;
+ } else {
+ /* Race? */
+ return esock_make_error(env, esock_atom_not_found);
+ }
+}
+
+
+
+/* *** ncancel_send ***
+ *
+ * Cancel a send operation.
+ * Its either the current writer or one of the waiting writers.
+ */
+static
+ERL_NIF_TERM ncancel_send(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ ERL_NIF_TERM res;
+
+ SSDBG( descP,
+ ("SOCKET", "ncancel_send -> entry with"
+ "\r\n opRef: %T"
+ "\r\n %s"
+ "\r\n", opRef,
+ ((descP->currentWriterP == NULL) ? "without writer" : "with writer")) );
+
+ MLOCK(descP->writeMtx);
+
+ if (descP->currentWriterP != NULL) {
+ if (COMPARE(opRef, descP->currentWriter.ref) == 0) {
+ res = ncancel_send_current(env, descP);
+ } else {
+ res = ncancel_send_waiting(env, descP, opRef);
+ }
+ } else {
+ /* Or badarg? */
+ res = esock_make_error(env, esock_atom_einval);
+ }
+
+ MUNLOCK(descP->writeMtx);
+
+ SSDBG( descP,
+ ("SOCKET", "ncancel_send -> done with result:"
+ "\r\n %T"
+ "\r\n", res) );
+
+ return res;
+}
+
+
+
+/* The current writer process has an ongoing select we first must
+ * cancel. Then we must re-activate the "first" (the first
+ * in the writer queue).
+ */
+static
+ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env,
+ SocketDescriptor* descP)
+{
+ ERL_NIF_TERM res;
+
+ SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") );
+
+ res = ncancel_write_select(env, descP, descP->currentWriter.ref);
+
+ SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) );
+
+ if (writer_pop(env, descP,
+ &descP->currentWriter.pid,
+ &descP->currentWriter.mon,
+ &descP->currentWriter.ref)) {
+
+ /* There was another one */
+
+ SSDBG( descP, ("SOCKET", "ncancel_send_current -> new (active) writer: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentWriter.pid,
+ descP->currentWriter.ref) );
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_WRITE),
+ descP, &descP->currentWriter.pid, descP->currentWriter.ref);
+
+ } else {
+ SSDBG( descP, ("SOCKET", "ncancel_send_current -> no more writers\r\n") );
+ descP->currentWriterP = NULL;
+ }
+
+ SSDBG( descP, ("SOCKET", "ncancel_send_current -> done with result:"
+ "\r\n %T"
+ "\r\n", res) );
+
+ return res;
+}
+
+
+/* These processes have not performed a select, so we can simply
+ * remove them from the writer queue.
+ */
+static
+ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ ErlNifPid caller;
+
+ if (enif_self(env, &caller) == NULL)
+ return esock_make_error(env, atom_exself);
+
+ /* unqueue request from (writer) queue */
+
+ if (writer_unqueue(env, descP, &caller)) {
+ return esock_atom_ok;
+ } else {
+ /* Race? */
+ return esock_make_error(env, esock_atom_not_found);
+ }
+}
+
+
+
+/* *** ncancel_recv ***
+ *
+ *
+ */
+static
+ERL_NIF_TERM ncancel_recv(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ return esock_make_error(env, esock_atom_einval);
+}
+
+
+
+static
+ERL_NIF_TERM ncancel_read_select(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ return ncancel_mode_select(env, descP, opRef,
+ ERL_NIF_SELECT_READ,
+ ERL_NIF_SELECT_READ_CANCELLED);
+}
+
+
+static
+ERL_NIF_TERM ncancel_write_select(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ return ncancel_mode_select(env, descP, opRef,
+ ERL_NIF_SELECT_WRITE,
+ ERL_NIF_SELECT_WRITE_CANCELLED);
+}
+
+
+static
+ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef,
+ int smode,
+ int rmode)
+{
+ int selectRes = enif_select(env, descP->sock,
+ (ERL_NIF_SELECT_CANCEL | smode),
+ descP, NULL, opRef);
+
+ if (selectRes & rmode) {
+ /* Was cancelled */
+ return esock_atom_ok;
+ } else if (selectRes > 0) {
+ /* Has already sent the message */
+ return esock_make_error(env, esock_atom_select_sent);
+ } else {
+ /* Stopped? */
+ SSDBG( descP, ("SOCKET", "ncancel_mode_select -> failed: %d (0x%lX)"
+ "\r\n", selectRes, selectRes) );
+ return esock_make_error(env, esock_atom_einval);
+ }
+
+}
+
+
+
+/* ----------------------------------------------------------------------
* U t i l i t y F u n c t i o n s
* ----------------------------------------------------------------------
*/
+/* *** send_check_writer ***
+ *
+ * Checks if we have a current writer and if that is us. If not, then we must
+ * be made to wait for our turn. This is done by pushing us unto the writer queue.
+ */
+static
+BOOLEAN_T send_check_writer(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ ERL_NIF_TERM* checkResult)
+{
+ if (descP->currentWriterP != NULL) {
+ ErlNifPid caller;
+
+ if (enif_self(env, &caller) == NULL) {
+ *checkResult = esock_make_error(env, atom_exself);
+ return FALSE;
+ }
+
+ if (!compare_pids(env, &descP->currentWriter.pid, &caller)) {
+ /* Not the "current writer", so (maybe) push onto queue */
+
+ SSDBG( descP,
+ ("SOCKET", "send_check_writer -> not (current) writer\r\n") );
+
+ if (!writer_search4pid(env, descP, &caller))
+ *checkResult = writer_push(env, descP, caller, ref);
+ else
+ *checkResult = esock_make_error(env, esock_atom_eagain);
+
+ SSDBG( descP,
+ ("SOCKET",
+ "nsend -> queue (push) result: %T\r\n", checkResult) );
+
+ return FALSE;
+
+ }
+
+ }
+
+ *checkResult = esock_atom_ok; // Does not actually matter in this case, but ...
+
+ return TRUE;
+}
+
+
+
+/* *** send_check_result ***
+ *
+ * Check the result of a socket send (send, sendto and sendmsg) call.
+ * If a "complete" send has been made, the next (waiting) writer will be
+ * scheduled (if there is one).
+ * If we did not manage to send the entire package, make another select,
+ * so that we can be informed when we can make another try (to send the rest),
+ * and return with the amount we actually managed to send (its up to the caller
+ * (that is the erlang code) to figure out hust much is left to send).
+ * If the write fail, we give up and return with the appropriate error code.
+ *
+ * What about the remaining writers!!
+ */
static
ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -10437,24 +11005,67 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env,
cnt_inc(&descP->writePkgCnt, 1);
cnt_inc(&descP->writeByteCnt, written);
+ DEMONP(env, descP, &descP->currentWriter.mon);
SSDBG( descP,
("SOCKET", "send_check_result -> "
"everything written (%d,%d) - done\r\n", dataSize, written) );
+ /* Ok, this write is done maybe activate the next (if any) */
+
+ if (writer_pop(env, descP,
+ &descP->currentWriter.pid,
+ &descP->currentWriter.mon,
+ &descP->currentWriter.ref)) {
+
+ /* There was another one */
+
+ SSDBG( descP, ("SOCKET", "send_check_result -> new (active) writer: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentWriter.pid,
+ descP->currentWriter.ref) );
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_WRITE),
+ descP, &descP->currentWriter.pid, descP->currentWriter.ref);
+
+ } else {
+ descP->currentWriterP = NULL;
+ }
+
return esock_atom_ok;
} else if (written < 0) {
- /* Ouch, check what kind of failure */
+ /* Some kind of send failure - check what kind */
+
if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) {
+ ErlNifPid pid;
+ ErlNifMonitor mon;
+ ERL_NIF_TERM ref, res;
+
+ /*
+ * An actual failure - we (and everyone waiting) give up
+ */
cnt_inc(&descP->writeFails, 1);
SSDBG( descP,
("SOCKET", "send_check_result -> error: %d\r\n", saveErrno) );
- return esock_make_error_errno(env, saveErrno);
+ res = esock_make_error_errno(env, saveErrno);
+
+ while (writer_pop(env, descP, &pid, &mon, &ref)) {
+ SSDBG( descP,
+ ("SOCKET", "send_check_result -> abort %T\r\n", pid) );
+ send_msg_nif_abort(env, ref, res, &pid);
+ DEMONP(env, descP, &mon);
+ }
+
+ return res;
} else {
@@ -10531,8 +11142,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
}
/* There is a special case: If the provided 'to read' value is
- * zero (0). That means that we reads as much as we can, using
- * the default read buffer size.
+ * zero (0) (only for type =/= stream).
+ * That means that we reads as much as we can, using the default
+ * read buffer size.
*/
if (bufP->size == read) {
@@ -12541,6 +13153,8 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) {
char buf[64]; /* Buffer used for building the mutex name */
+ descP->env = enif_alloc_env();
+
sprintf(buf, "socket[w,%d]", sock);
descP->writeMtx = MCREATE(buf);
descP->currentWriterP = NULL; // currentWriter not used
@@ -13187,7 +13801,7 @@ ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
SocketRequestor* reqP = &e->data;
reqP->pid = pid;
- reqP->ref = ref;
+ reqP->ref = enif_make_copy(descP->env, ref);
if (MONP(env, descP, &pid, &reqP->mon) > 0) {
FREE(reqP);
@@ -13231,6 +13845,108 @@ BOOLEAN_T acceptor_pop(ErlNifEnv* env,
}
+/* *** acceptor unqueue ***
+ *
+ * Remove an acceptor from the acceptor queue.
+ */
+static
+BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid)
+{
+ return qunqueue(env, &descP->acceptorsQ, pid);
+}
+
+
+
+/* *** writer search for pid ***
+ *
+ * Search for a pid in the writer queue.
+ */
+static
+BOOLEAN_T writer_search4pid(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid)
+{
+ return qsearch4pid(env, &descP->writersQ, pid);
+}
+
+
+/* *** writer push ***
+ *
+ * Push an writer onto the writer queue.
+ * This happens when we already have atleast one current writer.
+ */
+static
+ERL_NIF_TERM writer_push(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid pid,
+ ERL_NIF_TERM ref)
+{
+ SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement));
+ SocketRequestor* reqP = &e->data;
+
+ reqP->pid = pid;
+ reqP->ref = enif_make_copy(descP->env, ref);
+
+ if (MONP(env, descP, &pid, &reqP->mon) > 0) {
+ FREE(reqP);
+ return esock_make_error(env, atom_exmon);
+ }
+
+ qpush(&descP->writersQ, e);
+
+ // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN
+ return esock_make_error(env, esock_atom_eagain);
+}
+
+
+/* *** writer pop ***
+ *
+ * Pop an writer from the writer queue.
+ */
+static
+BOOLEAN_T writer_pop(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ErlNifPid* pid,
+ ErlNifMonitor* mon,
+ ERL_NIF_TERM* ref)
+{
+ SocketRequestQueueElement* e = qpop(&descP->writersQ);
+
+ if (e != NULL) {
+ *pid = e->data.pid;
+ *mon = e->data.mon;
+ *ref = e->data.ref; // At this point the ref has already been copied (env)
+ FREE(e);
+ return TRUE;
+ } else {
+ /* (acceptors) Queue was empty */
+ // *pid = NULL; we have no null value for pids
+ // *mon = NULL; we have no null value for monitors
+ *ref = esock_atom_undefined; // Just in case
+ return FALSE;
+ }
+
+}
+
+
+/* *** writer unqueue ***
+ *
+ * Remove an writer from the writer queue.
+ */
+static
+BOOLEAN_T writer_unqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid)
+{
+ return qunqueue(env, &descP->writersQ, pid);
+}
+
+
+
+
+
static
BOOLEAN_T qsearch4pid(ErlNifEnv* env,
SocketRequestQueue* q,
@@ -13673,7 +14389,7 @@ void socket_down(ErlNifEnv* env,
"socket_down -> "
"not current acceptor - maybe a waiting acceptor\r\n") );
- qunqueue(env, &descP->acceptorsQ, pid);
+ acceptor_unqueue(env, descP, pid);
}
}
@@ -13723,7 +14439,7 @@ ErlNifFunc socket_funcs[] =
* is called after the connect *select* has "completed".
*/
{"nif_finalize_connection", 1, nif_finalize_connection, 0},
- // {"nif_cancel", 2, nif_cancel, 0},
+ {"nif_cancel", 3, nif_cancel, 0},
{"nif_finalize_close", 1, nif_finalize_close, ERL_NIF_DIRTY_JOB_IO_BOUND}
};
@@ -13843,8 +14559,10 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_want = MKA(env, str_want);
/* Global atom(s) */
+ esock_atom_accept = MKA(env, "accept");
esock_atom_addr = MKA(env, "addr");
esock_atom_any = MKA(env, "any");
+ esock_atom_connect = MKA(env, "connect");
esock_atom_credentials = MKA(env, "credentials");
esock_atom_ctrl = MKA(env, "ctrl");
esock_atom_ctrunc = MKA(env, "ctrunc");
@@ -13869,6 +14587,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
esock_atom_loopback = MKA(env, "loopback");
esock_atom_lowdelay = MKA(env, "lowdelay");
esock_atom_mincost = MKA(env, "mincost");
+ esock_atom_not_found = MKA(env, "not_found");
esock_atom_ok = MKA(env, "ok");
esock_atom_oob = MKA(env, "oob");
esock_atom_origdstaddr = MKA(env, "origdstaddr");
@@ -13878,11 +14597,18 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
esock_atom_protocol = MKA(env, "protocol");
esock_atom_raw = MKA(env, "raw");
esock_atom_rdm = MKA(env, "rdm");
+ esock_atom_recv = MKA(env, "recv");
+ esock_atom_recvfrom = MKA(env, "recvfrom");
+ esock_atom_recvmsg = MKA(env, "recvmsg");
esock_atom_reliability = MKA(env, "reliability");
esock_atom_rights = MKA(env, "rights");
esock_atom_scope_id = MKA(env, "scope_id");
esock_atom_sctp = MKA(env, "sctp");
esock_atom_sec = MKA(env, "sec");
+ esock_atom_select_sent = MKA(env, "select_sent");
+ esock_atom_send = MKA(env, "send");
+ esock_atom_sendmsg = MKA(env, "sendmsg");
+ esock_atom_sendto = MKA(env, "sendto");
esock_atom_seqpacket = MKA(env, "seqpacket");
esock_atom_socket = MKA(env, "socket");
esock_atom_spec_dst = MKA(env, "spec_dst");
diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam
index e6a33337ba..9e6d9f4709 100644
--- a/erts/preloaded/ebin/socket.beam
+++ b/erts/preloaded/ebin/socket.beam
Binary files differ
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index ad7a35694b..652054457f 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -1137,7 +1137,7 @@ connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout)
%% </KOLLA>
nif_finalize_connection(SockRef)
after NewTimeout ->
- nif_cancel(SockRef, connect, Ref),
+ cancel(SockRef, connect, Ref),
{error, timeout}
end;
{error, _} = ERROR ->
@@ -1145,7 +1145,6 @@ connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout)
end.
-
%% ===========================================================================
%%
%% listen - listen for connections on a socket
@@ -1227,13 +1226,12 @@ do_accept(LSockRef, Timeout) ->
{error, Reason}
after NewTimeout ->
- nif_cancel(LSockRef, accept, AccRef),
- flush_select_msgs(LSockRef, AccRef),
+ cancel(LSockRef, accept, AccRef),
{error, timeout}
end;
{error, _} = ERROR ->
- nif_cancel(LSockRef, accept, AccRef), % Just to be on the safe side...
+ cancel(LSockRef, accept, AccRef), % Just to be on the safe side...
ERROR
end.
@@ -1305,8 +1303,7 @@ do_send(SockRef, Data, EFlags, Timeout) ->
{error, Reason}
after NewTimeout ->
- nif_cancel(SockRef, send, SendRef),
- flush_select_msgs(SockRef, SendRef),
+ cancel(SockRef, send, SendRef),
{error, {timeout, size(Data)}}
end;
{error, eagain} ->
@@ -1319,8 +1316,7 @@ do_send(SockRef, Data, EFlags, Timeout) ->
{error, Reason}
after Timeout ->
- nif_cancel(SockRef, send, SendRef),
- flush_select_msgs(SockRef, SendRef),
+ cancel(SockRef, send, SendRef),
{error, {timeout, size(Data)}}
end;
@@ -1349,10 +1345,19 @@ sendto(Socket, Data, Dest) ->
Data :: binary(),
Dest :: null | sockaddr(),
Flags :: send_flags(),
- Reason :: term().
+ Reason :: term()
+ ; (Socket, Data, Dest, Timeout) -> ok | {error, Reason} when
+ Socket :: socket(),
+ Data :: iodata(),
+ Dest :: null | sockaddr(),
+ Timeout :: timeout(),
+ Reason :: term().
+
+sendto(Socket, Data, Dest, Flags) when is_list(Flags) ->
+ sendto(Socket, Data, Dest, Flags, ?SOCKET_SENDTO_TIMEOUT_DEFAULT);
+sendto(Socket, Data, Dest, Timeout) ->
+ sendto(Socket, Data, Dest, ?SOCKET_SENDTO_FLAGS_DEFAULT, Timeout).
-sendto(Socket, Data, Dest, Flags) ->
- sendto(Socket, Data, Dest, Flags, ?SOCKET_SENDTO_TIMEOUT_DEFAULT).
-spec sendto(Socket, Data, Dest, Flags, Timeout) -> ok | {error, Reason} when
Socket :: socket(),
@@ -1403,8 +1408,7 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) ->
{error, Reason}
after Timeout ->
- nif_cancel(SockRef, sendto, SendRef),
- flush_select_msgs(SockRef, SendRef),
+ cancel(SockRef, sendto, SendRef),
{error, timeout}
end;
@@ -1414,8 +1418,7 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) ->
do_sendto(SockRef, Data, Dest, EFlags,
next_timeout(TS, Timeout))
after Timeout ->
- nif_cancel(SockRef, sendto, SendRef),
- flush_select_msgs(SockRef, SendRef),
+ cancel(SockRef, sendto, SendRef),
{error, timeout}
end;
@@ -1497,8 +1500,7 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) ->
do_sendmsg(SockRef, MsgHdr, EFlags,
next_timeout(TS, Timeout))
after Timeout ->
- nif_cancel(SockRef, sendmsg, SendRef),
- flush_select_msgs(SockRef, SendRef),
+ cancel(SockRef, sendmsg, SendRef),
{error, timeout}
end;
@@ -1519,62 +1521,6 @@ ensure_msghdr(_) ->
%% writev - write data into multiple buffers
%%
-%% send(Socket, Data, Flags, Timeout)
-%% when (is_list(Data) orelse is_binary(Data)) andalso is_list(Flags) ->
-%% IOVec = erlang:iolist_to_iovec(Data),
-%% EFlags = enc_send_flags(Flags),
-%% send_iovec(Socket, IOVec, EFlags, Timeout).
-
-
-%% %% Iterate over the IO-vector (list of binaries).
-
-%% send_iovec(_Socket, [] = _IOVec, _EFlags, _Timeout) ->
-%% ok;
-%% send_iovec({socket, _, SockRef} = Socket, [Bin|IOVec], EFlags, Timeout) ->
-%% case do_send(SockRef, make_ref(), Bin, EFlags, Timeout) of
-%% {ok, NewTimeout} ->
-%% send_iovec(Socket, IOVec, EFlags, NewTimeout);
-%% {error, _} = ERROR ->
-%% ERROR
-%% end.
-
-
-%% do_send(SockRef, SendRef, Data, _EFlags, Timeout)
-%% when (Timeout < 0) ->
-%% nif_cancel(SockRef, SendRef),
-%% flush_select_msgs(SockRef, SendRef),
-%% {error, {timeout, size(Data)}};
-%% do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
-%% TS = timestamp(Timeout),
-%% case nif_send(SockRef, SendRef, Data, EFlags) of
-%% ok ->
-%% {ok, next_timeout(TS, Timeout)};
-%% {ok, Written} ->
-%% %% We are partially done, wait for continuation
-%% receive
-%% {select, SockRef, SendRef, ready_output} ->
-%% <<_:Written/binary, Rest/binary>> = Data,
-%% do_send(SockRef, make_ref(), Rest, EFlags,
-%% next_timeout(TS, Timeout))
-%% after Timeout ->
-%% nif_cancel(SockRef, SendRef),
-%% flush_select_msgs(SockRef, SendRef),
-%% {error, timeout}
-%% end;
-%% {error, eagain} ->
-%% receive
-%% {select, SockRef, SendRef, ready_output} ->
-%% do_send(SockRef, SendRef, Data, EFlags,
-%% next_timeout(TS, Timeout))
-%% after Timeout ->
-%% nif_cancel(SockRef, SendRef),
-%% flush_select_msgs(SockRef, SendRef),
-%% {error, timeout}
-%% end;
-
-%% {error, _} = ERROR ->
-%% ERROR
-%% end.
%% ===========================================================================
@@ -1695,8 +1641,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
after NewTimeout ->
- nif_cancel(SockRef, recv, RecvRef),
- flush_select_msgs(SockRef, RecvRef),
+ cancel(SockRef, recv, RecvRef),
{error, {timeout, Acc}}
end;
@@ -1715,8 +1660,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
after NewTimeout ->
- nif_cancel(SockRef, recv, RecvRef),
- flush_select_msgs(SockRef, RecvRef),
+ cancel(SockRef, recv, RecvRef),
{error, {timeout, Acc}}
end;
@@ -1739,8 +1683,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
{error, Reason}
after NewTimeout ->
- nif_cancel(SockRef, recv, RecvRef),
- flush_select_msgs(SockRef, RecvRef),
+ cancel(SockRef, recv, RecvRef),
{error, timeout}
end;
@@ -1765,7 +1708,7 @@ do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) ->
%% The current recv operation is to be cancelled, so no need for a ref...
%% The cancel will end our 'read everything you have' and "activate"
%% any waiting reader.
- nif_cancel(SockRef, recv, RecvRef),
+ cancel(SockRef, recv, RecvRef),
{ok, Acc};
do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) ->
{error, {timeout, Acc}};
@@ -1878,8 +1821,7 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
{error, Reason}
after NewTimeout ->
- nif_cancel(SockRef, recvfrom, RecvRef),
- flush_select_msgs(SockRef, RecvRef),
+ cancel(SockRef, recvfrom, RecvRef),
{error, timeout}
end;
@@ -1966,8 +1908,7 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) ->
{error, Reason}
after NewTimeout ->
- nif_cancel(SockRef, recvmsg, RecvRef),
- flush_select_msgs(SockRef, RecvRef),
+ cancel(SockRef, recvmsg, RecvRef),
{error, timeout}
end;
@@ -3325,10 +3266,19 @@ ensure_sockaddr(_SockAddr) ->
-flush_select_msgs(LSRef, Ref) ->
+cancel(SockRef, Op, OpRef) ->
+ case nif_cancel(SockRef, Op, OpRef) of
+ %% The select has already completed
+ {error, select_sent} ->
+ flush_select_msgs(SockRef, OpRef);
+ Other ->
+ Other
+ end.
+
+flush_select_msgs(SockRef, Ref) ->
receive
- {select, LSRef, Ref, _} ->
- flush_select_msgs(LSRef, Ref)
+ {select, SockRef, Ref, _} ->
+ flush_select_msgs(SockRef, Ref)
after 0 ->
ok
end.
diff --git a/lib/kernel/test/socket_client.erl b/lib/kernel/test/socket_client.erl
index 58d70b6181..1c07e799b8 100644
--- a/lib/kernel/test/socket_client.erl
+++ b/lib/kernel/test/socket_client.erl
@@ -21,164 +21,246 @@
-module(socket_client).
-export([
- start/1, start/5,
- start_tcp/1, start_tcp/2, start_tcp4/1, start_tcp6/1,
- start_udp/1, start_udp/2, start_udp4/1, start_udp6/1
+ start/1, start/2, start/5, start/6,
+ start_tcp/1, start_tcp/2, start_tcp/3,
+ start_tcp4/1, start_tcp4/2, start_tcp6/1, start_tcp6/2,
+ start_udp/1, start_udp/2, start_udp/3,
+ start_udp4/1, start_udp4/2, start_udp6/1, start_udp6/2
]).
-define(LIB, socket_lib).
--record(client, {socket, msg = true, type, dest, msg_id = 1}).
+-record(client, {socket, verbose = true, msg = true, type, dest, msg_id = 1}).
start(Port) ->
- start_tcp(Port).
+ start(Port, 1).
+
+start(Port, Num) ->
+ start_tcp(Port, Num).
start_tcp(Port) ->
- start_tcp4(Port).
+ start_tcp(Port, 1).
+
+start_tcp(Port, Num) ->
+ start_tcp4(Port, Num).
start_tcp4(Port) ->
- start(inet, stream, tcp, Port).
+ start_tcp4(Port, 1).
+
+start_tcp4(Port, Num) ->
+ start(inet, stream, tcp, Port, Num).
start_tcp6(Port) ->
- start(inet6, stream, tcp, Port).
+ start_tcp6(Port, 1).
-start_tcp(Addr, Port) when (size(Addr) =:= 4) ->
- start(inet, stream, tcp, Addr, Port);
-start_tcp(Addr, Port) when (size(Addr) =:= 8) ->
- start(inet6, stream, tcp, Addr, Port).
+start_tcp6(Port, Num) ->
+ start(inet6, stream, tcp, Port, Num).
+
+start_tcp(Addr, Port, Num) when (size(Addr) =:= 4) andalso
+ is_integer(Num) andalso
+ (Num > 0) ->
+ start(inet, stream, tcp, Addr, Port, Num);
+start_tcp(Addr, Port, Num) when (size(Addr) =:= 8) andalso
+ is_integer(Num) andalso
+ (Num > 0) ->
+ start(inet6, stream, tcp, Addr, Port, Num).
start_udp(Port) ->
- start_udp4(Port).
+ start_udp(Port, 1).
+
+start_udp(Port, Num) ->
+ start_udp4(Port, Num).
start_udp4(Port) ->
- start(inet, dgram, udp, Port).
+ start_udp4(Port, 1).
+
+start_udp4(Port, Num) ->
+ start(inet, dgram, udp, Port, Num).
start_udp6(Port) ->
- start(inet6, dgram, udp, Port).
+ start_udp6(Port, 1).
+
+start_udp6(Port, Num) ->
+ start(inet6, dgram, udp, Port, Num).
-start_udp(Addr, Port) when (size(Addr) =:= 4) ->
- start(inet, dgram, udp, Addr, Port);
-start_udp(Addr, Port) when (size(Addr) =:= 8) ->
- start(inet6, dgram, udp, Addr, Port).
+start_udp(Addr, Port, Num) when (size(Addr) =:= 4) ->
+ start(inet, dgram, udp, Addr, Port, Num);
+start_udp(Addr, Port, Num) when (size(Addr) =:= 8) ->
+ start(inet6, dgram, udp, Addr, Port, Num).
-start(Domain, Type, Proto, Port) ->
- start(Domain, Type, Proto, which_addr(Domain), Port).
+start(Domain, Type, Proto, Port, Num)
+ when is_integer(Port) andalso is_integer(Num) ->
+ start(Domain, Type, Proto, which_addr(Domain), Port, Num);
start(Domain, Type, Proto, Addr, Port) ->
+ start(Domain, Type, Proto, Addr, Port, 1).
+
+start(Domain, Type, Proto, Addr, Port, 1 = Num) ->
+ start(Domain, Type, Proto, Addr, Port, Num, true);
+start(Domain, Type, Proto, Addr, Port, Num)
+ when is_integer(Num) andalso (Num > 1) ->
+ start(Domain, Type, Proto, Addr, Port, Num, false).
+
+start(Domain, Type, Proto, Addr, Port, Num, Verbose) ->
put(sname, "starter"),
+ Clients = start_clients(Num, Domain, Type, Proto, Addr, Port, Verbose),
+ await_clients(Clients).
+
+start_clients(Num, Domain, Type, Proto, Addr, Port, Verbose) ->
+ start_clients(Num, 1, Domain, Type, Proto, Addr, Port, Verbose, []).
+
+start_clients(Num, ID, Domain, Type, Proto, Addr, Port, Verbose, Acc)
+ when (Num > 0) ->
+ StartClient = fun() ->
+ start_client(ID, Domain, Type, Proto, Addr, Port, Verbose)
+ end,
+ {Pid, _} = spawn_monitor(StartClient),
+ ?LIB:sleep(500),
+ i("start client ~w", [ID]),
+ start_clients(Num-1, ID+1, Domain, Type, Proto, Addr, Port, Verbose, [Pid|Acc]);
+start_clients(_, _, _, _, _, _, _, _, Acc) ->
+ i("all client(s) started"),
+ lists:reverse(Acc).
+
+await_clients([]) ->
+ i("all clients done");
+await_clients(Clients) ->
+ receive
+ {'DOWN', _MRef, process, Pid, _Reason} ->
+ case lists:delete(Pid, Clients) of
+ Clients2 when (Clients2 =/= Clients) ->
+ i("client ~p done", [Pid]),
+ await_clients(Clients2);
+ _ ->
+ await_clients(Clients)
+ end
+ end.
+
+
+start_client(ID, Domain, Type, Proto, Addr, Port, Verbose) ->
+ put(sname, ?LIB:f("client[~w]", [ID])),
SA = #{family => Domain,
addr => Addr,
port => Port},
%% The way we use tos only works because we
%% send so few messages (a new value for every
%% message).
- put(tos, 1),
- do_start(Domain, Type, Proto, SA).
+ tos_init(),
+ do_start(Domain, Type, Proto, SA, Verbose).
-do_start(Domain, stream = Type, Proto, SA) ->
+do_start(Domain, stream = Type, Proto, SA, Verbose) ->
try do_init(Domain, Type, Proto) of
Sock ->
connect(Sock, SA),
- {ok, Name} = socket:sockname(Sock),
- {ok, Peer} = socket:peername(Sock),
- {ok, Domain} = socket:getopt(Sock, socket, domain),
- {ok, Type} = socket:getopt(Sock, socket, type),
- {ok, Proto} = socket:getopt(Sock, socket, protocol),
- {ok, OOBI} = socket:getopt(Sock, socket, oobinline),
- {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf),
- {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf),
- {ok, Linger} = socket:getopt(Sock, socket, linger),
- {ok, MTU} = socket:getopt(Sock, ip, mtu),
- {ok, MTUDisc} = socket:getopt(Sock, ip, mtu_discover),
- {ok, MALL} = socket:getopt(Sock, ip, multicast_all),
- {ok, MIF} = socket:getopt(Sock, ip, multicast_if),
- {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop),
- {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl),
- {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos),
- i("connected: "
- "~n From: ~p"
- "~n To: ~p"
- "~nwhen"
- "~n (socket) Domain: ~p"
- "~n (socket) Type: ~p"
- "~n (socket) Protocol: ~p"
- "~n (socket) OOBInline: ~p"
- "~n (socket) SndBuf: ~p"
- "~n (socket) RcvBuf: ~p"
- "~n (socket) Linger: ~p"
- "~n (ip) MTU: ~p"
- "~n (ip) MTU Discovery: ~p"
- "~n (ip) Multicast ALL: ~p"
- "~n (ip) Multicast IF: ~p"
- "~n (ip) Multicast Loop: ~p"
- "~n (ip) Multicast TTL: ~p"
- "~n (ip) RecvTOS: ~p"
- "~n => wait some",
- [Name, Peer,
- Domain, Type, Proto,
- OOBI, SndBuf, RcvBuf, Linger,
- MTU, MTUDisc, MALL, MIF, MLoop, MTTL,
- RecvTOS]),
+ maybe_print_start_info(Verbose, Sock, Type),
%% Give the server some time...
?LIB:sleep(5000),
%% ok = socket:close(Sock),
- send_loop(#client{socket = Sock,
- type = Type})
+ send_loop(#client{socket = Sock,
+ type = Type,
+ verbose = Verbose})
catch
throw:E ->
e("Failed initiate: "
"~n Error: ~p", [E])
end;
-do_start(Domain, dgram = Type, Proto, SA) ->
+do_start(Domain, dgram = Type, Proto, SA, Verbose) ->
try do_init(Domain, Type, Proto) of
Sock ->
+ maybe_print_start_info(Verbose, Sock, Type),
%% Give the server some time...
- {ok, Domain} = socket:getopt(Sock, socket, domain),
- {ok, Type} = socket:getopt(Sock, socket, type),
- {ok, Proto} = socket:getopt(Sock, socket, protocol),
- {ok, OOBI} = socket:getopt(Sock, socket, oobinline),
- {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf),
- {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf),
- {ok, Linger} = socket:getopt(Sock, socket, linger),
- {ok, MALL} = socket:getopt(Sock, ip, multicast_all),
- {ok, MIF} = socket:getopt(Sock, ip, multicast_if),
- {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop),
- {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl),
- {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos),
- {ok, RecvTTL} = socket:getopt(Sock, ip, recvttl),
- i("initiated when: "
- "~n (socket) Domain: ~p"
- "~n (socket) Type: ~p"
- "~n (socket) Protocol: ~p"
- "~n (socket) OOBInline: ~p"
- "~n (socket) SndBuf: ~p"
- "~n (socket) RcvBuf: ~p"
- "~n (socket) Linger: ~p"
- "~n (ip) Multicast ALL: ~p"
- "~n (ip) Multicast IF: ~p"
- "~n (ip) Multicast Loop: ~p"
- "~n (ip) Multicast TTL: ~p"
- "~n (ip) RecvTOS: ~p"
- "~n (ip) RecvTTL: ~p"
- "~n => wait some",
- [Domain, Type, Proto,
- OOBI, SndBuf, RcvBuf, Linger,
- MALL, MIF, MLoop, MTTL,
- RecvTOS, RecvTTL]),
?LIB:sleep(5000),
%% ok = socket:close(Sock),
- send_loop(#client{socket = Sock,
- type = Type,
- dest = SA})
+ send_loop(#client{socket = Sock,
+ type = Type,
+ dest = SA,
+ verbose = Verbose})
catch
throw:E ->
e("Failed initiate: "
"~n Error: ~p", [E])
end.
+maybe_print_start_info(true = _Verbose, Sock, stream = _Type) ->
+ {ok, Name} = socket:sockname(Sock),
+ {ok, Peer} = socket:peername(Sock),
+ {ok, Domain} = socket:getopt(Sock, socket, domain),
+ {ok, Type} = socket:getopt(Sock, socket, type),
+ {ok, Proto} = socket:getopt(Sock, socket, protocol),
+ {ok, OOBI} = socket:getopt(Sock, socket, oobinline),
+ {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf),
+ {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf),
+ {ok, Linger} = socket:getopt(Sock, socket, linger),
+ {ok, MTU} = socket:getopt(Sock, ip, mtu),
+ {ok, MTUDisc} = socket:getopt(Sock, ip, mtu_discover),
+ {ok, MALL} = socket:getopt(Sock, ip, multicast_all),
+ {ok, MIF} = socket:getopt(Sock, ip, multicast_if),
+ {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop),
+ {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl),
+ {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos),
+ i("connected: "
+ "~n From: ~p"
+ "~n To: ~p"
+ "~nwhen"
+ "~n (socket) Domain: ~p"
+ "~n (socket) Type: ~p"
+ "~n (socket) Protocol: ~p"
+ "~n (socket) OOBInline: ~p"
+ "~n (socket) SndBuf: ~p"
+ "~n (socket) RcvBuf: ~p"
+ "~n (socket) Linger: ~p"
+ "~n (ip) MTU: ~p"
+ "~n (ip) MTU Discovery: ~p"
+ "~n (ip) Multicast ALL: ~p"
+ "~n (ip) Multicast IF: ~p"
+ "~n (ip) Multicast Loop: ~p"
+ "~n (ip) Multicast TTL: ~p"
+ "~n (ip) RecvTOS: ~p"
+ "~n => wait some",
+ [Name, Peer,
+ Domain, Type, Proto,
+ OOBI, SndBuf, RcvBuf, Linger,
+ MTU, MTUDisc, MALL, MIF, MLoop, MTTL,
+ RecvTOS]);
+maybe_print_start_info(true = _Verbose, Sock, dgram = _Type) ->
+ {ok, Domain} = socket:getopt(Sock, socket, domain),
+ {ok, Type} = socket:getopt(Sock, socket, type),
+ {ok, Proto} = socket:getopt(Sock, socket, protocol),
+ {ok, OOBI} = socket:getopt(Sock, socket, oobinline),
+ {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf),
+ {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf),
+ {ok, Linger} = socket:getopt(Sock, socket, linger),
+ {ok, MALL} = socket:getopt(Sock, ip, multicast_all),
+ {ok, MIF} = socket:getopt(Sock, ip, multicast_if),
+ {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop),
+ {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl),
+ {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos),
+ {ok, RecvTTL} = socket:getopt(Sock, ip, recvttl),
+ i("initiated when: "
+ "~n (socket) Domain: ~p"
+ "~n (socket) Type: ~p"
+ "~n (socket) Protocol: ~p"
+ "~n (socket) OOBInline: ~p"
+ "~n (socket) SndBuf: ~p"
+ "~n (socket) RcvBuf: ~p"
+ "~n (socket) Linger: ~p"
+ "~n (ip) Multicast ALL: ~p"
+ "~n (ip) Multicast IF: ~p"
+ "~n (ip) Multicast Loop: ~p"
+ "~n (ip) Multicast TTL: ~p"
+ "~n (ip) RecvTOS: ~p"
+ "~n (ip) RecvTTL: ~p"
+ "~n => wait some",
+ [Domain, Type, Proto,
+ OOBI, SndBuf, RcvBuf, Linger,
+ MALL, MIF, MLoop, MTTL,
+ RecvTOS, RecvTTL]);
+maybe_print_start_info(_Verbose, _Sock, _Type) ->
+ ok.
+
do_init(Domain, stream = Type, Proto) ->
i("try (socket) open"),
Sock = case socket:open(Domain, Type, Proto) of
@@ -248,14 +330,25 @@ send_loop(#client{msg_id = N} = C) when (N =< 10) ->
i("request ~w sent - now try read answer", [N]),
case recv(C) of
{ok, {Source, Msg}} ->
- i("received ~w bytes of data~s",
- [size(Msg), case Source of
- undefined -> "";
- _ -> ?LIB:f(" from:~n ~p", [Source])
- end]),
+ if
+ (C#client.verbose =:= true) ->
+ i("received ~w bytes of data~s",
+ [size(Msg), case Source of
+ undefined -> "";
+ _ -> ?LIB:f(" from:~n ~p", [Source])
+ end]);
+ true ->
+ i("received ~w bytes", [size(Msg)])
+ end,
case ?LIB:dec_msg(Msg) of
{reply, N, Reply} ->
- i("received reply ~w: ~p", [N, Reply]),
+ if
+ (C#client.verbose =:= true) ->
+ i("received reply ~w: ~p", [N, Reply]);
+ true ->
+ i("received reply ~w", [N])
+ end,
+ ?LIB:sleep(500), % Just to spread it out a bit
send_loop(C#client{msg_id = N+1})
end;
{error, RReason} ->
@@ -268,13 +361,20 @@ send_loop(#client{msg_id = N} = C) when (N =< 10) ->
"~n ~p", [N, SReason]),
exit({failed_send, SReason})
end;
-send_loop(#client{socket = Sock}) ->
+send_loop(Client) ->
+ sock_close(Client).
+
+sock_close(#client{socket = Sock, verbose = true}) ->
i("we are done - close the socket when: "
"~n ~p", [socket:info()]),
ok = socket:close(Sock),
i("we are done - socket closed when: "
- "~n ~p", [socket:info()]).
+ "~n ~p", [socket:info()]);
+sock_close(#client{socket = Sock}) ->
+ i("we are done"),
+ ok = socket:close(Sock).
+
send(#client{socket = Sock, type = stream}, Msg) ->
socket:send(Sock, Msg);
@@ -282,11 +382,10 @@ send(#client{socket = Sock, type = dgram, dest = Dest}, Msg) ->
%% i("try send to: "
%% "~n ~p", [Dest]),
%% ok = socket:setopt(Sock, otp, debug, true),
- TOS = get(tos),
+ TOS = tos_next(),
ok = socket:setopt(Sock, ip, tos, TOS),
case socket:sendto(Sock, Msg, Dest) of
ok = OK ->
- put(tos, TOS+1),
OK;
{error, _} = ERROR ->
ERROR
@@ -299,31 +398,41 @@ recv(#client{socket = Sock, type = stream, msg = false}) ->
{error, _} = ERROR ->
ERROR
end;
-recv(#client{socket = Sock, type = stream, msg = true}) ->
+recv(#client{socket = Sock, verbose = Verbose, type = stream, msg = true}) ->
case socket:recvmsg(Sock) of
%% An iov of length 1 is an simplification...
{ok, #{addr := undefined = Source,
iov := [Msg],
ctrl := CMsgHdrs,
flags := Flags}} ->
- i("received message: "
- "~n CMsgHdr: ~p"
- "~n Flags: ~p", [CMsgHdrs, Flags]),
+ if
+ (Verbose =:= true) ->
+ i("received message: "
+ "~n CMsgHdr: ~p"
+ "~n Flags: ~p", [CMsgHdrs, Flags]);
+ true ->
+ ok
+ end,
{ok, {Source, Msg}};
{error, _} = ERROR ->
ERROR
end;
recv(#client{socket = Sock, type = dgram, msg = false}) ->
socket:recvfrom(Sock);
-recv(#client{socket = Sock, type = dgram, msg = true}) ->
+recv(#client{socket = Sock, verbose = Verbose, type = dgram, msg = true}) ->
case socket:recvmsg(Sock) of
{ok, #{addr := Source,
iov := [Msg],
ctrl := CMsgHdrs,
flags := Flags}} ->
- i("received message: "
- "~n CMsgHdr: ~p"
- "~n Flags: ~p", [CMsgHdrs, Flags]),
+ if
+ (Verbose =:= true) ->
+ i("received message: "
+ "~n CMsgHdr: ~p"
+ "~n Flags: ~p", [CMsgHdrs, Flags]);
+ true ->
+ ok
+ end,
{ok, {Source, Msg}};
{error, _} = ERROR ->
ERROR
@@ -402,6 +511,22 @@ which_addr2(Domain, [_|IFO]) ->
%% ---
+tos_init() ->
+ put(tos, 1).
+
+tos_next() ->
+ case get(tos) of
+ TOS when (TOS < 100) ->
+ put(tos, TOS + 1),
+ TOS;
+ _ ->
+ put(tos, 1),
+ 1
+ end.
+
+
+%% ---
+
e(F, A) ->
?LIB:e(F, A).
diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl
index ea2bdc8e0d..9142942428 100644
--- a/lib/kernel/test/socket_server.erl
+++ b/lib/kernel/test/socket_server.erl
@@ -34,8 +34,10 @@
-define(LIB, socket_lib).
-record(manager, {socket, msg, peek, acceptors, handler_id, handlers}).
--record(acceptor, {id, socket, manager}).
--record(handler, {socket, peek, msg, type, manager}).
+-record(acceptor, {id, socket, manager,
+ atimeout = 5000}).
+-record(handler, {socket, peek, msg, type, manager,
+ stimeout = 5000, rtimeout = 5000}).
-define(NUM_ACCEPTORS, 5).
@@ -521,13 +523,14 @@ acceptor_stop(Pid, _Reason) ->
acceptor_init(Manager, Sock, ID) ->
put(sname, f("acceptor[~w]", [ID])),
Manager ! {acceptor, self(), ok},
+ %% ok = socket:setopt(Sock, otp, debug, true),
acceptor_loop(#acceptor{id = ID,
manager = Manager,
socket = Sock}).
-acceptor_loop(#acceptor{socket = LSock} = A) ->
+acceptor_loop(#acceptor{socket = LSock, atimeout = Timeout} = A) ->
i("try accept"),
- case socket:accept(LSock, infinity) of
+ case socket:accept(LSock, Timeout) of
{ok, Sock} ->
i("accepted: "
"~n ~p"
@@ -542,6 +545,9 @@ acceptor_loop(#acceptor{socket = LSock} = A) ->
socket:close(Sock),
exit({failed_starting_handler, Reason})
end;
+ {error, timeout} ->
+ i("timeout"),
+ acceptor_loop(A);
{error, Reason} ->
e("accept failure: "
"~n ~p", [Reason]),
@@ -900,28 +906,30 @@ peek_recvfrom(Sock, BufSz) ->
end.
-send(#handler{socket = Sock, msg = true, type = stream}, Msg, _) ->
+send(#handler{socket = Sock, msg = true, type = stream, stimeout = Timeout},
+ Msg, _) ->
CMsgHdr = #{level => ip, type => tos, data => reliability},
CMsgHdrs = [CMsgHdr],
MsgHdr = #{iov => [Msg], ctrl => CMsgHdrs},
%% socket:setopt(Sock, otp, debug, true),
- Res = socket:sendmsg(Sock, MsgHdr),
+ Res = socket:sendmsg(Sock, MsgHdr, Timeout),
%% socket:setopt(Sock, otp, debug, false),
Res;
-send(#handler{socket = Sock, type = stream}, Msg, _) ->
- socket:send(Sock, Msg);
-send(#handler{socket = Sock, msg = true, type = dgram}, Msg, Dest) ->
+send(#handler{socket = Sock, type = stream, stimeout = Timeout}, Msg, _) ->
+ socket:send(Sock, Msg, Timeout);
+send(#handler{socket = Sock, msg = true, type = dgram, stimeout = Timeout},
+ Msg, Dest) ->
CMsgHdr = #{level => ip, type => tos, data => reliability},
CMsgHdrs = [CMsgHdr],
MsgHdr = #{addr => Dest,
iov => [Msg],
ctrl => CMsgHdrs},
%% ok = socket:setopt(Sock, otp, debug, true),
- Res = socket:sendmsg(Sock, MsgHdr),
+ Res = socket:sendmsg(Sock, MsgHdr, Timeout),
%% ok = socket:setopt(Sock, otp, debug, false),
Res;
-send(#handler{socket = Sock, type = dgram}, Msg, Dest) ->
- socket:sendto(Sock, Msg, Dest).
+send(#handler{socket = Sock, type = dgram, stimeout = Timeout}, Msg, Dest) ->
+ socket:sendto(Sock, Msg, Dest, Timeout).
%% filler() ->
%% list_to_binary(lists:duplicate(2048, " FILLER ")).