diff options
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 798 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 66152 -> 66040 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 126 | ||||
-rw-r--r-- | lib/kernel/test/socket_client.erl | 365 | ||||
-rw-r--r-- | lib/kernel/test/socket_server.erl | 32 |
5 files changed, 1065 insertions, 256 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 6564c3c82f..c48d6eab00 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -104,6 +104,14 @@ #undef WANT_NONBLOCKING #include "sys.h" + + + +/* AND HERE WE MAY HAVE A BUNCH OF DEFINES....SEE INET DRIVER.... */ + + + + #else /* !__WIN32__ */ #include <sys/time.h> @@ -297,7 +305,7 @@ static void (*esock_sctp_freepaddrs)(struct sockaddr *addrs) = NULL; #endif #include "sys.h" -#endif +#endif /* !__WIN32__ */ #include <erl_nif.h> @@ -731,6 +739,7 @@ typedef struct { SocketAddress remote; unsigned int addrLen; + ErlNifEnv* env; /* +++ Controller (owner) process +++ */ ErlNifPid ctrlPid; @@ -771,7 +780,7 @@ typedef struct { size_t rBufSz; // Read buffer size (when data length = 0 is specified) size_t rCtrlSz; // Read control buffer size size_t wCtrlSz; // Write control buffer size - BOOLEAN_T iow; // Inform On Wrap + BOOLEAN_T iow; // Inform On (counter) Wrap BOOLEAN_T dbg; /* +++ Close stuff +++ */ @@ -909,11 +918,9 @@ static ERL_NIF_TERM nif_finalize_connection(ErlNifEnv* env, static ERL_NIF_TERM nif_finalize_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -/* static ERL_NIF_TERM nif_cancel(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -*/ static ERL_NIF_TERM nopen(ErlNifEnv* env, @@ -977,7 +984,6 @@ static ERL_NIF_TERM nclose(ErlNifEnv* env, static ERL_NIF_TERM nshutdown(ErlNifEnv* env, SocketDescriptor* descP, int how); - static ERL_NIF_TERM nsetopt(ErlNifEnv* env, SocketDescriptor* descP, BOOLEAN_T isEncoded, @@ -1817,6 +1823,43 @@ static ERL_NIF_TERM nsockname(ErlNifEnv* env, SocketDescriptor* descP); static ERL_NIF_TERM npeername(ErlNifEnv* env, SocketDescriptor* descP); +static ERL_NIF_TERM ncancel(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM op, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_connect(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_accept(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, + SocketDescriptor* descP); +static ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_send(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, + SocketDescriptor* descP); +static ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_read_select(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_write_select(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef, + int smode, + int rmode); static ERL_NIF_TERM nsetopt_str_opt(ErlNifEnv* env, SocketDescriptor* descP, @@ -1858,6 +1901,10 @@ static ERL_NIF_TERM ngetopt_timeval_opt(ErlNifEnv* env, int level, int opt); +static BOOLEAN_T send_check_writer(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ERL_NIF_TERM* checkResult); static ERL_NIF_TERM send_check_result(ErlNifEnv* env, SocketDescriptor* descP, ssize_t written, @@ -2083,6 +2130,25 @@ static BOOLEAN_T acceptor_pop(ErlNifEnv* env, ErlNifPid* pid, ErlNifMonitor* mon, ERL_NIF_TERM* ref); +static BOOLEAN_T acceptor_unqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid); + +static BOOLEAN_T writer_search4pid(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid); +static ERL_NIF_TERM writer_push(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid pid, + ERL_NIF_TERM ref); +static BOOLEAN_T writer_pop(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref); +static BOOLEAN_T writer_unqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid); static BOOLEAN_T qsearch4pid(ErlNifEnv* env, SocketRequestQueue* q, @@ -2235,8 +2301,10 @@ static char str_exsend[] = "exsend"; // failed send /* *** "Global" Atoms *** */ +ERL_NIF_TERM esock_atom_accept; ERL_NIF_TERM esock_atom_addr; ERL_NIF_TERM esock_atom_any; +ERL_NIF_TERM esock_atom_connect; ERL_NIF_TERM esock_atom_credentials; ERL_NIF_TERM esock_atom_ctrl; ERL_NIF_TERM esock_atom_ctrunc; @@ -2261,6 +2329,7 @@ ERL_NIF_TERM esock_atom_local; ERL_NIF_TERM esock_atom_loopback; ERL_NIF_TERM esock_atom_lowdelay; ERL_NIF_TERM esock_atom_mincost; +ERL_NIF_TERM esock_atom_not_found; ERL_NIF_TERM esock_atom_ok; ERL_NIF_TERM esock_atom_oob; ERL_NIF_TERM esock_atom_origdstaddr; @@ -2270,11 +2339,18 @@ ERL_NIF_TERM esock_atom_port; ERL_NIF_TERM esock_atom_protocol; ERL_NIF_TERM esock_atom_raw; ERL_NIF_TERM esock_atom_rdm; -ERL_NIF_TERM esock_atom_rights; +ERL_NIF_TERM esock_atom_recv; +ERL_NIF_TERM esock_atom_recvfrom; +ERL_NIF_TERM esock_atom_recvmsg; ERL_NIF_TERM esock_atom_reliability; +ERL_NIF_TERM esock_atom_rights; ERL_NIF_TERM esock_atom_scope_id; ERL_NIF_TERM esock_atom_sctp; ERL_NIF_TERM esock_atom_sec; +ERL_NIF_TERM esock_atom_select_sent; +ERL_NIF_TERM esock_atom_send; +ERL_NIF_TERM esock_atom_sendmsg; +ERL_NIF_TERM esock_atom_sendto; ERL_NIF_TERM esock_atom_seqpacket; ERL_NIF_TERM esock_atom_socket; ERL_NIF_TERM esock_atom_spec_dst; @@ -3222,7 +3298,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, &descP->currentAcceptor.mon) > 0) return esock_make_error(env, atom_exmon); - descP->currentAcceptor.ref = ref; + descP->currentAcceptor.ref = enif_make_copy(descP->env, ref); + descP->currentAcceptorP = &descP->currentAcceptor; SELECT(env, descP->sock, @@ -3320,7 +3397,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, /* *** naccept_accepting *** * We have an active acceptor and possibly acceptors waiting in queue. - * At the moment the queue is *not* implemented. + * If the pid of the calling process is not the pid of the "current process", + * push the requester onto the queue. */ static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, @@ -3347,11 +3425,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, "\r\n", caller, descP->currentAcceptor.pid) ); if (!compare_pids(env, &descP->currentAcceptor.pid, &caller)) { - /* This will have to do until we implement the queue. - * When we have the queue, we should simply push this request, - * and instead return with eagain (the caller will then wait - * for the select message). - */ + + /* Not the "current acceptor", so (maybe) push onto queue */ SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") ); @@ -3411,6 +3486,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "naccept_accepting -> accept success\r\n") ); + DEMONP(env, descP, &descP->currentAcceptor.mon); + if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) { save_errno = sock_errno(); while ((sock_close(accSock) == INVALID_SOCKET) && @@ -3451,10 +3528,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, accDescP->state = SOCKET_STATE_CONNECTED; - /* Here we should have the test if we have something in the queue. - * And if so, pop it and copy the (waiting) acceptor, and then - * make a new select with that info). - */ + /* Check if there are waiting acceptors (popping the acceptor queue) */ if (acceptor_pop(env, descP, &descP->currentAcceptor.pid, @@ -3559,12 +3633,12 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, } -/* What do we do when another process tries to write - * when the current writer has a select already waiting? - * Queue it? And what about simultaneous read and write? - * Queue up all operations towards the socket? +/* *** nsend *** * - * We (may) need a currentOp field and an ops queue field. + * Do the actual send. + * Do some initial writer checks, do the actual send and then + * analyze the result. If we are done, another writer may be + * scheduled (if there is one in the writer queue). */ static ERL_NIF_TERM nsend(ErlNifEnv* env, @@ -3573,12 +3647,17 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, ErlNifBinary* sndDataP, int flags) { - int save_errno; - ssize_t written; + int save_errno; + ssize_t written; + ERL_NIF_TERM writerCheck; if (!descP->isWritable) return enif_make_badarg(env); + /* Check if there is already a current writer and if its us */ + if (!send_check_writer(env, descP, sendRef, &writerCheck)) + return writerCheck; + /* We ignore the wrap for the moment. * Maybe we should issue a wrap-message to controlling process... */ @@ -3597,6 +3676,7 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, } + /* ---------------------------------------------------------------------- * nif_sendto * @@ -3662,9 +3742,13 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, &remoteAddrLen)) != NULL) return esock_make_error_str(env, xres); + MLOCK(descP->writeMtx); + res = nsendto(env, descP, sendRef, &sndData, flags, &remoteAddr, remoteAddrLen); + MUNLOCK(descP->writeMtx); + SGDBG( ("SOCKET", "nif_sendto -> done with result: " "\r\n %T" "\r\n", res) ); @@ -3682,12 +3766,17 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env, SocketAddress* toAddrP, unsigned int toAddrLen) { - int save_errno; - ssize_t written; + int save_errno; + ssize_t written; + ERL_NIF_TERM writerCheck; if (!descP->isWritable) return enif_make_badarg(env); + /* Check if there is already a current writer and if its us */ + if (!send_check_writer(env, descP, sendRef, &writerCheck)) + return writerCheck; + /* We ignore the wrap for the moment. * Maybe we should issue a wrap-message to controlling process... */ @@ -3763,8 +3852,12 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, if (!esendflags2sendflags(eflags, &flags)) return esock_make_error(env, esock_atom_einval); + MLOCK(descP->writeMtx); + res = nsendmsg(env, descP, sendRef, eMsgHdr, flags); + MUNLOCK(descP->writeMtx); + SSDBG( descP, ("SOCKET", "nif_sendmsg -> done with result: " "\r\n %T" @@ -3791,12 +3884,16 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env, size_t ctrlBufLen, ctrlBufUsed; int save_errno; ssize_t written, dataSize; + ERL_NIF_TERM writerCheck; char* xres; if (!descP->isWritable) return enif_make_badarg(env); - + /* Check if there is already a current writer and if its us */ + if (!send_check_writer(env, descP, sendRef, &writerCheck)) + return writerCheck; + /* Depending on if we are *connected* or not, we require * different things in the msghdr map. */ @@ -7175,6 +7272,7 @@ ERL_NIF_TERM nsetopt_lvl_sctp_associnfo(ErlNifEnv* env, int res; size_t sz; unsigned int tmp; + int32_t tmpAssocId; SSDBG( descP, ("SOCKET", "nsetopt_lvl_sctp_associnfo -> entry with" @@ -7213,8 +7311,12 @@ ERL_NIF_TERM nsetopt_lvl_sctp_associnfo(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "nsetopt_lvl_sctp_associnfo -> decode attributes\r\n") ); - if (!GET_UINT(env, eAssocId, &assocParams.sasoc_assoc_id)) + /* On some platforms the assoc id is typed as an unsigned integer (uint32) + * So, to avoid warnings there, we always make an explicit cast... + */ + if (!GET_INT(env, eAssocId, &tmpAssocId)) return esock_make_error(env, esock_atom_einval); + assocParams.sasoc_assoc_id = (typeof(assocParams.sasoc_assoc_id)) tmpAssocId; /* * We should really make sure this is ok in erlang (to ensure that @@ -7509,6 +7611,7 @@ ERL_NIF_TERM nsetopt_lvl_sctp_rtoinfo(ErlNifEnv* env, struct sctp_rtoinfo rtoInfo; int res; size_t sz; + int32_t tmpAssocId; SSDBG( descP, ("SOCKET", "nsetopt_lvl_sctp_rtoinfo -> entry with" @@ -7541,8 +7644,12 @@ ERL_NIF_TERM nsetopt_lvl_sctp_rtoinfo(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "nsetopt_lvl_sctp_rtoinfo -> decode attributes\r\n") ); - if (!GET_UINT(env, eAssocId, &rtoInfo.srto_assoc_id)) + /* On some platforms the assoc id is typed as an unsigned integer (uint32) + * So, to avoid warnings there, we always make an explicit cast... + */ + if (!GET_INT(env, eAssocId, &tmpAssocId)) return esock_make_error(env, esock_atom_einval); + rtoInfo.srto_assoc_id = (typeof(rtoInfo.srto_assoc_id)) tmpAssocId; if (!GET_UINT(env, eInitial, &rtoInfo.srto_initial)) return esock_make_error(env, esock_atom_einval); @@ -10414,10 +10521,471 @@ ERL_NIF_TERM npeername(ErlNifEnv* env, /* ---------------------------------------------------------------------- + * nif_cancel + * + * Description: + * Cancel a previous select! + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * Operation (atom) - What kind of operation (accept, send, ...) is to be cancelled + * Ref (ref) - Unique id for the operation + */ +static +ERL_NIF_TERM nif_cancel(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + SocketDescriptor* descP; + ERL_NIF_TERM op, opRef, result; + + SGDBG( ("SOCKET", "nif_cancel -> entry with argc: %d\r\n", argc) ); + + /* Extract arguments and perform preliminary validation */ + + if ((argc != 3) || + !enif_get_resource(env, argv[0], sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + op = argv[1]; + opRef = argv[2]; + + SSDBG( descP, + ("SOCKET", "nif_cancel -> args when sock = %d:" + "\r\n op: %T" + "\r\n opRef: %T" + "\r\n", descP->sock, op, opRef) ); + + result = ncancel(env, descP, op, opRef); + + SSDBG( descP, + ("SOCKET", "nif_cancel -> done with result: " + "\r\n %T" + "\r\n", result) ); + + return result; + +} + + +static +ERL_NIF_TERM ncancel(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM op, + ERL_NIF_TERM opRef) +{ + /* <KOLLA> + * + * Do we really need all these variants? Should it not be enough with: + * + * connect | accept | send | recv + * + * </KOLLA> + */ + if (COMPARE(op, esock_atom_connect) == 0) { + return ncancel_connect(env, descP, opRef); + } else if (COMPARE(op, esock_atom_accept) == 0) { + return ncancel_accept(env, descP, opRef); + } else if (COMPARE(op, esock_atom_send) == 0) { + return ncancel_send(env, descP, opRef); + } else if (COMPARE(op, esock_atom_sendto) == 0) { + return ncancel_send(env, descP, opRef); + } else if (COMPARE(op, esock_atom_sendmsg) == 0) { + return ncancel_send(env, descP, opRef); + } else if (COMPARE(op, esock_atom_recv) == 0) { + return ncancel_recv(env, descP, opRef); + } else if (COMPARE(op, esock_atom_recvfrom) == 0) { + return ncancel_recv(env, descP, opRef); + } else if (COMPARE(op, esock_atom_recvmsg) == 0) { + return ncancel_recv(env, descP, opRef); + } else { + return esock_make_error(env, esock_atom_einval); + } +} + + + +/* *** ncancel_connect *** + * + * + */ +static +ERL_NIF_TERM ncancel_connect(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + return ncancel_write_select(env, descP, opRef); +} + + +/* *** ncancel_accept *** + * + * We have two different cases: + * *) Its the current acceptor + * Cancel the select! + * We need to activate one of the waiting acceptors. + * *) Its one of the acceptors ("waiting") in the queue + * Simply remove the acceptor from the queue. + * + */ +static +ERL_NIF_TERM ncancel_accept(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + ERL_NIF_TERM res; + + SSDBG( descP, + ("SOCKET", "ncancel_accept -> entry with" + "\r\n opRef: %T" + "\r\n %s" + "\r\n", opRef, + ((descP->currentAcceptorP == NULL) ? "without acceptor" : "with acceptor")) ); + + MLOCK(descP->accMtx); + + if (descP->currentAcceptorP != NULL) { + if (COMPARE(opRef, descP->currentAcceptor.ref) == 0) { + res = ncancel_accept_current(env, descP); + } else { + res = ncancel_accept_waiting(env, descP, opRef); + } + } else { + /* Or badarg? */ + res = esock_make_error(env, esock_atom_einval); + } + + MUNLOCK(descP->accMtx); + + SSDBG( descP, + ("SOCKET", "ncancel_accept -> done with result:" + "\r\n %T" + "\r\n", res) ); + + return res; +} + + +/* The current acceptor process has an ongoing select we first must + * cancel. Then we must re-activate the "first" (the first + * in the acceptor queue). + */ +static +ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, + SocketDescriptor* descP) +{ + ERL_NIF_TERM res; + + SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") ); + + res = ncancel_read_select(env, descP, descP->currentAcceptor.ref); + + SSDBG( descP, ("SOCKET", "ncancel_accept_current -> cancel res: %T\r\n", res) ); + + if (acceptor_pop(env, descP, + &descP->currentAcceptor.pid, + &descP->currentAcceptor.mon, + &descP->currentAcceptor.ref)) { + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "ncancel_accept_current -> new (active) acceptor: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentAcceptor.pid, + descP->currentAcceptor.ref) ); + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref); + + } else { + SSDBG( descP, ("SOCKET", "ncancel_accept_current -> no more acceptors\r\n") ); + descP->currentAcceptorP = NULL; + descP->state = SOCKET_STATE_LISTENING; + } + + SSDBG( descP, ("SOCKET", "ncancel_accept_current -> done with result:" + "\r\n %T" + "\r\n", res) ); + + return res; +} + + +/* These processes have not performed a select, so we can simply + * remove them from the acceptor queue. + */ +static +ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) + return esock_make_error(env, atom_exself); + + /* unqueue request from (acceptor) queue */ + + if (acceptor_unqueue(env, descP, &caller)) { + return esock_atom_ok; + } else { + /* Race? */ + return esock_make_error(env, esock_atom_not_found); + } +} + + + +/* *** ncancel_send *** + * + * Cancel a send operation. + * Its either the current writer or one of the waiting writers. + */ +static +ERL_NIF_TERM ncancel_send(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + ERL_NIF_TERM res; + + SSDBG( descP, + ("SOCKET", "ncancel_send -> entry with" + "\r\n opRef: %T" + "\r\n %s" + "\r\n", opRef, + ((descP->currentWriterP == NULL) ? "without writer" : "with writer")) ); + + MLOCK(descP->writeMtx); + + if (descP->currentWriterP != NULL) { + if (COMPARE(opRef, descP->currentWriter.ref) == 0) { + res = ncancel_send_current(env, descP); + } else { + res = ncancel_send_waiting(env, descP, opRef); + } + } else { + /* Or badarg? */ + res = esock_make_error(env, esock_atom_einval); + } + + MUNLOCK(descP->writeMtx); + + SSDBG( descP, + ("SOCKET", "ncancel_send -> done with result:" + "\r\n %T" + "\r\n", res) ); + + return res; +} + + + +/* The current writer process has an ongoing select we first must + * cancel. Then we must re-activate the "first" (the first + * in the writer queue). + */ +static +ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, + SocketDescriptor* descP) +{ + ERL_NIF_TERM res; + + SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") ); + + res = ncancel_write_select(env, descP, descP->currentWriter.ref); + + SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) ); + + if (writer_pop(env, descP, + &descP->currentWriter.pid, + &descP->currentWriter.mon, + &descP->currentWriter.ref)) { + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "ncancel_send_current -> new (active) writer: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentWriter.pid, + descP->currentWriter.ref) ); + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_WRITE), + descP, &descP->currentWriter.pid, descP->currentWriter.ref); + + } else { + SSDBG( descP, ("SOCKET", "ncancel_send_current -> no more writers\r\n") ); + descP->currentWriterP = NULL; + } + + SSDBG( descP, ("SOCKET", "ncancel_send_current -> done with result:" + "\r\n %T" + "\r\n", res) ); + + return res; +} + + +/* These processes have not performed a select, so we can simply + * remove them from the writer queue. + */ +static +ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) + return esock_make_error(env, atom_exself); + + /* unqueue request from (writer) queue */ + + if (writer_unqueue(env, descP, &caller)) { + return esock_atom_ok; + } else { + /* Race? */ + return esock_make_error(env, esock_atom_not_found); + } +} + + + +/* *** ncancel_recv *** + * + * + */ +static +ERL_NIF_TERM ncancel_recv(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + return esock_make_error(env, esock_atom_einval); +} + + + +static +ERL_NIF_TERM ncancel_read_select(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + return ncancel_mode_select(env, descP, opRef, + ERL_NIF_SELECT_READ, + ERL_NIF_SELECT_READ_CANCELLED); +} + + +static +ERL_NIF_TERM ncancel_write_select(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + return ncancel_mode_select(env, descP, opRef, + ERL_NIF_SELECT_WRITE, + ERL_NIF_SELECT_WRITE_CANCELLED); +} + + +static +ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef, + int smode, + int rmode) +{ + int selectRes = enif_select(env, descP->sock, + (ERL_NIF_SELECT_CANCEL | smode), + descP, NULL, opRef); + + if (selectRes & rmode) { + /* Was cancelled */ + return esock_atom_ok; + } else if (selectRes > 0) { + /* Has already sent the message */ + return esock_make_error(env, esock_atom_select_sent); + } else { + /* Stopped? */ + SSDBG( descP, ("SOCKET", "ncancel_mode_select -> failed: %d (0x%lX)" + "\r\n", selectRes, selectRes) ); + return esock_make_error(env, esock_atom_einval); + } + +} + + + +/* ---------------------------------------------------------------------- * U t i l i t y F u n c t i o n s * ---------------------------------------------------------------------- */ +/* *** send_check_writer *** + * + * Checks if we have a current writer and if that is us. If not, then we must + * be made to wait for our turn. This is done by pushing us unto the writer queue. + */ +static +BOOLEAN_T send_check_writer(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ERL_NIF_TERM* checkResult) +{ + if (descP->currentWriterP != NULL) { + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) { + *checkResult = esock_make_error(env, atom_exself); + return FALSE; + } + + if (!compare_pids(env, &descP->currentWriter.pid, &caller)) { + /* Not the "current writer", so (maybe) push onto queue */ + + SSDBG( descP, + ("SOCKET", "send_check_writer -> not (current) writer\r\n") ); + + if (!writer_search4pid(env, descP, &caller)) + *checkResult = writer_push(env, descP, caller, ref); + else + *checkResult = esock_make_error(env, esock_atom_eagain); + + SSDBG( descP, + ("SOCKET", + "nsend -> queue (push) result: %T\r\n", checkResult) ); + + return FALSE; + + } + + } + + *checkResult = esock_atom_ok; // Does not actually matter in this case, but ... + + return TRUE; +} + + + +/* *** send_check_result *** + * + * Check the result of a socket send (send, sendto and sendmsg) call. + * If a "complete" send has been made, the next (waiting) writer will be + * scheduled (if there is one). + * If we did not manage to send the entire package, make another select, + * so that we can be informed when we can make another try (to send the rest), + * and return with the amount we actually managed to send (its up to the caller + * (that is the erlang code) to figure out hust much is left to send). + * If the write fail, we give up and return with the appropriate error code. + * + * What about the remaining writers!! + */ static ERL_NIF_TERM send_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -10437,24 +11005,67 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, cnt_inc(&descP->writePkgCnt, 1); cnt_inc(&descP->writeByteCnt, written); + DEMONP(env, descP, &descP->currentWriter.mon); SSDBG( descP, ("SOCKET", "send_check_result -> " "everything written (%d,%d) - done\r\n", dataSize, written) ); + /* Ok, this write is done maybe activate the next (if any) */ + + if (writer_pop(env, descP, + &descP->currentWriter.pid, + &descP->currentWriter.mon, + &descP->currentWriter.ref)) { + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "send_check_result -> new (active) writer: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentWriter.pid, + descP->currentWriter.ref) ); + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_WRITE), + descP, &descP->currentWriter.pid, descP->currentWriter.ref); + + } else { + descP->currentWriterP = NULL; + } + return esock_atom_ok; } else if (written < 0) { - /* Ouch, check what kind of failure */ + /* Some kind of send failure - check what kind */ + if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) { + ErlNifPid pid; + ErlNifMonitor mon; + ERL_NIF_TERM ref, res; + + /* + * An actual failure - we (and everyone waiting) give up + */ cnt_inc(&descP->writeFails, 1); SSDBG( descP, ("SOCKET", "send_check_result -> error: %d\r\n", saveErrno) ); - return esock_make_error_errno(env, saveErrno); + res = esock_make_error_errno(env, saveErrno); + + while (writer_pop(env, descP, &pid, &mon, &ref)) { + SSDBG( descP, + ("SOCKET", "send_check_result -> abort %T\r\n", pid) ); + send_msg_nif_abort(env, ref, res, &pid); + DEMONP(env, descP, &mon); + } + + return res; } else { @@ -10531,8 +11142,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, } /* There is a special case: If the provided 'to read' value is - * zero (0). That means that we reads as much as we can, using - * the default read buffer size. + * zero (0) (only for type =/= stream). + * That means that we reads as much as we can, using the default + * read buffer size. */ if (bufP->size == read) { @@ -12541,6 +13153,8 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) { char buf[64]; /* Buffer used for building the mutex name */ + descP->env = enif_alloc_env(); + sprintf(buf, "socket[w,%d]", sock); descP->writeMtx = MCREATE(buf); descP->currentWriterP = NULL; // currentWriter not used @@ -13187,7 +13801,7 @@ ERL_NIF_TERM acceptor_push(ErlNifEnv* env, SocketRequestor* reqP = &e->data; reqP->pid = pid; - reqP->ref = ref; + reqP->ref = enif_make_copy(descP->env, ref); if (MONP(env, descP, &pid, &reqP->mon) > 0) { FREE(reqP); @@ -13231,6 +13845,108 @@ BOOLEAN_T acceptor_pop(ErlNifEnv* env, } +/* *** acceptor unqueue *** + * + * Remove an acceptor from the acceptor queue. + */ +static +BOOLEAN_T acceptor_unqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid) +{ + return qunqueue(env, &descP->acceptorsQ, pid); +} + + + +/* *** writer search for pid *** + * + * Search for a pid in the writer queue. + */ +static +BOOLEAN_T writer_search4pid(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid) +{ + return qsearch4pid(env, &descP->writersQ, pid); +} + + +/* *** writer push *** + * + * Push an writer onto the writer queue. + * This happens when we already have atleast one current writer. + */ +static +ERL_NIF_TERM writer_push(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid pid, + ERL_NIF_TERM ref) +{ + SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement)); + SocketRequestor* reqP = &e->data; + + reqP->pid = pid; + reqP->ref = enif_make_copy(descP->env, ref); + + if (MONP(env, descP, &pid, &reqP->mon) > 0) { + FREE(reqP); + return esock_make_error(env, atom_exmon); + } + + qpush(&descP->writersQ, e); + + // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN + return esock_make_error(env, esock_atom_eagain); +} + + +/* *** writer pop *** + * + * Pop an writer from the writer queue. + */ +static +BOOLEAN_T writer_pop(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref) +{ + SocketRequestQueueElement* e = qpop(&descP->writersQ); + + if (e != NULL) { + *pid = e->data.pid; + *mon = e->data.mon; + *ref = e->data.ref; // At this point the ref has already been copied (env) + FREE(e); + return TRUE; + } else { + /* (acceptors) Queue was empty */ + // *pid = NULL; we have no null value for pids + // *mon = NULL; we have no null value for monitors + *ref = esock_atom_undefined; // Just in case + return FALSE; + } + +} + + +/* *** writer unqueue *** + * + * Remove an writer from the writer queue. + */ +static +BOOLEAN_T writer_unqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid) +{ + return qunqueue(env, &descP->writersQ, pid); +} + + + + + static BOOLEAN_T qsearch4pid(ErlNifEnv* env, SocketRequestQueue* q, @@ -13673,7 +14389,7 @@ void socket_down(ErlNifEnv* env, "socket_down -> " "not current acceptor - maybe a waiting acceptor\r\n") ); - qunqueue(env, &descP->acceptorsQ, pid); + acceptor_unqueue(env, descP, pid); } } @@ -13723,7 +14439,7 @@ ErlNifFunc socket_funcs[] = * is called after the connect *select* has "completed". */ {"nif_finalize_connection", 1, nif_finalize_connection, 0}, - // {"nif_cancel", 2, nif_cancel, 0}, + {"nif_cancel", 3, nif_cancel, 0}, {"nif_finalize_close", 1, nif_finalize_close, ERL_NIF_DIRTY_JOB_IO_BOUND} }; @@ -13843,8 +14559,10 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_want = MKA(env, str_want); /* Global atom(s) */ + esock_atom_accept = MKA(env, "accept"); esock_atom_addr = MKA(env, "addr"); esock_atom_any = MKA(env, "any"); + esock_atom_connect = MKA(env, "connect"); esock_atom_credentials = MKA(env, "credentials"); esock_atom_ctrl = MKA(env, "ctrl"); esock_atom_ctrunc = MKA(env, "ctrunc"); @@ -13869,6 +14587,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_loopback = MKA(env, "loopback"); esock_atom_lowdelay = MKA(env, "lowdelay"); esock_atom_mincost = MKA(env, "mincost"); + esock_atom_not_found = MKA(env, "not_found"); esock_atom_ok = MKA(env, "ok"); esock_atom_oob = MKA(env, "oob"); esock_atom_origdstaddr = MKA(env, "origdstaddr"); @@ -13878,11 +14597,18 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_protocol = MKA(env, "protocol"); esock_atom_raw = MKA(env, "raw"); esock_atom_rdm = MKA(env, "rdm"); + esock_atom_recv = MKA(env, "recv"); + esock_atom_recvfrom = MKA(env, "recvfrom"); + esock_atom_recvmsg = MKA(env, "recvmsg"); esock_atom_reliability = MKA(env, "reliability"); esock_atom_rights = MKA(env, "rights"); esock_atom_scope_id = MKA(env, "scope_id"); esock_atom_sctp = MKA(env, "sctp"); esock_atom_sec = MKA(env, "sec"); + esock_atom_select_sent = MKA(env, "select_sent"); + esock_atom_send = MKA(env, "send"); + esock_atom_sendmsg = MKA(env, "sendmsg"); + esock_atom_sendto = MKA(env, "sendto"); esock_atom_seqpacket = MKA(env, "seqpacket"); esock_atom_socket = MKA(env, "socket"); esock_atom_spec_dst = MKA(env, "spec_dst"); diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex e6a33337ba..9e6d9f4709 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index ad7a35694b..652054457f 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -1137,7 +1137,7 @@ connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout) %% </KOLLA> nif_finalize_connection(SockRef) after NewTimeout -> - nif_cancel(SockRef, connect, Ref), + cancel(SockRef, connect, Ref), {error, timeout} end; {error, _} = ERROR -> @@ -1145,7 +1145,6 @@ connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout) end. - %% =========================================================================== %% %% listen - listen for connections on a socket @@ -1227,13 +1226,12 @@ do_accept(LSockRef, Timeout) -> {error, Reason} after NewTimeout -> - nif_cancel(LSockRef, accept, AccRef), - flush_select_msgs(LSockRef, AccRef), + cancel(LSockRef, accept, AccRef), {error, timeout} end; {error, _} = ERROR -> - nif_cancel(LSockRef, accept, AccRef), % Just to be on the safe side... + cancel(LSockRef, accept, AccRef), % Just to be on the safe side... ERROR end. @@ -1305,8 +1303,7 @@ do_send(SockRef, Data, EFlags, Timeout) -> {error, Reason} after NewTimeout -> - nif_cancel(SockRef, send, SendRef), - flush_select_msgs(SockRef, SendRef), + cancel(SockRef, send, SendRef), {error, {timeout, size(Data)}} end; {error, eagain} -> @@ -1319,8 +1316,7 @@ do_send(SockRef, Data, EFlags, Timeout) -> {error, Reason} after Timeout -> - nif_cancel(SockRef, send, SendRef), - flush_select_msgs(SockRef, SendRef), + cancel(SockRef, send, SendRef), {error, {timeout, size(Data)}} end; @@ -1349,10 +1345,19 @@ sendto(Socket, Data, Dest) -> Data :: binary(), Dest :: null | sockaddr(), Flags :: send_flags(), - Reason :: term(). + Reason :: term() + ; (Socket, Data, Dest, Timeout) -> ok | {error, Reason} when + Socket :: socket(), + Data :: iodata(), + Dest :: null | sockaddr(), + Timeout :: timeout(), + Reason :: term(). + +sendto(Socket, Data, Dest, Flags) when is_list(Flags) -> + sendto(Socket, Data, Dest, Flags, ?SOCKET_SENDTO_TIMEOUT_DEFAULT); +sendto(Socket, Data, Dest, Timeout) -> + sendto(Socket, Data, Dest, ?SOCKET_SENDTO_FLAGS_DEFAULT, Timeout). -sendto(Socket, Data, Dest, Flags) -> - sendto(Socket, Data, Dest, Flags, ?SOCKET_SENDTO_TIMEOUT_DEFAULT). -spec sendto(Socket, Data, Dest, Flags, Timeout) -> ok | {error, Reason} when Socket :: socket(), @@ -1403,8 +1408,7 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> {error, Reason} after Timeout -> - nif_cancel(SockRef, sendto, SendRef), - flush_select_msgs(SockRef, SendRef), + cancel(SockRef, sendto, SendRef), {error, timeout} end; @@ -1414,8 +1418,7 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> do_sendto(SockRef, Data, Dest, EFlags, next_timeout(TS, Timeout)) after Timeout -> - nif_cancel(SockRef, sendto, SendRef), - flush_select_msgs(SockRef, SendRef), + cancel(SockRef, sendto, SendRef), {error, timeout} end; @@ -1497,8 +1500,7 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> do_sendmsg(SockRef, MsgHdr, EFlags, next_timeout(TS, Timeout)) after Timeout -> - nif_cancel(SockRef, sendmsg, SendRef), - flush_select_msgs(SockRef, SendRef), + cancel(SockRef, sendmsg, SendRef), {error, timeout} end; @@ -1519,62 +1521,6 @@ ensure_msghdr(_) -> %% writev - write data into multiple buffers %% -%% send(Socket, Data, Flags, Timeout) -%% when (is_list(Data) orelse is_binary(Data)) andalso is_list(Flags) -> -%% IOVec = erlang:iolist_to_iovec(Data), -%% EFlags = enc_send_flags(Flags), -%% send_iovec(Socket, IOVec, EFlags, Timeout). - - -%% %% Iterate over the IO-vector (list of binaries). - -%% send_iovec(_Socket, [] = _IOVec, _EFlags, _Timeout) -> -%% ok; -%% send_iovec({socket, _, SockRef} = Socket, [Bin|IOVec], EFlags, Timeout) -> -%% case do_send(SockRef, make_ref(), Bin, EFlags, Timeout) of -%% {ok, NewTimeout} -> -%% send_iovec(Socket, IOVec, EFlags, NewTimeout); -%% {error, _} = ERROR -> -%% ERROR -%% end. - - -%% do_send(SockRef, SendRef, Data, _EFlags, Timeout) -%% when (Timeout < 0) -> -%% nif_cancel(SockRef, SendRef), -%% flush_select_msgs(SockRef, SendRef), -%% {error, {timeout, size(Data)}}; -%% do_send(SockRef, SendRef, Data, EFlags, Timeout) -> -%% TS = timestamp(Timeout), -%% case nif_send(SockRef, SendRef, Data, EFlags) of -%% ok -> -%% {ok, next_timeout(TS, Timeout)}; -%% {ok, Written} -> -%% %% We are partially done, wait for continuation -%% receive -%% {select, SockRef, SendRef, ready_output} -> -%% <<_:Written/binary, Rest/binary>> = Data, -%% do_send(SockRef, make_ref(), Rest, EFlags, -%% next_timeout(TS, Timeout)) -%% after Timeout -> -%% nif_cancel(SockRef, SendRef), -%% flush_select_msgs(SockRef, SendRef), -%% {error, timeout} -%% end; -%% {error, eagain} -> -%% receive -%% {select, SockRef, SendRef, ready_output} -> -%% do_send(SockRef, SendRef, Data, EFlags, -%% next_timeout(TS, Timeout)) -%% after Timeout -> -%% nif_cancel(SockRef, SendRef), -%% flush_select_msgs(SockRef, SendRef), -%% {error, timeout} -%% end; - -%% {error, _} = ERROR -> -%% ERROR -%% end. %% =========================================================================== @@ -1695,8 +1641,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) after NewTimeout -> - nif_cancel(SockRef, recv, RecvRef), - flush_select_msgs(SockRef, RecvRef), + cancel(SockRef, recv, RecvRef), {error, {timeout, Acc}} end; @@ -1715,8 +1660,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) after NewTimeout -> - nif_cancel(SockRef, recv, RecvRef), - flush_select_msgs(SockRef, RecvRef), + cancel(SockRef, recv, RecvRef), {error, {timeout, Acc}} end; @@ -1739,8 +1683,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {error, Reason} after NewTimeout -> - nif_cancel(SockRef, recv, RecvRef), - flush_select_msgs(SockRef, RecvRef), + cancel(SockRef, recv, RecvRef), {error, timeout} end; @@ -1765,7 +1708,7 @@ do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) -> %% The current recv operation is to be cancelled, so no need for a ref... %% The cancel will end our 'read everything you have' and "activate" %% any waiting reader. - nif_cancel(SockRef, recv, RecvRef), + cancel(SockRef, recv, RecvRef), {ok, Acc}; do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) -> {error, {timeout, Acc}}; @@ -1878,8 +1821,7 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> {error, Reason} after NewTimeout -> - nif_cancel(SockRef, recvfrom, RecvRef), - flush_select_msgs(SockRef, RecvRef), + cancel(SockRef, recvfrom, RecvRef), {error, timeout} end; @@ -1966,8 +1908,7 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> {error, Reason} after NewTimeout -> - nif_cancel(SockRef, recvmsg, RecvRef), - flush_select_msgs(SockRef, RecvRef), + cancel(SockRef, recvmsg, RecvRef), {error, timeout} end; @@ -3325,10 +3266,19 @@ ensure_sockaddr(_SockAddr) -> -flush_select_msgs(LSRef, Ref) -> +cancel(SockRef, Op, OpRef) -> + case nif_cancel(SockRef, Op, OpRef) of + %% The select has already completed + {error, select_sent} -> + flush_select_msgs(SockRef, OpRef); + Other -> + Other + end. + +flush_select_msgs(SockRef, Ref) -> receive - {select, LSRef, Ref, _} -> - flush_select_msgs(LSRef, Ref) + {select, SockRef, Ref, _} -> + flush_select_msgs(SockRef, Ref) after 0 -> ok end. diff --git a/lib/kernel/test/socket_client.erl b/lib/kernel/test/socket_client.erl index 58d70b6181..1c07e799b8 100644 --- a/lib/kernel/test/socket_client.erl +++ b/lib/kernel/test/socket_client.erl @@ -21,164 +21,246 @@ -module(socket_client). -export([ - start/1, start/5, - start_tcp/1, start_tcp/2, start_tcp4/1, start_tcp6/1, - start_udp/1, start_udp/2, start_udp4/1, start_udp6/1 + start/1, start/2, start/5, start/6, + start_tcp/1, start_tcp/2, start_tcp/3, + start_tcp4/1, start_tcp4/2, start_tcp6/1, start_tcp6/2, + start_udp/1, start_udp/2, start_udp/3, + start_udp4/1, start_udp4/2, start_udp6/1, start_udp6/2 ]). -define(LIB, socket_lib). --record(client, {socket, msg = true, type, dest, msg_id = 1}). +-record(client, {socket, verbose = true, msg = true, type, dest, msg_id = 1}). start(Port) -> - start_tcp(Port). + start(Port, 1). + +start(Port, Num) -> + start_tcp(Port, Num). start_tcp(Port) -> - start_tcp4(Port). + start_tcp(Port, 1). + +start_tcp(Port, Num) -> + start_tcp4(Port, Num). start_tcp4(Port) -> - start(inet, stream, tcp, Port). + start_tcp4(Port, 1). + +start_tcp4(Port, Num) -> + start(inet, stream, tcp, Port, Num). start_tcp6(Port) -> - start(inet6, stream, tcp, Port). + start_tcp6(Port, 1). -start_tcp(Addr, Port) when (size(Addr) =:= 4) -> - start(inet, stream, tcp, Addr, Port); -start_tcp(Addr, Port) when (size(Addr) =:= 8) -> - start(inet6, stream, tcp, Addr, Port). +start_tcp6(Port, Num) -> + start(inet6, stream, tcp, Port, Num). + +start_tcp(Addr, Port, Num) when (size(Addr) =:= 4) andalso + is_integer(Num) andalso + (Num > 0) -> + start(inet, stream, tcp, Addr, Port, Num); +start_tcp(Addr, Port, Num) when (size(Addr) =:= 8) andalso + is_integer(Num) andalso + (Num > 0) -> + start(inet6, stream, tcp, Addr, Port, Num). start_udp(Port) -> - start_udp4(Port). + start_udp(Port, 1). + +start_udp(Port, Num) -> + start_udp4(Port, Num). start_udp4(Port) -> - start(inet, dgram, udp, Port). + start_udp4(Port, 1). + +start_udp4(Port, Num) -> + start(inet, dgram, udp, Port, Num). start_udp6(Port) -> - start(inet6, dgram, udp, Port). + start_udp6(Port, 1). + +start_udp6(Port, Num) -> + start(inet6, dgram, udp, Port, Num). -start_udp(Addr, Port) when (size(Addr) =:= 4) -> - start(inet, dgram, udp, Addr, Port); -start_udp(Addr, Port) when (size(Addr) =:= 8) -> - start(inet6, dgram, udp, Addr, Port). +start_udp(Addr, Port, Num) when (size(Addr) =:= 4) -> + start(inet, dgram, udp, Addr, Port, Num); +start_udp(Addr, Port, Num) when (size(Addr) =:= 8) -> + start(inet6, dgram, udp, Addr, Port, Num). -start(Domain, Type, Proto, Port) -> - start(Domain, Type, Proto, which_addr(Domain), Port). +start(Domain, Type, Proto, Port, Num) + when is_integer(Port) andalso is_integer(Num) -> + start(Domain, Type, Proto, which_addr(Domain), Port, Num); start(Domain, Type, Proto, Addr, Port) -> + start(Domain, Type, Proto, Addr, Port, 1). + +start(Domain, Type, Proto, Addr, Port, 1 = Num) -> + start(Domain, Type, Proto, Addr, Port, Num, true); +start(Domain, Type, Proto, Addr, Port, Num) + when is_integer(Num) andalso (Num > 1) -> + start(Domain, Type, Proto, Addr, Port, Num, false). + +start(Domain, Type, Proto, Addr, Port, Num, Verbose) -> put(sname, "starter"), + Clients = start_clients(Num, Domain, Type, Proto, Addr, Port, Verbose), + await_clients(Clients). + +start_clients(Num, Domain, Type, Proto, Addr, Port, Verbose) -> + start_clients(Num, 1, Domain, Type, Proto, Addr, Port, Verbose, []). + +start_clients(Num, ID, Domain, Type, Proto, Addr, Port, Verbose, Acc) + when (Num > 0) -> + StartClient = fun() -> + start_client(ID, Domain, Type, Proto, Addr, Port, Verbose) + end, + {Pid, _} = spawn_monitor(StartClient), + ?LIB:sleep(500), + i("start client ~w", [ID]), + start_clients(Num-1, ID+1, Domain, Type, Proto, Addr, Port, Verbose, [Pid|Acc]); +start_clients(_, _, _, _, _, _, _, _, Acc) -> + i("all client(s) started"), + lists:reverse(Acc). + +await_clients([]) -> + i("all clients done"); +await_clients(Clients) -> + receive + {'DOWN', _MRef, process, Pid, _Reason} -> + case lists:delete(Pid, Clients) of + Clients2 when (Clients2 =/= Clients) -> + i("client ~p done", [Pid]), + await_clients(Clients2); + _ -> + await_clients(Clients) + end + end. + + +start_client(ID, Domain, Type, Proto, Addr, Port, Verbose) -> + put(sname, ?LIB:f("client[~w]", [ID])), SA = #{family => Domain, addr => Addr, port => Port}, %% The way we use tos only works because we %% send so few messages (a new value for every %% message). - put(tos, 1), - do_start(Domain, Type, Proto, SA). + tos_init(), + do_start(Domain, Type, Proto, SA, Verbose). -do_start(Domain, stream = Type, Proto, SA) -> +do_start(Domain, stream = Type, Proto, SA, Verbose) -> try do_init(Domain, Type, Proto) of Sock -> connect(Sock, SA), - {ok, Name} = socket:sockname(Sock), - {ok, Peer} = socket:peername(Sock), - {ok, Domain} = socket:getopt(Sock, socket, domain), - {ok, Type} = socket:getopt(Sock, socket, type), - {ok, Proto} = socket:getopt(Sock, socket, protocol), - {ok, OOBI} = socket:getopt(Sock, socket, oobinline), - {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf), - {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf), - {ok, Linger} = socket:getopt(Sock, socket, linger), - {ok, MTU} = socket:getopt(Sock, ip, mtu), - {ok, MTUDisc} = socket:getopt(Sock, ip, mtu_discover), - {ok, MALL} = socket:getopt(Sock, ip, multicast_all), - {ok, MIF} = socket:getopt(Sock, ip, multicast_if), - {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop), - {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl), - {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos), - i("connected: " - "~n From: ~p" - "~n To: ~p" - "~nwhen" - "~n (socket) Domain: ~p" - "~n (socket) Type: ~p" - "~n (socket) Protocol: ~p" - "~n (socket) OOBInline: ~p" - "~n (socket) SndBuf: ~p" - "~n (socket) RcvBuf: ~p" - "~n (socket) Linger: ~p" - "~n (ip) MTU: ~p" - "~n (ip) MTU Discovery: ~p" - "~n (ip) Multicast ALL: ~p" - "~n (ip) Multicast IF: ~p" - "~n (ip) Multicast Loop: ~p" - "~n (ip) Multicast TTL: ~p" - "~n (ip) RecvTOS: ~p" - "~n => wait some", - [Name, Peer, - Domain, Type, Proto, - OOBI, SndBuf, RcvBuf, Linger, - MTU, MTUDisc, MALL, MIF, MLoop, MTTL, - RecvTOS]), + maybe_print_start_info(Verbose, Sock, Type), %% Give the server some time... ?LIB:sleep(5000), %% ok = socket:close(Sock), - send_loop(#client{socket = Sock, - type = Type}) + send_loop(#client{socket = Sock, + type = Type, + verbose = Verbose}) catch throw:E -> e("Failed initiate: " "~n Error: ~p", [E]) end; -do_start(Domain, dgram = Type, Proto, SA) -> +do_start(Domain, dgram = Type, Proto, SA, Verbose) -> try do_init(Domain, Type, Proto) of Sock -> + maybe_print_start_info(Verbose, Sock, Type), %% Give the server some time... - {ok, Domain} = socket:getopt(Sock, socket, domain), - {ok, Type} = socket:getopt(Sock, socket, type), - {ok, Proto} = socket:getopt(Sock, socket, protocol), - {ok, OOBI} = socket:getopt(Sock, socket, oobinline), - {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf), - {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf), - {ok, Linger} = socket:getopt(Sock, socket, linger), - {ok, MALL} = socket:getopt(Sock, ip, multicast_all), - {ok, MIF} = socket:getopt(Sock, ip, multicast_if), - {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop), - {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl), - {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos), - {ok, RecvTTL} = socket:getopt(Sock, ip, recvttl), - i("initiated when: " - "~n (socket) Domain: ~p" - "~n (socket) Type: ~p" - "~n (socket) Protocol: ~p" - "~n (socket) OOBInline: ~p" - "~n (socket) SndBuf: ~p" - "~n (socket) RcvBuf: ~p" - "~n (socket) Linger: ~p" - "~n (ip) Multicast ALL: ~p" - "~n (ip) Multicast IF: ~p" - "~n (ip) Multicast Loop: ~p" - "~n (ip) Multicast TTL: ~p" - "~n (ip) RecvTOS: ~p" - "~n (ip) RecvTTL: ~p" - "~n => wait some", - [Domain, Type, Proto, - OOBI, SndBuf, RcvBuf, Linger, - MALL, MIF, MLoop, MTTL, - RecvTOS, RecvTTL]), ?LIB:sleep(5000), %% ok = socket:close(Sock), - send_loop(#client{socket = Sock, - type = Type, - dest = SA}) + send_loop(#client{socket = Sock, + type = Type, + dest = SA, + verbose = Verbose}) catch throw:E -> e("Failed initiate: " "~n Error: ~p", [E]) end. +maybe_print_start_info(true = _Verbose, Sock, stream = _Type) -> + {ok, Name} = socket:sockname(Sock), + {ok, Peer} = socket:peername(Sock), + {ok, Domain} = socket:getopt(Sock, socket, domain), + {ok, Type} = socket:getopt(Sock, socket, type), + {ok, Proto} = socket:getopt(Sock, socket, protocol), + {ok, OOBI} = socket:getopt(Sock, socket, oobinline), + {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf), + {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf), + {ok, Linger} = socket:getopt(Sock, socket, linger), + {ok, MTU} = socket:getopt(Sock, ip, mtu), + {ok, MTUDisc} = socket:getopt(Sock, ip, mtu_discover), + {ok, MALL} = socket:getopt(Sock, ip, multicast_all), + {ok, MIF} = socket:getopt(Sock, ip, multicast_if), + {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop), + {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl), + {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos), + i("connected: " + "~n From: ~p" + "~n To: ~p" + "~nwhen" + "~n (socket) Domain: ~p" + "~n (socket) Type: ~p" + "~n (socket) Protocol: ~p" + "~n (socket) OOBInline: ~p" + "~n (socket) SndBuf: ~p" + "~n (socket) RcvBuf: ~p" + "~n (socket) Linger: ~p" + "~n (ip) MTU: ~p" + "~n (ip) MTU Discovery: ~p" + "~n (ip) Multicast ALL: ~p" + "~n (ip) Multicast IF: ~p" + "~n (ip) Multicast Loop: ~p" + "~n (ip) Multicast TTL: ~p" + "~n (ip) RecvTOS: ~p" + "~n => wait some", + [Name, Peer, + Domain, Type, Proto, + OOBI, SndBuf, RcvBuf, Linger, + MTU, MTUDisc, MALL, MIF, MLoop, MTTL, + RecvTOS]); +maybe_print_start_info(true = _Verbose, Sock, dgram = _Type) -> + {ok, Domain} = socket:getopt(Sock, socket, domain), + {ok, Type} = socket:getopt(Sock, socket, type), + {ok, Proto} = socket:getopt(Sock, socket, protocol), + {ok, OOBI} = socket:getopt(Sock, socket, oobinline), + {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf), + {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf), + {ok, Linger} = socket:getopt(Sock, socket, linger), + {ok, MALL} = socket:getopt(Sock, ip, multicast_all), + {ok, MIF} = socket:getopt(Sock, ip, multicast_if), + {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop), + {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl), + {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos), + {ok, RecvTTL} = socket:getopt(Sock, ip, recvttl), + i("initiated when: " + "~n (socket) Domain: ~p" + "~n (socket) Type: ~p" + "~n (socket) Protocol: ~p" + "~n (socket) OOBInline: ~p" + "~n (socket) SndBuf: ~p" + "~n (socket) RcvBuf: ~p" + "~n (socket) Linger: ~p" + "~n (ip) Multicast ALL: ~p" + "~n (ip) Multicast IF: ~p" + "~n (ip) Multicast Loop: ~p" + "~n (ip) Multicast TTL: ~p" + "~n (ip) RecvTOS: ~p" + "~n (ip) RecvTTL: ~p" + "~n => wait some", + [Domain, Type, Proto, + OOBI, SndBuf, RcvBuf, Linger, + MALL, MIF, MLoop, MTTL, + RecvTOS, RecvTTL]); +maybe_print_start_info(_Verbose, _Sock, _Type) -> + ok. + do_init(Domain, stream = Type, Proto) -> i("try (socket) open"), Sock = case socket:open(Domain, Type, Proto) of @@ -248,14 +330,25 @@ send_loop(#client{msg_id = N} = C) when (N =< 10) -> i("request ~w sent - now try read answer", [N]), case recv(C) of {ok, {Source, Msg}} -> - i("received ~w bytes of data~s", - [size(Msg), case Source of - undefined -> ""; - _ -> ?LIB:f(" from:~n ~p", [Source]) - end]), + if + (C#client.verbose =:= true) -> + i("received ~w bytes of data~s", + [size(Msg), case Source of + undefined -> ""; + _ -> ?LIB:f(" from:~n ~p", [Source]) + end]); + true -> + i("received ~w bytes", [size(Msg)]) + end, case ?LIB:dec_msg(Msg) of {reply, N, Reply} -> - i("received reply ~w: ~p", [N, Reply]), + if + (C#client.verbose =:= true) -> + i("received reply ~w: ~p", [N, Reply]); + true -> + i("received reply ~w", [N]) + end, + ?LIB:sleep(500), % Just to spread it out a bit send_loop(C#client{msg_id = N+1}) end; {error, RReason} -> @@ -268,13 +361,20 @@ send_loop(#client{msg_id = N} = C) when (N =< 10) -> "~n ~p", [N, SReason]), exit({failed_send, SReason}) end; -send_loop(#client{socket = Sock}) -> +send_loop(Client) -> + sock_close(Client). + +sock_close(#client{socket = Sock, verbose = true}) -> i("we are done - close the socket when: " "~n ~p", [socket:info()]), ok = socket:close(Sock), i("we are done - socket closed when: " - "~n ~p", [socket:info()]). + "~n ~p", [socket:info()]); +sock_close(#client{socket = Sock}) -> + i("we are done"), + ok = socket:close(Sock). + send(#client{socket = Sock, type = stream}, Msg) -> socket:send(Sock, Msg); @@ -282,11 +382,10 @@ send(#client{socket = Sock, type = dgram, dest = Dest}, Msg) -> %% i("try send to: " %% "~n ~p", [Dest]), %% ok = socket:setopt(Sock, otp, debug, true), - TOS = get(tos), + TOS = tos_next(), ok = socket:setopt(Sock, ip, tos, TOS), case socket:sendto(Sock, Msg, Dest) of ok = OK -> - put(tos, TOS+1), OK; {error, _} = ERROR -> ERROR @@ -299,31 +398,41 @@ recv(#client{socket = Sock, type = stream, msg = false}) -> {error, _} = ERROR -> ERROR end; -recv(#client{socket = Sock, type = stream, msg = true}) -> +recv(#client{socket = Sock, verbose = Verbose, type = stream, msg = true}) -> case socket:recvmsg(Sock) of %% An iov of length 1 is an simplification... {ok, #{addr := undefined = Source, iov := [Msg], ctrl := CMsgHdrs, flags := Flags}} -> - i("received message: " - "~n CMsgHdr: ~p" - "~n Flags: ~p", [CMsgHdrs, Flags]), + if + (Verbose =:= true) -> + i("received message: " + "~n CMsgHdr: ~p" + "~n Flags: ~p", [CMsgHdrs, Flags]); + true -> + ok + end, {ok, {Source, Msg}}; {error, _} = ERROR -> ERROR end; recv(#client{socket = Sock, type = dgram, msg = false}) -> socket:recvfrom(Sock); -recv(#client{socket = Sock, type = dgram, msg = true}) -> +recv(#client{socket = Sock, verbose = Verbose, type = dgram, msg = true}) -> case socket:recvmsg(Sock) of {ok, #{addr := Source, iov := [Msg], ctrl := CMsgHdrs, flags := Flags}} -> - i("received message: " - "~n CMsgHdr: ~p" - "~n Flags: ~p", [CMsgHdrs, Flags]), + if + (Verbose =:= true) -> + i("received message: " + "~n CMsgHdr: ~p" + "~n Flags: ~p", [CMsgHdrs, Flags]); + true -> + ok + end, {ok, {Source, Msg}}; {error, _} = ERROR -> ERROR @@ -402,6 +511,22 @@ which_addr2(Domain, [_|IFO]) -> %% --- +tos_init() -> + put(tos, 1). + +tos_next() -> + case get(tos) of + TOS when (TOS < 100) -> + put(tos, TOS + 1), + TOS; + _ -> + put(tos, 1), + 1 + end. + + +%% --- + e(F, A) -> ?LIB:e(F, A). diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl index ea2bdc8e0d..9142942428 100644 --- a/lib/kernel/test/socket_server.erl +++ b/lib/kernel/test/socket_server.erl @@ -34,8 +34,10 @@ -define(LIB, socket_lib). -record(manager, {socket, msg, peek, acceptors, handler_id, handlers}). --record(acceptor, {id, socket, manager}). --record(handler, {socket, peek, msg, type, manager}). +-record(acceptor, {id, socket, manager, + atimeout = 5000}). +-record(handler, {socket, peek, msg, type, manager, + stimeout = 5000, rtimeout = 5000}). -define(NUM_ACCEPTORS, 5). @@ -521,13 +523,14 @@ acceptor_stop(Pid, _Reason) -> acceptor_init(Manager, Sock, ID) -> put(sname, f("acceptor[~w]", [ID])), Manager ! {acceptor, self(), ok}, + %% ok = socket:setopt(Sock, otp, debug, true), acceptor_loop(#acceptor{id = ID, manager = Manager, socket = Sock}). -acceptor_loop(#acceptor{socket = LSock} = A) -> +acceptor_loop(#acceptor{socket = LSock, atimeout = Timeout} = A) -> i("try accept"), - case socket:accept(LSock, infinity) of + case socket:accept(LSock, Timeout) of {ok, Sock} -> i("accepted: " "~n ~p" @@ -542,6 +545,9 @@ acceptor_loop(#acceptor{socket = LSock} = A) -> socket:close(Sock), exit({failed_starting_handler, Reason}) end; + {error, timeout} -> + i("timeout"), + acceptor_loop(A); {error, Reason} -> e("accept failure: " "~n ~p", [Reason]), @@ -900,28 +906,30 @@ peek_recvfrom(Sock, BufSz) -> end. -send(#handler{socket = Sock, msg = true, type = stream}, Msg, _) -> +send(#handler{socket = Sock, msg = true, type = stream, stimeout = Timeout}, + Msg, _) -> CMsgHdr = #{level => ip, type => tos, data => reliability}, CMsgHdrs = [CMsgHdr], MsgHdr = #{iov => [Msg], ctrl => CMsgHdrs}, %% socket:setopt(Sock, otp, debug, true), - Res = socket:sendmsg(Sock, MsgHdr), + Res = socket:sendmsg(Sock, MsgHdr, Timeout), %% socket:setopt(Sock, otp, debug, false), Res; -send(#handler{socket = Sock, type = stream}, Msg, _) -> - socket:send(Sock, Msg); -send(#handler{socket = Sock, msg = true, type = dgram}, Msg, Dest) -> +send(#handler{socket = Sock, type = stream, stimeout = Timeout}, Msg, _) -> + socket:send(Sock, Msg, Timeout); +send(#handler{socket = Sock, msg = true, type = dgram, stimeout = Timeout}, + Msg, Dest) -> CMsgHdr = #{level => ip, type => tos, data => reliability}, CMsgHdrs = [CMsgHdr], MsgHdr = #{addr => Dest, iov => [Msg], ctrl => CMsgHdrs}, %% ok = socket:setopt(Sock, otp, debug, true), - Res = socket:sendmsg(Sock, MsgHdr), + Res = socket:sendmsg(Sock, MsgHdr, Timeout), %% ok = socket:setopt(Sock, otp, debug, false), Res; -send(#handler{socket = Sock, type = dgram}, Msg, Dest) -> - socket:sendto(Sock, Msg, Dest). +send(#handler{socket = Sock, type = dgram, stimeout = Timeout}, Msg, Dest) -> + socket:sendto(Sock, Msg, Dest, Timeout). %% filler() -> %% list_to_binary(lists:duplicate(2048, " FILLER ")). |