diff options
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 384 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 65868 -> 66040 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 15 | ||||
-rw-r--r-- | lib/kernel/test/socket_client.erl | 344 | ||||
-rw-r--r-- | lib/kernel/test/socket_server.erl | 24 |
5 files changed, 616 insertions, 151 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index fde2349234..04c3609b32 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -1839,6 +1839,11 @@ static ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env, static ERL_NIF_TERM ncancel_send(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, + SocketDescriptor* descP); +static ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef); @@ -1894,6 +1899,10 @@ static ERL_NIF_TERM ngetopt_timeval_opt(ErlNifEnv* env, int level, int opt); +static BOOLEAN_T send_check_writer(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ERL_NIF_TERM* checkResult); static ERL_NIF_TERM send_check_result(ErlNifEnv* env, SocketDescriptor* descP, ssize_t written, @@ -2123,6 +2132,22 @@ static BOOLEAN_T acceptor_unqueue(ErlNifEnv* env, SocketDescriptor* descP, const ErlNifPid* pid); +static BOOLEAN_T writer_search4pid(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid); +static ERL_NIF_TERM writer_push(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid pid, + ERL_NIF_TERM ref); +static BOOLEAN_T writer_pop(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref); +static BOOLEAN_T writer_unqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid); + static BOOLEAN_T qsearch4pid(ErlNifEnv* env, SocketRequestQueue* q, ErlNifPid* pid); @@ -3399,7 +3424,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, if (!compare_pids(env, &descP->currentAcceptor.pid, &caller)) { - /* Not the "current caller", so (maybe) push onto queue */ + /* Not the "current acceptor", so (maybe) push onto queue */ SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") ); @@ -3459,6 +3484,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, SSDBG( descP, ("SOCKET", "naccept_accepting -> accept success\r\n") ); + DEMONP(env, descP, &descP->currentAcceptor.mon); + if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) { save_errno = sock_errno(); while ((sock_close(accSock) == INVALID_SOCKET) && @@ -3499,10 +3526,7 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, accDescP->state = SOCKET_STATE_CONNECTED; - /* Here we should have the test if we have something in the queue. - * And if so, pop it and copy the (waiting) acceptor, and then - * make a new select with that info). - */ + /* Check if there are waiting acceptors (popping the acceptor queue) */ if (acceptor_pop(env, descP, &descP->currentAcceptor.pid, @@ -3607,12 +3631,12 @@ ERL_NIF_TERM nif_send(ErlNifEnv* env, } -/* What do we do when another process tries to write - * when the current writer has a select already waiting? - * Queue it? And what about simultaneous read and write? - * Queue up all operations towards the socket? +/* *** nsend *** * - * We (may) need a currentOp field and an ops queue field. + * Do the actual send. + * Do some initial writer checks, do the actual send and then + * analyze the result. If we are done, another writer may be + * scheduled (if there is one in the writer queue). */ static ERL_NIF_TERM nsend(ErlNifEnv* env, @@ -3621,12 +3645,17 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, ErlNifBinary* sndDataP, int flags) { - int save_errno; - ssize_t written; + int save_errno; + ssize_t written; + ERL_NIF_TERM writerCheck; if (!descP->isWritable) return enif_make_badarg(env); + /* Check if there is already a current writer and if its us */ + if (!send_check_writer(env, descP, sendRef, &writerCheck)) + return writerCheck; + /* We ignore the wrap for the moment. * Maybe we should issue a wrap-message to controlling process... */ @@ -3645,6 +3674,7 @@ ERL_NIF_TERM nsend(ErlNifEnv* env, } + /* ---------------------------------------------------------------------- * nif_sendto * @@ -3710,9 +3740,13 @@ ERL_NIF_TERM nif_sendto(ErlNifEnv* env, &remoteAddrLen)) != NULL) return esock_make_error_str(env, xres); + MLOCK(descP->writeMtx); + res = nsendto(env, descP, sendRef, &sndData, flags, &remoteAddr, remoteAddrLen); + MUNLOCK(descP->writeMtx); + SGDBG( ("SOCKET", "nif_sendto -> done with result: " "\r\n %T" "\r\n", res) ); @@ -3730,12 +3764,17 @@ ERL_NIF_TERM nsendto(ErlNifEnv* env, SocketAddress* toAddrP, unsigned int toAddrLen) { - int save_errno; - ssize_t written; + int save_errno; + ssize_t written; + ERL_NIF_TERM writerCheck; if (!descP->isWritable) return enif_make_badarg(env); + /* Check if there is already a current writer and if its us */ + if (!send_check_writer(env, descP, sendRef, &writerCheck)) + return writerCheck; + /* We ignore the wrap for the moment. * Maybe we should issue a wrap-message to controlling process... */ @@ -3811,8 +3850,12 @@ ERL_NIF_TERM nif_sendmsg(ErlNifEnv* env, if (!esendflags2sendflags(eflags, &flags)) return esock_make_error(env, esock_atom_einval); + MLOCK(descP->writeMtx); + res = nsendmsg(env, descP, sendRef, eMsgHdr, flags); + MUNLOCK(descP->writeMtx); + SSDBG( descP, ("SOCKET", "nif_sendmsg -> done with result: " "\r\n %T" @@ -3839,12 +3882,16 @@ ERL_NIF_TERM nsendmsg(ErlNifEnv* env, size_t ctrlBufLen, ctrlBufUsed; int save_errno; ssize_t written, dataSize; + ERL_NIF_TERM writerCheck; char* xres; if (!descP->isWritable) return enif_make_badarg(env); - + /* Check if there is already a current writer and if its us */ + if (!send_check_writer(env, descP, sendRef, &writerCheck)) + return writerCheck; + /* Depending on if we are *connected* or not, we require * different things in the msghdr map. */ @@ -10607,7 +10654,7 @@ ERL_NIF_TERM ncancel_accept(ErlNifEnv* env, } -/* The current process has an ongoing select we first must +/* The current acceptor process has an ongoing select we first must * cancel. Then we must re-activate the "first" (the first * in the acceptor queue). */ @@ -10664,7 +10711,7 @@ ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef) { - ErlNifPid caller; + ErlNifPid caller; if (enif_self(env, &caller) == NULL) return esock_make_error(env, atom_exself); @@ -10683,14 +10730,117 @@ ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env, /* *** ncancel_send *** * - * + * Cancel a send operation. + * Its either the current writer or one of the waiting writers. */ static ERL_NIF_TERM ncancel_send(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM opRef) { - return esock_make_error(env, esock_atom_einval); + ERL_NIF_TERM res; + + SSDBG( descP, + ("SOCKET", "ncancel_send -> entry with" + "\r\n opRef: %T" + "\r\n %s" + "\r\n", opRef, + ((descP->currentWriterP == NULL) ? "without writer" : "with writer")) ); + + MLOCK(descP->writeMtx); + + if (descP->currentWriterP != NULL) { + if (COMPARE(opRef, descP->currentWriter.ref) == 0) { + res = ncancel_send_current(env, descP); + } else { + res = ncancel_send_waiting(env, descP, opRef); + } + } else { + /* Or badarg? */ + res = esock_make_error(env, esock_atom_einval); + } + + MUNLOCK(descP->writeMtx); + + SSDBG( descP, + ("SOCKET", "ncancel_send -> done with result:" + "\r\n %T" + "\r\n", res) ); + + return res; +} + + + +/* The current writer process has an ongoing select we first must + * cancel. Then we must re-activate the "first" (the first + * in the writer queue). + */ +static +ERL_NIF_TERM ncancel_send_current(ErlNifEnv* env, + SocketDescriptor* descP) +{ + ERL_NIF_TERM res; + + SSDBG( descP, ("SOCKET", "ncancel_send_current -> entry\r\n") ); + + res = ncancel_write_select(env, descP, descP->currentWriter.ref); + + SSDBG( descP, ("SOCKET", "ncancel_send_current -> cancel res: %T\r\n", res) ); + + if (writer_pop(env, descP, + &descP->currentWriter.pid, + &descP->currentWriter.mon, + &descP->currentWriter.ref)) { + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "ncancel_send_current -> new (active) writer: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentWriter.pid, + descP->currentWriter.ref) ); + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_WRITE), + descP, &descP->currentWriter.pid, descP->currentWriter.ref); + + } else { + SSDBG( descP, ("SOCKET", "ncancel_send_current -> no more writers\r\n") ); + descP->currentWriterP = NULL; + } + + SSDBG( descP, ("SOCKET", "ncancel_send_current -> done with result:" + "\r\n %T" + "\r\n", res) ); + + return res; +} + + +/* These processes have not performed a select, so we can simply + * remove them from the writer queue. + */ +static +ERL_NIF_TERM ncancel_send_waiting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) + return esock_make_error(env, atom_exself); + + /* unqueue request from (writer) queue */ + + if (writer_unqueue(env, descP, &caller)) { + return esock_atom_ok; + } else { + /* Race? */ + return esock_make_error(env, esock_atom_not_found); + } } @@ -10764,6 +10914,66 @@ ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env, * ---------------------------------------------------------------------- */ +/* *** send_check_writer *** + * + * Checks if we have a current writer and if that is us. If not, then we must + * be made to wait for our turn. This is done by pushing us unto the writer queue. + */ +static +BOOLEAN_T send_check_writer(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ERL_NIF_TERM* checkResult) +{ + if (descP->currentWriterP != NULL) { + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) { + *checkResult = esock_make_error(env, atom_exself); + return FALSE; + } + + if (!compare_pids(env, &descP->currentWriter.pid, &caller)) { + /* Not the "current writer", so (maybe) push onto queue */ + + SSDBG( descP, + ("SOCKET", "send_check_writer -> not (current) writer\r\n") ); + + if (!writer_search4pid(env, descP, &caller)) + *checkResult = writer_push(env, descP, caller, ref); + else + *checkResult = esock_make_error(env, esock_atom_eagain); + + SSDBG( descP, + ("SOCKET", + "nsend -> queue (push) result: %T\r\n", checkResult) ); + + return FALSE; + + } + + } + + *checkResult = esock_atom_ok; // Does not actually matter in this case, but ... + + return TRUE; +} + + + +/* *** send_check_result *** + * + * Check the result of a socket send (send, sendto and sendmsg) call. + * If a "complete" send has been made, the next (waiting) writer will be + * scheduled (if there is one). + * If we did not manage to send the entire package, make another select, + * so that we can be informed when we can make another try (to send the rest), + * and return with the amount we actually managed to send (its up to the caller + * (that is the erlang code) to figure out hust much is left to send). + * If the write fail, we give up and return with the appropriate error code. + * + * What about the remaining writers!! + */ static ERL_NIF_TERM send_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -10783,24 +10993,67 @@ ERL_NIF_TERM send_check_result(ErlNifEnv* env, cnt_inc(&descP->writePkgCnt, 1); cnt_inc(&descP->writeByteCnt, written); + DEMONP(env, descP, &descP->currentWriter.mon); SSDBG( descP, ("SOCKET", "send_check_result -> " "everything written (%d,%d) - done\r\n", dataSize, written) ); + /* Ok, this write is done maybe activate the next (if any) */ + + if (writer_pop(env, descP, + &descP->currentWriter.pid, + &descP->currentWriter.mon, + &descP->currentWriter.ref)) { + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "send_check_result -> new (active) writer: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentWriter.pid, + descP->currentWriter.ref) ); + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_WRITE), + descP, &descP->currentWriter.pid, descP->currentWriter.ref); + + } else { + descP->currentWriterP = NULL; + } + return esock_atom_ok; } else if (written < 0) { - /* Ouch, check what kind of failure */ + /* Some kind of send failure - check what kind */ + if ((saveErrno != EAGAIN) && (saveErrno != EINTR)) { + ErlNifPid pid; + ErlNifMonitor mon; + ERL_NIF_TERM ref, res; + + /* + * An actual failure - we (and everyone waiting) give up + */ cnt_inc(&descP->writeFails, 1); SSDBG( descP, ("SOCKET", "send_check_result -> error: %d\r\n", saveErrno) ); - return esock_make_error_errno(env, saveErrno); + res = esock_make_error_errno(env, saveErrno); + + while (writer_pop(env, descP, &pid, &mon, &ref)) { + SSDBG( descP, + ("SOCKET", "send_check_result -> abort %T\r\n", pid) ); + send_msg_nif_abort(env, ref, res, &pid); + DEMONP(env, descP, &mon); + } + + return res; } else { @@ -13561,6 +13814,95 @@ BOOLEAN_T acceptor_unqueue(ErlNifEnv* env, } + +/* *** writer search for pid *** + * + * Search for a pid in the writer queue. + */ +static +BOOLEAN_T writer_search4pid(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid) +{ + return qsearch4pid(env, &descP->writersQ, pid); +} + + +/* *** writer push *** + * + * Push an writer onto the writer queue. + * This happens when we already have atleast one current writer. + */ +static +ERL_NIF_TERM writer_push(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid pid, + ERL_NIF_TERM ref) +{ + SocketRequestQueueElement* e = MALLOC(sizeof(SocketRequestQueueElement)); + SocketRequestor* reqP = &e->data; + + reqP->pid = pid; + reqP->ref = enif_make_copy(descP->env, ref); + + if (MONP(env, descP, &pid, &reqP->mon) > 0) { + FREE(reqP); + return esock_make_error(env, atom_exmon); + } + + qpush(&descP->writersQ, e); + + // THIS IS OK => MAKES THE CALLER WAIT FOR ITS TURN + return esock_make_error(env, esock_atom_eagain); +} + + +/* *** writer pop *** + * + * Pop an writer from the writer queue. + */ +static +BOOLEAN_T writer_pop(ErlNifEnv* env, + SocketDescriptor* descP, + ErlNifPid* pid, + ErlNifMonitor* mon, + ERL_NIF_TERM* ref) +{ + SocketRequestQueueElement* e = qpop(&descP->writersQ); + + if (e != NULL) { + *pid = e->data.pid; + *mon = e->data.mon; + *ref = e->data.ref; // At this point the ref has already been copied (env) + FREE(e); + return TRUE; + } else { + /* (acceptors) Queue was empty */ + // *pid = NULL; we have no null value for pids + // *mon = NULL; we have no null value for monitors + *ref = esock_atom_undefined; // Just in case + return FALSE; + } + +} + + +/* *** writer unqueue *** + * + * Remove an writer from the writer queue. + */ +static +BOOLEAN_T writer_unqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid) +{ + return qunqueue(env, &descP->writersQ, pid); +} + + + + + static BOOLEAN_T qsearch4pid(ErlNifEnv* env, SocketRequestQueue* q, diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex 2475dce37b..9e6d9f4709 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 1c16c94711..652054457f 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -1345,10 +1345,19 @@ sendto(Socket, Data, Dest) -> Data :: binary(), Dest :: null | sockaddr(), Flags :: send_flags(), - Reason :: term(). + Reason :: term() + ; (Socket, Data, Dest, Timeout) -> ok | {error, Reason} when + Socket :: socket(), + Data :: iodata(), + Dest :: null | sockaddr(), + Timeout :: timeout(), + Reason :: term(). + +sendto(Socket, Data, Dest, Flags) when is_list(Flags) -> + sendto(Socket, Data, Dest, Flags, ?SOCKET_SENDTO_TIMEOUT_DEFAULT); +sendto(Socket, Data, Dest, Timeout) -> + sendto(Socket, Data, Dest, ?SOCKET_SENDTO_FLAGS_DEFAULT, Timeout). -sendto(Socket, Data, Dest, Flags) -> - sendto(Socket, Data, Dest, Flags, ?SOCKET_SENDTO_TIMEOUT_DEFAULT). -spec sendto(Socket, Data, Dest, Flags, Timeout) -> ok | {error, Reason} when Socket :: socket(), diff --git a/lib/kernel/test/socket_client.erl b/lib/kernel/test/socket_client.erl index 6cd353fd07..1c07e799b8 100644 --- a/lib/kernel/test/socket_client.erl +++ b/lib/kernel/test/socket_client.erl @@ -21,53 +21,126 @@ -module(socket_client). -export([ - start/1, start/5, - start_tcp/1, start_tcp/2, start_tcp4/1, start_tcp6/1, - start_udp/1, start_udp/2, start_udp4/1, start_udp6/1 + start/1, start/2, start/5, start/6, + start_tcp/1, start_tcp/2, start_tcp/3, + start_tcp4/1, start_tcp4/2, start_tcp6/1, start_tcp6/2, + start_udp/1, start_udp/2, start_udp/3, + start_udp4/1, start_udp4/2, start_udp6/1, start_udp6/2 ]). -define(LIB, socket_lib). --record(client, {socket, msg = true, type, dest, msg_id = 1}). +-record(client, {socket, verbose = true, msg = true, type, dest, msg_id = 1}). start(Port) -> - start_tcp(Port). + start(Port, 1). + +start(Port, Num) -> + start_tcp(Port, Num). start_tcp(Port) -> - start_tcp4(Port). + start_tcp(Port, 1). + +start_tcp(Port, Num) -> + start_tcp4(Port, Num). start_tcp4(Port) -> - start(inet, stream, tcp, Port). + start_tcp4(Port, 1). + +start_tcp4(Port, Num) -> + start(inet, stream, tcp, Port, Num). start_tcp6(Port) -> - start(inet6, stream, tcp, Port). + start_tcp6(Port, 1). + +start_tcp6(Port, Num) -> + start(inet6, stream, tcp, Port, Num). -start_tcp(Addr, Port) when (size(Addr) =:= 4) -> - start(inet, stream, tcp, Addr, Port); -start_tcp(Addr, Port) when (size(Addr) =:= 8) -> - start(inet6, stream, tcp, Addr, Port). +start_tcp(Addr, Port, Num) when (size(Addr) =:= 4) andalso + is_integer(Num) andalso + (Num > 0) -> + start(inet, stream, tcp, Addr, Port, Num); +start_tcp(Addr, Port, Num) when (size(Addr) =:= 8) andalso + is_integer(Num) andalso + (Num > 0) -> + start(inet6, stream, tcp, Addr, Port, Num). start_udp(Port) -> - start_udp4(Port). + start_udp(Port, 1). + +start_udp(Port, Num) -> + start_udp4(Port, Num). start_udp4(Port) -> - start(inet, dgram, udp, Port). + start_udp4(Port, 1). + +start_udp4(Port, Num) -> + start(inet, dgram, udp, Port, Num). start_udp6(Port) -> - start(inet6, dgram, udp, Port). + start_udp6(Port, 1). -start_udp(Addr, Port) when (size(Addr) =:= 4) -> - start(inet, dgram, udp, Addr, Port); -start_udp(Addr, Port) when (size(Addr) =:= 8) -> - start(inet6, dgram, udp, Addr, Port). +start_udp6(Port, Num) -> + start(inet6, dgram, udp, Port, Num). +start_udp(Addr, Port, Num) when (size(Addr) =:= 4) -> + start(inet, dgram, udp, Addr, Port, Num); +start_udp(Addr, Port, Num) when (size(Addr) =:= 8) -> + start(inet6, dgram, udp, Addr, Port, Num). -start(Domain, Type, Proto, Port) -> - start(Domain, Type, Proto, which_addr(Domain), Port). + +start(Domain, Type, Proto, Port, Num) + when is_integer(Port) andalso is_integer(Num) -> + start(Domain, Type, Proto, which_addr(Domain), Port, Num); start(Domain, Type, Proto, Addr, Port) -> + start(Domain, Type, Proto, Addr, Port, 1). + +start(Domain, Type, Proto, Addr, Port, 1 = Num) -> + start(Domain, Type, Proto, Addr, Port, Num, true); +start(Domain, Type, Proto, Addr, Port, Num) + when is_integer(Num) andalso (Num > 1) -> + start(Domain, Type, Proto, Addr, Port, Num, false). + +start(Domain, Type, Proto, Addr, Port, Num, Verbose) -> put(sname, "starter"), + Clients = start_clients(Num, Domain, Type, Proto, Addr, Port, Verbose), + await_clients(Clients). + +start_clients(Num, Domain, Type, Proto, Addr, Port, Verbose) -> + start_clients(Num, 1, Domain, Type, Proto, Addr, Port, Verbose, []). + +start_clients(Num, ID, Domain, Type, Proto, Addr, Port, Verbose, Acc) + when (Num > 0) -> + StartClient = fun() -> + start_client(ID, Domain, Type, Proto, Addr, Port, Verbose) + end, + {Pid, _} = spawn_monitor(StartClient), + ?LIB:sleep(500), + i("start client ~w", [ID]), + start_clients(Num-1, ID+1, Domain, Type, Proto, Addr, Port, Verbose, [Pid|Acc]); +start_clients(_, _, _, _, _, _, _, _, Acc) -> + i("all client(s) started"), + lists:reverse(Acc). + +await_clients([]) -> + i("all clients done"); +await_clients(Clients) -> + receive + {'DOWN', _MRef, process, Pid, _Reason} -> + case lists:delete(Pid, Clients) of + Clients2 when (Clients2 =/= Clients) -> + i("client ~p done", [Pid]), + await_clients(Clients2); + _ -> + await_clients(Clients) + end + end. + + +start_client(ID, Domain, Type, Proto, Addr, Port, Verbose) -> + put(sname, ?LIB:f("client[~w]", [ID])), SA = #{family => Domain, addr => Addr, port => Port}, @@ -75,110 +148,119 @@ start(Domain, Type, Proto, Addr, Port) -> %% send so few messages (a new value for every %% message). tos_init(), - do_start(Domain, Type, Proto, SA). + do_start(Domain, Type, Proto, SA, Verbose). -do_start(Domain, stream = Type, Proto, SA) -> +do_start(Domain, stream = Type, Proto, SA, Verbose) -> try do_init(Domain, Type, Proto) of Sock -> connect(Sock, SA), - {ok, Name} = socket:sockname(Sock), - {ok, Peer} = socket:peername(Sock), - {ok, Domain} = socket:getopt(Sock, socket, domain), - {ok, Type} = socket:getopt(Sock, socket, type), - {ok, Proto} = socket:getopt(Sock, socket, protocol), - {ok, OOBI} = socket:getopt(Sock, socket, oobinline), - {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf), - {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf), - {ok, Linger} = socket:getopt(Sock, socket, linger), - {ok, MTU} = socket:getopt(Sock, ip, mtu), - {ok, MTUDisc} = socket:getopt(Sock, ip, mtu_discover), - {ok, MALL} = socket:getopt(Sock, ip, multicast_all), - {ok, MIF} = socket:getopt(Sock, ip, multicast_if), - {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop), - {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl), - {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos), - i("connected: " - "~n From: ~p" - "~n To: ~p" - "~nwhen" - "~n (socket) Domain: ~p" - "~n (socket) Type: ~p" - "~n (socket) Protocol: ~p" - "~n (socket) OOBInline: ~p" - "~n (socket) SndBuf: ~p" - "~n (socket) RcvBuf: ~p" - "~n (socket) Linger: ~p" - "~n (ip) MTU: ~p" - "~n (ip) MTU Discovery: ~p" - "~n (ip) Multicast ALL: ~p" - "~n (ip) Multicast IF: ~p" - "~n (ip) Multicast Loop: ~p" - "~n (ip) Multicast TTL: ~p" - "~n (ip) RecvTOS: ~p" - "~n => wait some", - [Name, Peer, - Domain, Type, Proto, - OOBI, SndBuf, RcvBuf, Linger, - MTU, MTUDisc, MALL, MIF, MLoop, MTTL, - RecvTOS]), + maybe_print_start_info(Verbose, Sock, Type), %% Give the server some time... ?LIB:sleep(5000), %% ok = socket:close(Sock), - send_loop(#client{socket = Sock, - type = Type}) + send_loop(#client{socket = Sock, + type = Type, + verbose = Verbose}) catch throw:E -> e("Failed initiate: " "~n Error: ~p", [E]) end; -do_start(Domain, dgram = Type, Proto, SA) -> +do_start(Domain, dgram = Type, Proto, SA, Verbose) -> try do_init(Domain, Type, Proto) of Sock -> + maybe_print_start_info(Verbose, Sock, Type), %% Give the server some time... - {ok, Domain} = socket:getopt(Sock, socket, domain), - {ok, Type} = socket:getopt(Sock, socket, type), - {ok, Proto} = socket:getopt(Sock, socket, protocol), - {ok, OOBI} = socket:getopt(Sock, socket, oobinline), - {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf), - {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf), - {ok, Linger} = socket:getopt(Sock, socket, linger), - {ok, MALL} = socket:getopt(Sock, ip, multicast_all), - {ok, MIF} = socket:getopt(Sock, ip, multicast_if), - {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop), - {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl), - {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos), - {ok, RecvTTL} = socket:getopt(Sock, ip, recvttl), - i("initiated when: " - "~n (socket) Domain: ~p" - "~n (socket) Type: ~p" - "~n (socket) Protocol: ~p" - "~n (socket) OOBInline: ~p" - "~n (socket) SndBuf: ~p" - "~n (socket) RcvBuf: ~p" - "~n (socket) Linger: ~p" - "~n (ip) Multicast ALL: ~p" - "~n (ip) Multicast IF: ~p" - "~n (ip) Multicast Loop: ~p" - "~n (ip) Multicast TTL: ~p" - "~n (ip) RecvTOS: ~p" - "~n (ip) RecvTTL: ~p" - "~n => wait some", - [Domain, Type, Proto, - OOBI, SndBuf, RcvBuf, Linger, - MALL, MIF, MLoop, MTTL, - RecvTOS, RecvTTL]), ?LIB:sleep(5000), %% ok = socket:close(Sock), - send_loop(#client{socket = Sock, - type = Type, - dest = SA}) + send_loop(#client{socket = Sock, + type = Type, + dest = SA, + verbose = Verbose}) catch throw:E -> e("Failed initiate: " "~n Error: ~p", [E]) end. +maybe_print_start_info(true = _Verbose, Sock, stream = _Type) -> + {ok, Name} = socket:sockname(Sock), + {ok, Peer} = socket:peername(Sock), + {ok, Domain} = socket:getopt(Sock, socket, domain), + {ok, Type} = socket:getopt(Sock, socket, type), + {ok, Proto} = socket:getopt(Sock, socket, protocol), + {ok, OOBI} = socket:getopt(Sock, socket, oobinline), + {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf), + {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf), + {ok, Linger} = socket:getopt(Sock, socket, linger), + {ok, MTU} = socket:getopt(Sock, ip, mtu), + {ok, MTUDisc} = socket:getopt(Sock, ip, mtu_discover), + {ok, MALL} = socket:getopt(Sock, ip, multicast_all), + {ok, MIF} = socket:getopt(Sock, ip, multicast_if), + {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop), + {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl), + {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos), + i("connected: " + "~n From: ~p" + "~n To: ~p" + "~nwhen" + "~n (socket) Domain: ~p" + "~n (socket) Type: ~p" + "~n (socket) Protocol: ~p" + "~n (socket) OOBInline: ~p" + "~n (socket) SndBuf: ~p" + "~n (socket) RcvBuf: ~p" + "~n (socket) Linger: ~p" + "~n (ip) MTU: ~p" + "~n (ip) MTU Discovery: ~p" + "~n (ip) Multicast ALL: ~p" + "~n (ip) Multicast IF: ~p" + "~n (ip) Multicast Loop: ~p" + "~n (ip) Multicast TTL: ~p" + "~n (ip) RecvTOS: ~p" + "~n => wait some", + [Name, Peer, + Domain, Type, Proto, + OOBI, SndBuf, RcvBuf, Linger, + MTU, MTUDisc, MALL, MIF, MLoop, MTTL, + RecvTOS]); +maybe_print_start_info(true = _Verbose, Sock, dgram = _Type) -> + {ok, Domain} = socket:getopt(Sock, socket, domain), + {ok, Type} = socket:getopt(Sock, socket, type), + {ok, Proto} = socket:getopt(Sock, socket, protocol), + {ok, OOBI} = socket:getopt(Sock, socket, oobinline), + {ok, SndBuf} = socket:getopt(Sock, socket, sndbuf), + {ok, RcvBuf} = socket:getopt(Sock, socket, rcvbuf), + {ok, Linger} = socket:getopt(Sock, socket, linger), + {ok, MALL} = socket:getopt(Sock, ip, multicast_all), + {ok, MIF} = socket:getopt(Sock, ip, multicast_if), + {ok, MLoop} = socket:getopt(Sock, ip, multicast_loop), + {ok, MTTL} = socket:getopt(Sock, ip, multicast_ttl), + {ok, RecvTOS} = socket:getopt(Sock, ip, recvtos), + {ok, RecvTTL} = socket:getopt(Sock, ip, recvttl), + i("initiated when: " + "~n (socket) Domain: ~p" + "~n (socket) Type: ~p" + "~n (socket) Protocol: ~p" + "~n (socket) OOBInline: ~p" + "~n (socket) SndBuf: ~p" + "~n (socket) RcvBuf: ~p" + "~n (socket) Linger: ~p" + "~n (ip) Multicast ALL: ~p" + "~n (ip) Multicast IF: ~p" + "~n (ip) Multicast Loop: ~p" + "~n (ip) Multicast TTL: ~p" + "~n (ip) RecvTOS: ~p" + "~n (ip) RecvTTL: ~p" + "~n => wait some", + [Domain, Type, Proto, + OOBI, SndBuf, RcvBuf, Linger, + MALL, MIF, MLoop, MTTL, + RecvTOS, RecvTTL]); +maybe_print_start_info(_Verbose, _Sock, _Type) -> + ok. + do_init(Domain, stream = Type, Proto) -> i("try (socket) open"), Sock = case socket:open(Domain, Type, Proto) of @@ -248,14 +330,25 @@ send_loop(#client{msg_id = N} = C) when (N =< 10) -> i("request ~w sent - now try read answer", [N]), case recv(C) of {ok, {Source, Msg}} -> - i("received ~w bytes of data~s", - [size(Msg), case Source of - undefined -> ""; - _ -> ?LIB:f(" from:~n ~p", [Source]) - end]), + if + (C#client.verbose =:= true) -> + i("received ~w bytes of data~s", + [size(Msg), case Source of + undefined -> ""; + _ -> ?LIB:f(" from:~n ~p", [Source]) + end]); + true -> + i("received ~w bytes", [size(Msg)]) + end, case ?LIB:dec_msg(Msg) of {reply, N, Reply} -> - i("received reply ~w: ~p", [N, Reply]), + if + (C#client.verbose =:= true) -> + i("received reply ~w: ~p", [N, Reply]); + true -> + i("received reply ~w", [N]) + end, + ?LIB:sleep(500), % Just to spread it out a bit send_loop(C#client{msg_id = N+1}) end; {error, RReason} -> @@ -268,13 +361,20 @@ send_loop(#client{msg_id = N} = C) when (N =< 10) -> "~n ~p", [N, SReason]), exit({failed_send, SReason}) end; -send_loop(#client{socket = Sock}) -> +send_loop(Client) -> + sock_close(Client). + +sock_close(#client{socket = Sock, verbose = true}) -> i("we are done - close the socket when: " "~n ~p", [socket:info()]), ok = socket:close(Sock), i("we are done - socket closed when: " - "~n ~p", [socket:info()]). + "~n ~p", [socket:info()]); +sock_close(#client{socket = Sock}) -> + i("we are done"), + ok = socket:close(Sock). + send(#client{socket = Sock, type = stream}, Msg) -> socket:send(Sock, Msg); @@ -298,31 +398,41 @@ recv(#client{socket = Sock, type = stream, msg = false}) -> {error, _} = ERROR -> ERROR end; -recv(#client{socket = Sock, type = stream, msg = true}) -> +recv(#client{socket = Sock, verbose = Verbose, type = stream, msg = true}) -> case socket:recvmsg(Sock) of %% An iov of length 1 is an simplification... {ok, #{addr := undefined = Source, iov := [Msg], ctrl := CMsgHdrs, flags := Flags}} -> - i("received message: " - "~n CMsgHdr: ~p" - "~n Flags: ~p", [CMsgHdrs, Flags]), + if + (Verbose =:= true) -> + i("received message: " + "~n CMsgHdr: ~p" + "~n Flags: ~p", [CMsgHdrs, Flags]); + true -> + ok + end, {ok, {Source, Msg}}; {error, _} = ERROR -> ERROR end; recv(#client{socket = Sock, type = dgram, msg = false}) -> socket:recvfrom(Sock); -recv(#client{socket = Sock, type = dgram, msg = true}) -> +recv(#client{socket = Sock, verbose = Verbose, type = dgram, msg = true}) -> case socket:recvmsg(Sock) of {ok, #{addr := Source, iov := [Msg], ctrl := CMsgHdrs, flags := Flags}} -> - i("received message: " - "~n CMsgHdr: ~p" - "~n Flags: ~p", [CMsgHdrs, Flags]), + if + (Verbose =:= true) -> + i("received message: " + "~n CMsgHdr: ~p" + "~n Flags: ~p", [CMsgHdrs, Flags]); + true -> + ok + end, {ok, {Source, Msg}}; {error, _} = ERROR -> ERROR diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl index 3e5c4e5d95..9142942428 100644 --- a/lib/kernel/test/socket_server.erl +++ b/lib/kernel/test/socket_server.erl @@ -34,8 +34,10 @@ -define(LIB, socket_lib). -record(manager, {socket, msg, peek, acceptors, handler_id, handlers}). --record(acceptor, {id, socket, manager, atimeout = 5000}). --record(handler, {socket, peek, msg, type, manager}). +-record(acceptor, {id, socket, manager, + atimeout = 5000}). +-record(handler, {socket, peek, msg, type, manager, + stimeout = 5000, rtimeout = 5000}). -define(NUM_ACCEPTORS, 5). @@ -904,28 +906,30 @@ peek_recvfrom(Sock, BufSz) -> end. -send(#handler{socket = Sock, msg = true, type = stream}, Msg, _) -> +send(#handler{socket = Sock, msg = true, type = stream, stimeout = Timeout}, + Msg, _) -> CMsgHdr = #{level => ip, type => tos, data => reliability}, CMsgHdrs = [CMsgHdr], MsgHdr = #{iov => [Msg], ctrl => CMsgHdrs}, %% socket:setopt(Sock, otp, debug, true), - Res = socket:sendmsg(Sock, MsgHdr), + Res = socket:sendmsg(Sock, MsgHdr, Timeout), %% socket:setopt(Sock, otp, debug, false), Res; -send(#handler{socket = Sock, type = stream}, Msg, _) -> - socket:send(Sock, Msg); -send(#handler{socket = Sock, msg = true, type = dgram}, Msg, Dest) -> +send(#handler{socket = Sock, type = stream, stimeout = Timeout}, Msg, _) -> + socket:send(Sock, Msg, Timeout); +send(#handler{socket = Sock, msg = true, type = dgram, stimeout = Timeout}, + Msg, Dest) -> CMsgHdr = #{level => ip, type => tos, data => reliability}, CMsgHdrs = [CMsgHdr], MsgHdr = #{addr => Dest, iov => [Msg], ctrl => CMsgHdrs}, %% ok = socket:setopt(Sock, otp, debug, true), - Res = socket:sendmsg(Sock, MsgHdr), + Res = socket:sendmsg(Sock, MsgHdr, Timeout), %% ok = socket:setopt(Sock, otp, debug, false), Res; -send(#handler{socket = Sock, type = dgram}, Msg, Dest) -> - socket:sendto(Sock, Msg, Dest). +send(#handler{socket = Sock, type = dgram, stimeout = Timeout}, Msg, Dest) -> + socket:sendto(Sock, Msg, Dest, Timeout). %% filler() -> %% list_to_binary(lists:duplicate(2048, " FILLER ")). |