diff options
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 400 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 66152 -> 65868 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 111 | ||||
-rw-r--r-- | lib/kernel/test/socket_client.erl | 21 | ||||
-rw-r--r-- | lib/kernel/test/socket_server.erl | 10 |
5 files changed, 434 insertions, 108 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 5fca0eb58b..fde2349234 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -737,6 +737,7 @@ typedef struct { SocketAddress remote; unsigned int addrLen; + ErlNifEnv* env; /* +++ Controller (owner) process +++ */ ErlNifPid ctrlPid; @@ -777,7 +778,7 @@ typedef struct { size_t rBufSz; // Read buffer size (when data length = 0 is specified) size_t rCtrlSz; // Read control buffer size size_t wCtrlSz; // Write control buffer size - BOOLEAN_T iow; // Inform On Wrap + BOOLEAN_T iow; // Inform On (counter) Wrap BOOLEAN_T dbg; /* +++ Close stuff +++ */ @@ -915,11 +916,9 @@ static ERL_NIF_TERM nif_finalize_connection(ErlNifEnv* env, static ERL_NIF_TERM nif_finalize_close(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -/* static ERL_NIF_TERM nif_cancel(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -*/ static ERL_NIF_TERM nopen(ErlNifEnv* env, @@ -983,7 +982,6 @@ static ERL_NIF_TERM nclose(ErlNifEnv* env, static ERL_NIF_TERM nshutdown(ErlNifEnv* env, SocketDescriptor* descP, int how); - static ERL_NIF_TERM nsetopt(ErlNifEnv* env, SocketDescriptor* descP, BOOLEAN_T isEncoded, @@ -1823,6 +1821,38 @@ static ERL_NIF_TERM nsockname(ErlNifEnv* env, SocketDescriptor* descP); static ERL_NIF_TERM npeername(ErlNifEnv* env, SocketDescriptor* descP); +static ERL_NIF_TERM ncancel(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM op, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_connect(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_accept(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, + SocketDescriptor* descP); +static ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_send(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_read_select(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_write_select(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef); +static ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef, + int smode, + int rmode); static ERL_NIF_TERM nsetopt_str_opt(ErlNifEnv* env, SocketDescriptor* descP, @@ -2089,6 +2119,9 @@ static BOOLEAN_T acceptor_pop(ErlNifEnv* env, ErlNifPid* pid, ErlNifMonitor* mon, ERL_NIF_TERM* ref); +static BOOLEAN_T acceptor_unqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid); static BOOLEAN_T qsearch4pid(ErlNifEnv* env, SocketRequestQueue* q, @@ -2241,8 +2274,10 @@ static char str_exsend[] = "exsend"; // failed send /* *** "Global" Atoms *** */ +ERL_NIF_TERM esock_atom_accept; ERL_NIF_TERM esock_atom_addr; ERL_NIF_TERM esock_atom_any; +ERL_NIF_TERM esock_atom_connect; ERL_NIF_TERM esock_atom_credentials; ERL_NIF_TERM esock_atom_ctrl; ERL_NIF_TERM esock_atom_ctrunc; @@ -2267,6 +2302,7 @@ ERL_NIF_TERM esock_atom_local; ERL_NIF_TERM esock_atom_loopback; ERL_NIF_TERM esock_atom_lowdelay; ERL_NIF_TERM esock_atom_mincost; +ERL_NIF_TERM esock_atom_not_found; ERL_NIF_TERM esock_atom_ok; ERL_NIF_TERM esock_atom_oob; ERL_NIF_TERM esock_atom_origdstaddr; @@ -2276,11 +2312,18 @@ ERL_NIF_TERM esock_atom_port; ERL_NIF_TERM esock_atom_protocol; ERL_NIF_TERM esock_atom_raw; ERL_NIF_TERM esock_atom_rdm; -ERL_NIF_TERM esock_atom_rights; +ERL_NIF_TERM esock_atom_recv; +ERL_NIF_TERM esock_atom_recvfrom; +ERL_NIF_TERM esock_atom_recvmsg; ERL_NIF_TERM esock_atom_reliability; +ERL_NIF_TERM esock_atom_rights; ERL_NIF_TERM esock_atom_scope_id; ERL_NIF_TERM esock_atom_sctp; ERL_NIF_TERM esock_atom_sec; +ERL_NIF_TERM esock_atom_select_sent; +ERL_NIF_TERM esock_atom_send; +ERL_NIF_TERM esock_atom_sendmsg; +ERL_NIF_TERM esock_atom_sendto; ERL_NIF_TERM esock_atom_seqpacket; ERL_NIF_TERM esock_atom_socket; ERL_NIF_TERM esock_atom_spec_dst; @@ -3228,7 +3271,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, &descP->currentAcceptor.mon) > 0) return esock_make_error(env, atom_exmon); - descP->currentAcceptor.ref = ref; + descP->currentAcceptor.ref = enif_make_copy(descP->env, ref); + descP->currentAcceptorP = &descP->currentAcceptor; SELECT(env, descP->sock, @@ -3326,7 +3370,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, /* *** naccept_accepting *** * We have an active acceptor and possibly acceptors waiting in queue. - * At the moment the queue is *not* implemented. + * If the pid of the calling process is not the pid of the "current process", + * push the requester onto the queue. */ static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, @@ -3353,11 +3398,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, "\r\n", caller, descP->currentAcceptor.pid) ); if (!compare_pids(env, &descP->currentAcceptor.pid, &caller)) { - /* This will have to do until we implement the queue. - * When we have the queue, we should simply push this request, - * and instead return with eagain (the caller will then wait - * for the select message). - */ + + /* Not the "current caller", so (maybe) push onto queue */ SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") ); @@ -10420,6 +10462,304 @@ ERL_NIF_TERM npeername(ErlNifEnv* env, /* ---------------------------------------------------------------------- + * nif_cancel + * + * Description: + * Cancel a previous select! + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * Operation (atom) - What kind of operation (accept, send, ...) is to be cancelled + * Ref (ref) - Unique id for the operation + */ +static +ERL_NIF_TERM nif_cancel(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + SocketDescriptor* descP; + ERL_NIF_TERM op, opRef, result; + + SGDBG( ("SOCKET", "nif_cancel -> entry with argc: %d\r\n", argc) ); + + /* Extract arguments and perform preliminary validation */ + + if ((argc != 3) || + !enif_get_resource(env, argv[0], sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + op = argv[1]; + opRef = argv[2]; + + SSDBG( descP, + ("SOCKET", "nif_cancel -> args when sock = %d:" + "\r\n op: %T" + "\r\n opRef: %T" + "\r\n", descP->sock, op, opRef) ); + + result = ncancel(env, descP, op, opRef); + + SSDBG( descP, + ("SOCKET", "nif_cancel -> done with result: " + "\r\n %T" + "\r\n", result) ); + + return result; + +} + + +static +ERL_NIF_TERM ncancel(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM op, + ERL_NIF_TERM opRef) +{ + /* <KOLLA> + * + * Do we really need all these variants? Should it not be enough with: + * + * connect | accept | send | recv + * + * </KOLLA> + */ + if (COMPARE(op, esock_atom_connect) == 0) { + return ncancel_connect(env, descP, opRef); + } else if (COMPARE(op, esock_atom_accept) == 0) { + return ncancel_accept(env, descP, opRef); + } else if (COMPARE(op, esock_atom_send) == 0) { + return ncancel_send(env, descP, opRef); + } else if (COMPARE(op, esock_atom_sendto) == 0) { + return ncancel_send(env, descP, opRef); + } else if (COMPARE(op, esock_atom_sendmsg) == 0) { + return ncancel_send(env, descP, opRef); + } else if (COMPARE(op, esock_atom_recv) == 0) { + return ncancel_recv(env, descP, opRef); + } else if (COMPARE(op, esock_atom_recvfrom) == 0) { + return ncancel_recv(env, descP, opRef); + } else if (COMPARE(op, esock_atom_recvmsg) == 0) { + return ncancel_recv(env, descP, opRef); + } else { + return esock_make_error(env, esock_atom_einval); + } +} + + + +/* *** ncancel_connect *** + * + * + */ +static +ERL_NIF_TERM ncancel_connect(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + return ncancel_write_select(env, descP, opRef); +} + + +/* *** ncancel_accept *** + * + * We have two different cases: + * *) Its the current acceptor + * Cancel the select! + * We need to activate one of the waiting acceptors. + * *) Its one of the acceptors ("waiting") in the queue + * Simply remove the acceptor from the queue. + * + */ +static +ERL_NIF_TERM ncancel_accept(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + ERL_NIF_TERM res; + + SSDBG( descP, + ("SOCKET", "ncancel_accept -> entry with" + "\r\n opRef: %T" + "\r\n %s" + "\r\n", opRef, + ((descP->currentAcceptorP == NULL) ? "without acceptor" : "with acceptor")) ); + + MLOCK(descP->accMtx); + + if (descP->currentAcceptorP != NULL) { + if (COMPARE(opRef, descP->currentAcceptor.ref) == 0) { + res = ncancel_accept_current(env, descP); + } else { + res = ncancel_accept_waiting(env, descP, opRef); + } + } else { + /* Or badarg? */ + res = esock_make_error(env, esock_atom_einval); + } + + MUNLOCK(descP->accMtx); + + SSDBG( descP, + ("SOCKET", "ncancel_accept -> done with result:" + "\r\n %T" + "\r\n", res) ); + + return res; +} + + +/* The current process has an ongoing select we first must + * cancel. Then we must re-activate the "first" (the first + * in the acceptor queue). + */ +static +ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env, + SocketDescriptor* descP) +{ + ERL_NIF_TERM res; + + SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") ); + + res = ncancel_read_select(env, descP, descP->currentAcceptor.ref); + + SSDBG( descP, ("SOCKET", "ncancel_accept_current -> cancel res: %T\r\n", res) ); + + if (acceptor_pop(env, descP, + &descP->currentAcceptor.pid, + &descP->currentAcceptor.mon, + &descP->currentAcceptor.ref)) { + + /* There was another one */ + + SSDBG( descP, ("SOCKET", "ncancel_accept_current -> new (active) acceptor: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentAcceptor.pid, + descP->currentAcceptor.ref) ); + + SELECT(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref); + + } else { + SSDBG( descP, ("SOCKET", "ncancel_accept_current -> no more acceptors\r\n") ); + descP->currentAcceptorP = NULL; + descP->state = SOCKET_STATE_LISTENING; + } + + SSDBG( descP, ("SOCKET", "ncancel_accept_current -> done with result:" + "\r\n %T" + "\r\n", res) ); + + return res; +} + + +/* These processes have not performed a select, so we can simply + * remove them from the acceptor queue. + */ +static +ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) + return esock_make_error(env, atom_exself); + + /* unqueue request from (acceptor) queue */ + + if (acceptor_unqueue(env, descP, &caller)) { + return esock_atom_ok; + } else { + /* Race? */ + return esock_make_error(env, esock_atom_not_found); + } +} + + + +/* *** ncancel_send *** + * + * + */ +static +ERL_NIF_TERM ncancel_send(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + return esock_make_error(env, esock_atom_einval); +} + + + +/* *** ncancel_recv *** + * + * + */ +static +ERL_NIF_TERM ncancel_recv(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + return esock_make_error(env, esock_atom_einval); +} + + + +static +ERL_NIF_TERM ncancel_read_select(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + return ncancel_mode_select(env, descP, opRef, + ERL_NIF_SELECT_READ, + ERL_NIF_SELECT_READ_CANCELLED); +} + + +static +ERL_NIF_TERM ncancel_write_select(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef) +{ + return ncancel_mode_select(env, descP, opRef, + ERL_NIF_SELECT_WRITE, + ERL_NIF_SELECT_WRITE_CANCELLED); +} + + +static +ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM opRef, + int smode, + int rmode) +{ + int selectRes = enif_select(env, descP->sock, + (ERL_NIF_SELECT_CANCEL | smode), + descP, NULL, opRef); + + if (selectRes & rmode) { + /* Was cancelled */ + return esock_atom_ok; + } else if (selectRes > 0) { + /* Has already sent the message */ + return esock_make_error(env, esock_atom_select_sent); + } else { + /* Stopped? */ + SSDBG( descP, ("SOCKET", "ncancel_mode_select -> failed: %d (0x%lX)" + "\r\n", selectRes, selectRes) ); + return esock_make_error(env, esock_atom_einval); + } + +} + + + +/* ---------------------------------------------------------------------- * U t i l i t y F u n c t i o n s * ---------------------------------------------------------------------- */ @@ -10537,8 +10877,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, } /* There is a special case: If the provided 'to read' value is - * zero (0). That means that we reads as much as we can, using - * the default read buffer size. + * zero (0) (only for type =/= stream). + * That means that we reads as much as we can, using the default + * read buffer size. */ if (bufP->size == read) { @@ -12537,6 +12878,8 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) { char buf[64]; /* Buffer used for building the mutex name */ + descP->env = enif_alloc_env(); + sprintf(buf, "socket[w,%d]", sock); descP->writeMtx = MCREATE(buf); descP->currentWriterP = NULL; // currentWriter not used @@ -13161,7 +13504,7 @@ ERL_NIF_TERM acceptor_push(ErlNifEnv* env, SocketRequestor* reqP = &e->data; reqP->pid = pid; - reqP->ref = ref; + reqP->ref = enif_make_copy(descP->env, ref); if (MONP(env, descP, &pid, &reqP->mon) > 0) { FREE(reqP); @@ -13205,6 +13548,19 @@ BOOLEAN_T acceptor_pop(ErlNifEnv* env, } +/* *** acceptor unqueue *** + * + * Remove an acceptor from the acceptor queue. + */ +static +BOOLEAN_T acceptor_unqueue(ErlNifEnv* env, + SocketDescriptor* descP, + const ErlNifPid* pid) +{ + return qunqueue(env, &descP->acceptorsQ, pid); +} + + static BOOLEAN_T qsearch4pid(ErlNifEnv* env, SocketRequestQueue* q, @@ -13647,7 +14003,7 @@ void socket_down(ErlNifEnv* env, "socket_down -> " "not current acceptor - maybe a waiting acceptor\r\n") ); - qunqueue(env, &descP->acceptorsQ, pid); + acceptor_unqueue(env, descP, pid); } } @@ -13697,7 +14053,7 @@ ErlNifFunc socket_funcs[] = * is called after the connect *select* has "completed". */ {"nif_finalize_connection", 1, nif_finalize_connection, 0}, - // {"nif_cancel", 2, nif_cancel, 0}, + {"nif_cancel", 3, nif_cancel, 0}, {"nif_finalize_close", 1, nif_finalize_close, ERL_NIF_DIRTY_JOB_IO_BOUND} }; @@ -13817,8 +14173,10 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_want = MKA(env, str_want); /* Global atom(s) */ + esock_atom_accept = MKA(env, "accept"); esock_atom_addr = MKA(env, "addr"); esock_atom_any = MKA(env, "any"); + esock_atom_connect = MKA(env, "connect"); esock_atom_credentials = MKA(env, "credentials"); esock_atom_ctrl = MKA(env, "ctrl"); esock_atom_ctrunc = MKA(env, "ctrunc"); @@ -13843,6 +14201,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_loopback = MKA(env, "loopback"); esock_atom_lowdelay = MKA(env, "lowdelay"); esock_atom_mincost = MKA(env, "mincost"); + esock_atom_not_found = MKA(env, "not_found"); esock_atom_ok = MKA(env, "ok"); esock_atom_oob = MKA(env, "oob"); esock_atom_origdstaddr = MKA(env, "origdstaddr"); @@ -13852,11 +14211,18 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) esock_atom_protocol = MKA(env, "protocol"); esock_atom_raw = MKA(env, "raw"); esock_atom_rdm = MKA(env, "rdm"); + esock_atom_recv = MKA(env, "recv"); + esock_atom_recvfrom = MKA(env, "recvfrom"); + esock_atom_recvmsg = MKA(env, "recvmsg"); esock_atom_reliability = MKA(env, "reliability"); esock_atom_rights = MKA(env, "rights"); esock_atom_scope_id = MKA(env, "scope_id"); esock_atom_sctp = MKA(env, "sctp"); esock_atom_sec = MKA(env, "sec"); + esock_atom_select_sent = MKA(env, "select_sent"); + esock_atom_send = MKA(env, "send"); + esock_atom_sendmsg = MKA(env, "sendmsg"); + esock_atom_sendto = MKA(env, "sendto"); esock_atom_seqpacket = MKA(env, "seqpacket"); esock_atom_socket = MKA(env, "socket"); esock_atom_spec_dst = MKA(env, "spec_dst"); diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex e6a33337ba..2475dce37b 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index ad7a35694b..1c16c94711 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -1137,7 +1137,7 @@ connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout) %% </KOLLA> nif_finalize_connection(SockRef) after NewTimeout -> - nif_cancel(SockRef, connect, Ref), + cancel(SockRef, connect, Ref), {error, timeout} end; {error, _} = ERROR -> @@ -1145,7 +1145,6 @@ connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout) end. - %% =========================================================================== %% %% listen - listen for connections on a socket @@ -1227,13 +1226,12 @@ do_accept(LSockRef, Timeout) -> {error, Reason} after NewTimeout -> - nif_cancel(LSockRef, accept, AccRef), - flush_select_msgs(LSockRef, AccRef), + cancel(LSockRef, accept, AccRef), {error, timeout} end; {error, _} = ERROR -> - nif_cancel(LSockRef, accept, AccRef), % Just to be on the safe side... + cancel(LSockRef, accept, AccRef), % Just to be on the safe side... ERROR end. @@ -1305,8 +1303,7 @@ do_send(SockRef, Data, EFlags, Timeout) -> {error, Reason} after NewTimeout -> - nif_cancel(SockRef, send, SendRef), - flush_select_msgs(SockRef, SendRef), + cancel(SockRef, send, SendRef), {error, {timeout, size(Data)}} end; {error, eagain} -> @@ -1319,8 +1316,7 @@ do_send(SockRef, Data, EFlags, Timeout) -> {error, Reason} after Timeout -> - nif_cancel(SockRef, send, SendRef), - flush_select_msgs(SockRef, SendRef), + cancel(SockRef, send, SendRef), {error, {timeout, size(Data)}} end; @@ -1403,8 +1399,7 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> {error, Reason} after Timeout -> - nif_cancel(SockRef, sendto, SendRef), - flush_select_msgs(SockRef, SendRef), + cancel(SockRef, sendto, SendRef), {error, timeout} end; @@ -1414,8 +1409,7 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) -> do_sendto(SockRef, Data, Dest, EFlags, next_timeout(TS, Timeout)) after Timeout -> - nif_cancel(SockRef, sendto, SendRef), - flush_select_msgs(SockRef, SendRef), + cancel(SockRef, sendto, SendRef), {error, timeout} end; @@ -1497,8 +1491,7 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) -> do_sendmsg(SockRef, MsgHdr, EFlags, next_timeout(TS, Timeout)) after Timeout -> - nif_cancel(SockRef, sendmsg, SendRef), - flush_select_msgs(SockRef, SendRef), + cancel(SockRef, sendmsg, SendRef), {error, timeout} end; @@ -1519,62 +1512,6 @@ ensure_msghdr(_) -> %% writev - write data into multiple buffers %% -%% send(Socket, Data, Flags, Timeout) -%% when (is_list(Data) orelse is_binary(Data)) andalso is_list(Flags) -> -%% IOVec = erlang:iolist_to_iovec(Data), -%% EFlags = enc_send_flags(Flags), -%% send_iovec(Socket, IOVec, EFlags, Timeout). - - -%% %% Iterate over the IO-vector (list of binaries). - -%% send_iovec(_Socket, [] = _IOVec, _EFlags, _Timeout) -> -%% ok; -%% send_iovec({socket, _, SockRef} = Socket, [Bin|IOVec], EFlags, Timeout) -> -%% case do_send(SockRef, make_ref(), Bin, EFlags, Timeout) of -%% {ok, NewTimeout} -> -%% send_iovec(Socket, IOVec, EFlags, NewTimeout); -%% {error, _} = ERROR -> -%% ERROR -%% end. - - -%% do_send(SockRef, SendRef, Data, _EFlags, Timeout) -%% when (Timeout < 0) -> -%% nif_cancel(SockRef, SendRef), -%% flush_select_msgs(SockRef, SendRef), -%% {error, {timeout, size(Data)}}; -%% do_send(SockRef, SendRef, Data, EFlags, Timeout) -> -%% TS = timestamp(Timeout), -%% case nif_send(SockRef, SendRef, Data, EFlags) of -%% ok -> -%% {ok, next_timeout(TS, Timeout)}; -%% {ok, Written} -> -%% %% We are partially done, wait for continuation -%% receive -%% {select, SockRef, SendRef, ready_output} -> -%% <<_:Written/binary, Rest/binary>> = Data, -%% do_send(SockRef, make_ref(), Rest, EFlags, -%% next_timeout(TS, Timeout)) -%% after Timeout -> -%% nif_cancel(SockRef, SendRef), -%% flush_select_msgs(SockRef, SendRef), -%% {error, timeout} -%% end; -%% {error, eagain} -> -%% receive -%% {select, SockRef, SendRef, ready_output} -> -%% do_send(SockRef, SendRef, Data, EFlags, -%% next_timeout(TS, Timeout)) -%% after Timeout -> -%% nif_cancel(SockRef, SendRef), -%% flush_select_msgs(SockRef, SendRef), -%% {error, timeout} -%% end; - -%% {error, _} = ERROR -> -%% ERROR -%% end. %% =========================================================================== @@ -1695,8 +1632,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) after NewTimeout -> - nif_cancel(SockRef, recv, RecvRef), - flush_select_msgs(SockRef, RecvRef), + cancel(SockRef, recv, RecvRef), {error, {timeout, Acc}} end; @@ -1715,8 +1651,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) after NewTimeout -> - nif_cancel(SockRef, recv, RecvRef), - flush_select_msgs(SockRef, RecvRef), + cancel(SockRef, recv, RecvRef), {error, {timeout, Acc}} end; @@ -1739,8 +1674,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) {error, Reason} after NewTimeout -> - nif_cancel(SockRef, recv, RecvRef), - flush_select_msgs(SockRef, RecvRef), + cancel(SockRef, recv, RecvRef), {error, timeout} end; @@ -1765,7 +1699,7 @@ do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) -> %% The current recv operation is to be cancelled, so no need for a ref... %% The cancel will end our 'read everything you have' and "activate" %% any waiting reader. - nif_cancel(SockRef, recv, RecvRef), + cancel(SockRef, recv, RecvRef), {ok, Acc}; do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) -> {error, {timeout, Acc}}; @@ -1878,8 +1812,7 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> {error, Reason} after NewTimeout -> - nif_cancel(SockRef, recvfrom, RecvRef), - flush_select_msgs(SockRef, RecvRef), + cancel(SockRef, recvfrom, RecvRef), {error, timeout} end; @@ -1966,8 +1899,7 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) -> {error, Reason} after NewTimeout -> - nif_cancel(SockRef, recvmsg, RecvRef), - flush_select_msgs(SockRef, RecvRef), + cancel(SockRef, recvmsg, RecvRef), {error, timeout} end; @@ -3325,10 +3257,19 @@ ensure_sockaddr(_SockAddr) -> -flush_select_msgs(LSRef, Ref) -> +cancel(SockRef, Op, OpRef) -> + case nif_cancel(SockRef, Op, OpRef) of + %% The select has already completed + {error, select_sent} -> + flush_select_msgs(SockRef, OpRef); + Other -> + Other + end. + +flush_select_msgs(SockRef, Ref) -> receive - {select, LSRef, Ref, _} -> - flush_select_msgs(LSRef, Ref) + {select, SockRef, Ref, _} -> + flush_select_msgs(SockRef, Ref) after 0 -> ok end. diff --git a/lib/kernel/test/socket_client.erl b/lib/kernel/test/socket_client.erl index 58d70b6181..6cd353fd07 100644 --- a/lib/kernel/test/socket_client.erl +++ b/lib/kernel/test/socket_client.erl @@ -74,7 +74,7 @@ start(Domain, Type, Proto, Addr, Port) -> %% The way we use tos only works because we %% send so few messages (a new value for every %% message). - put(tos, 1), + tos_init(), do_start(Domain, Type, Proto, SA). do_start(Domain, stream = Type, Proto, SA) -> @@ -282,11 +282,10 @@ send(#client{socket = Sock, type = dgram, dest = Dest}, Msg) -> %% i("try send to: " %% "~n ~p", [Dest]), %% ok = socket:setopt(Sock, otp, debug, true), - TOS = get(tos), + TOS = tos_next(), ok = socket:setopt(Sock, ip, tos, TOS), case socket:sendto(Sock, Msg, Dest) of ok = OK -> - put(tos, TOS+1), OK; {error, _} = ERROR -> ERROR @@ -402,6 +401,22 @@ which_addr2(Domain, [_|IFO]) -> %% --- +tos_init() -> + put(tos, 1). + +tos_next() -> + case get(tos) of + TOS when (TOS < 100) -> + put(tos, TOS + 1), + TOS; + _ -> + put(tos, 1), + 1 + end. + + +%% --- + e(F, A) -> ?LIB:e(F, A). diff --git a/lib/kernel/test/socket_server.erl b/lib/kernel/test/socket_server.erl index ea2bdc8e0d..3e5c4e5d95 100644 --- a/lib/kernel/test/socket_server.erl +++ b/lib/kernel/test/socket_server.erl @@ -34,7 +34,7 @@ -define(LIB, socket_lib). -record(manager, {socket, msg, peek, acceptors, handler_id, handlers}). --record(acceptor, {id, socket, manager}). +-record(acceptor, {id, socket, manager, atimeout = 5000}). -record(handler, {socket, peek, msg, type, manager}). -define(NUM_ACCEPTORS, 5). @@ -521,13 +521,14 @@ acceptor_stop(Pid, _Reason) -> acceptor_init(Manager, Sock, ID) -> put(sname, f("acceptor[~w]", [ID])), Manager ! {acceptor, self(), ok}, + %% ok = socket:setopt(Sock, otp, debug, true), acceptor_loop(#acceptor{id = ID, manager = Manager, socket = Sock}). -acceptor_loop(#acceptor{socket = LSock} = A) -> +acceptor_loop(#acceptor{socket = LSock, atimeout = Timeout} = A) -> i("try accept"), - case socket:accept(LSock, infinity) of + case socket:accept(LSock, Timeout) of {ok, Sock} -> i("accepted: " "~n ~p" @@ -542,6 +543,9 @@ acceptor_loop(#acceptor{socket = LSock} = A) -> socket:close(Sock), exit({failed_starting_handler, Reason}) end; + {error, timeout} -> + i("timeout"), + acceptor_loop(A); {error, Reason} -> e("accept failure: " "~n ~p", [Reason]), |