diff options
-rw-r--r-- | erts/doc/src/socket.xml | 37 | ||||
-rw-r--r-- | erts/doc/src/socket_usage.xml | 29 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 73 | ||||
-rw-r--r-- | erts/emulator/test/socket_SUITE.erl | 383 | ||||
-rw-r--r-- | erts/preloaded/ebin/socket.beam | bin | 74796 -> 75044 bytes | |||
-rw-r--r-- | erts/preloaded/src/socket.erl | 30 |
6 files changed, 505 insertions, 47 deletions
diff --git a/erts/doc/src/socket.xml b/erts/doc/src/socket.xml index 47e84090ee..e5cf77663a 100644 --- a/erts/doc/src/socket.xml +++ b/erts/doc/src/socket.xml @@ -324,7 +324,7 @@ <p>In the case when there is no connections waiting, the function will return with the <c>SelectInfo</c>. The caller can then await a - select message, <c>{'$socket', Sock, select, Info}</c> (where + select message, <c>{'$socket', Socket, select, Info}</c> (where <c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>), when a client connects (a subsequent call to accept will then return the socket). </p> @@ -377,7 +377,7 @@ <func> <name name="connect" arity="2" since="OTP 22.0"/> - <name name="connect" arity="3" since="OTP 22.0"/> + <name name="connect" arity="3" clause_i="2" since="OTP 22.0"/> <fsummary>Initiate a connection on a socket.</fsummary> <desc> <p>This function connects the socket to the address @@ -386,6 +386,24 @@ </func> <func> + <name name="connect" arity="3" clause_i="1" anchor="connect_async" since="OTP @OTP-15731@"/> + <fsummary>Initiate a connection on a socket.</fsummary> + <desc> + <p>This function connects the socket to the address + specied by the <c>SockAddr</c> argument.</p> + + <p>In the case when its not possible to immediately establish a + connection, the function will return with the + <seealso marker="#type-select_info"><c>SelectInfo</c></seealso>. + The caller can then await a + select message, <c>{'$socket', Socket, select, Info}</c> (where + <c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>, + a subsequent call to connect will then + establish the connection). </p> + </desc> + </func> + + <func> <name name="getopt" arity="3" clause_i="1" since="OTP 22.0"/> <name name="getopt" arity="3" clause_i="2" since="OTP 22.0"/> <name name="getopt" arity="3" clause_i="3" since="OTP 22.0"/> @@ -505,7 +523,7 @@ <p>In the case when there is no data waiting, the function will return with the <c>SelectInfo</c>. The caller can then await a - select message, <c>{'$socket', Sock, select, Info}</c> (where + select message, <c>{'$socket', Socket, select, Info}</c> (where <c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>), when data has arrived (a subsequent call to recv will then return the data). </p> @@ -564,7 +582,7 @@ <p>In the case when there is no data waiting, the function will return with the <c>SelectInfo</c>. The caller can then await a - select message, <c>{'$socket', Sock, select, Info}</c> (where + select message, <c>{'$socket', Socket, select, Info}</c> (where <c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>), when data has arrived (a subsequent call to recvfrom will then return the data). </p> @@ -640,7 +658,7 @@ <p>In the case when there is no data waiting, the function will return with the <c>SelectInfo</c>. The caller can then await a - select message, <c>{'$socket', Sock, select, Info}</c> (where + select message, <c>{'$socket', Socket, select, Info}</c> (where <c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>), when data has arrived (a subsequent call to recvmsg will then return the data). </p> @@ -667,7 +685,8 @@ <p>In the case when there is no room in the (system-) buffers, the function will return with the <c>SelectInfo</c>. The caller - can then await a select message, <c>{'$socket', Sock, select, Info}</c> + can then await a select message, + <c>{'$socket', Socket, select, Info}</c> (where <c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>), when there is room for more data (a subsequent call to send will then send the data). </p> @@ -724,7 +743,8 @@ <p>In the case when there is no room in the (system-) buffers, the function will return with the <c>SelectInfo</c>. The caller - can then await a select message, <c>{'$socket', Sock, select, Info}</c> + can then await a select message, + <c>{'$socket', Socket, select, Info}</c> (where <c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>), when there is room for more data (a subsequent call to sendmsg will then send the data). </p> @@ -751,7 +771,8 @@ <p>In the case when there is no room in the (system-) buffers, the function will return with the <c>SelectInfo</c>. The caller - can then await a select message, <c>{'$socket', Sock, select, Info}</c> + can then await a select message, + <c>{'$socket', Socket, select, Info}</c> (where <c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>), when there is room for more data (a subsequent call to sendto will then send the data). </p> diff --git a/erts/doc/src/socket_usage.xml b/erts/doc/src/socket_usage.xml index 9d8141b822..7e65bcbf70 100644 --- a/erts/doc/src/socket_usage.xml +++ b/erts/doc/src/socket_usage.xml @@ -43,6 +43,7 @@ <title>Asynchronous calls</title> <p>Some functions allow for an <i>asynchronous</i> call (<seealso marker="socket#accept_async"><c>accept/2</c></seealso>, + <seealso marker="socket#connect_async"><c>connect/3</c></seealso>, <seealso marker="socket#recv_async"><c>recv/3,4</c></seealso>, <seealso marker="socket#recvfrom_async"><c>recvfrom/3,4</c></seealso>, <seealso marker="socket#recvmsg_async"><c>recvmsg/2,3,5</c></seealso>, @@ -52,21 +53,17 @@ This is achieved by setting the <c>Timeout</c> argument to <c>nowait</c>. For instance, if calling the <seealso marker="socket#recv_async"><c>recv/3</c></seealso> - function with Timeout set to <c>nowait</c> (i.e. <c>recv(Sock, 0, nowait)</c>) + function with Timeout set to <c>nowait</c> (i.e. + <c>recv(Sock, 0, nowait)</c>) when there is actually nothing to read, it will return with <c>{ok, </c> <seealso marker="socket#type-select_info"><c>SelectInfo</c></seealso><c>}</c>. When data eventually arrives a 'select message' will be sent to the caller:</p> - <!-- - <list type="ordered"> - <item><c>{'$socket', Sock, select, Info}</c></item> - </list> - --> <taglist> <!-- NOTE THAT THE EMPTY TAG IS INTENTIONAL --> <tag></tag> - <item><c>{'$socket', Sock, select, Info}</c></item> + <item><c>{'$socket', socket(), select, select_ref()}</c></item> </taglist> <p>The caller can then make another call to the recv function and now expect data.</p> @@ -79,24 +76,18 @@ <taglist> <!-- NOTE THAT THE EMPTY TAG IS INTENTIONAL --> <tag></tag> - <item><c>{'$socket', Sock, abort, Info}</c></item> + <item><c>{'$socket', socket(), abort, Info}</c></item> </taglist> <p>If the operation is aborted for whatever reason (e.g. if the socket is closed "by someone else"). The <c>Info</c> part contains the abort reason (in this case that - the socket has been closed <c>Info = {Ref, closed}</c>). </p> + the socket has been closed <c>Info = {select_ref(), closed}</c>). </p> <p>Note that all other users are <em>locked out</em> until the 'current user' has called the function (recv in this case). So either immediately call the function or <seealso marker="socket#cancel/2"><c>cancel</c></seealso>. </p> + <p>The general form of the 'socket' message is: </p> - <!-- - <list type="ordered"> - <item> - <c>{'$socket', Socket :: socket(), Tag :: atom(), Info :: term()}</c> - </item> - </list> - --> <taglist> <!-- NOTE THAT THE EMPTY TAG IS INTENTIONAL --> <tag></tag> @@ -110,15 +101,15 @@ </row> <row> <cell>select</cell> - <cell>reference()</cell> + <cell>select_ref()</cell> </row> <row> <cell>abort</cell> - <cell>{reference(), Reason :: term()}</cell> + <cell>{select_ref(), Reason :: term()}</cell> </row> <tcaption>socket message info value type</tcaption> </table> - <p>The <c>reference()</c> is the same as was received in the + <p>The <c>select_ref()</c> is the same as was received in the <seealso marker="socket#type-select_info"><c>SelectInfo</c></seealso>. </p> </section> </section> diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 25bc712949..1a8ce89b7b 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -825,6 +825,10 @@ typedef struct { ErlNifPid ctrlPid; ESockMonitor ctrlMon; + /* +++ Connector process +++ */ + ErlNifPid connPid; + ESockMonitor connMon; + /* +++ Write stuff +++ */ ErlNifMutex* writeMtx; ESockRequestor currentWriter; @@ -2377,6 +2381,8 @@ static int socket_setopt(int sock, const void* optVal, const socklen_t optLen); +static BOOLEAN_T is_connector(ErlNifEnv* env, + ESockDescriptor* descP); static BOOLEAN_T verify_is_connected(ESockDescriptor* descP, int* err); static ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event); @@ -4904,7 +4910,7 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, return esock_make_error(env, atom_eisconn); } - if (IS_CONNECTING(descP)) { + if (IS_CONNECTING(descP) && !is_connector(env, descP)) { SSDBG( descP, ("SOCKET", "nif_connect -> already connecting\r\n") ); return esock_make_error(env, esock_atom_einval); } @@ -4926,19 +4932,41 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, ((save_errno == ERRNO_BLOCK) || /* Winsock2 */ (save_errno == EINPROGRESS))) { /* Unix & OSE!! */ ref = MKREF(env); - descP->state = SOCKET_STATE_CONNECTING; - if ((sres = esock_select_write(env, descP->sock, descP, NULL, - sockRef, ref)) < 0) { - res = esock_make_error(env, - MKT2(env, - esock_atom_select_failed, - MKI(env, sres))); - } else { + + if (IS_CONNECTING(descP)) { + /* Glitch */ res = esock_make_ok2(env, ref); + } else { + + /* First time here */ + + if (enif_self(env, &descP->connPid) == NULL) + return esock_make_error(env, atom_exself); + + if (MONP("nconnect -> conn", + env, descP, + &descP->connPid, + &descP->connMon) != 0) + return esock_make_error(env, atom_exmon); + + descP->state = SOCKET_STATE_CONNECTING; + + if ((sres = esock_select_write(env, descP->sock, descP, NULL, + sockRef, ref)) < 0) { + res = esock_make_error(env, + MKT2(env, + esock_atom_select_failed, + MKI(env, sres))); + } else { + res = esock_make_ok2(env, ref); + } } + } else if (code == 0) { /* ok we are connected */ descP->state = SOCKET_STATE_CONNECTED; + enif_set_pid_undefined(&descP->connPid); + MON_INIT(&descP->connMon); descP->isReadable = TRUE; descP->isWritable = TRUE; @@ -5069,6 +5097,29 @@ BOOLEAN_T verify_is_connected(ESockDescriptor* descP, int* err) +/* *** is_connector *** + * Check if the current process is the connector process. + */ +#if !defined(__WIN32__) +static +BOOLEAN_T is_connector(ErlNifEnv* env, + ESockDescriptor* descP) +{ + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) + return FALSE; + + if (COMPARE_PIDS(&descP->connPid, &caller) == 0) + return TRUE; + else + return FALSE; + +} +#endif + + + /* ---------------------------------------------------------------------- * nif_listen * @@ -16873,7 +16924,9 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) char buf[64]; /* Buffer used for building the mutex name */ // This needs to be released when the socket is closed! - // descP->env = enif_alloc_env(); + + enif_set_pid_undefined(&descP->connPid); + MON_INIT(&descP->connMon); sprintf(buf, "esock[w,%d]", sock); descP->writeMtx = MCREATE(buf); diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index 2d7d9c4e08..0c29e3288c 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -82,6 +82,7 @@ api_b_sendmsg_and_recvmsg_tcpL/1, %% *** API async *** + api_a_connect_tcp4/1, api_a_sendto_and_recvfrom_udp4/1, api_a_sendmsg_and_recvmsg_udp4/1, api_a_send_and_recv_tcp4/1, @@ -680,6 +681,7 @@ api_basic_cases() -> api_async_cases() -> [ + api_a_connect_tcp4, api_a_sendto_and_recvfrom_udp4, api_a_sendmsg_and_recvmsg_udp4, api_a_send_and_recv_tcp4, @@ -2653,6 +2655,385 @@ api_b_send_and_recv_tcp(InitState) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% Basically establish a TCP connection via an async connect. + +api_a_connect_tcp4(suite) -> + []; +api_a_connect_tcp4(doc) -> + []; +api_a_connect_tcp4(_Config) when is_list(_Config) -> + ?TT(?SECS(10)), + tc_try(api_a_connect_tcp4, + fun() -> + Connect = fun(Sock, SockAddr) -> + socket:connect(Sock, SockAddr, nowait) + end, + InitState = #{domain => inet, + connect => Connect}, + ok = api_a_connect_tcp(InitState) + end). + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +api_a_connect_tcp(InitState) -> + process_flag(trap_exit, true), + ServerSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + Tester = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** Init part *** + #{desc => "which local address", + cmd => fun(#{domain := Domain} = State) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, addr => LAddr}, + {ok, State#{lsa => LSA}} + end}, + #{desc => "create listen socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{lsock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{lsock := LSock, lsa := LSA} = State) -> + case socket:bind(LSock, LSA) of + {ok, Port} -> + {ok, State#{lport => Port}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "make listen socket", + cmd => fun(#{lsock := LSock}) -> + socket:listen(LSock) + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester, lport := Port}) -> + ?SEV_ANNOUNCE_READY(Tester, init, Port), + ok + end}, + + %% The actual test + #{desc => "await continue (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, accept) + end}, + #{desc => "await connection", + cmd => fun(#{lsock := LSock} = State) -> + case socket:accept(LSock) of + {ok, Sock} -> + ?SEV_IPRINT("accepted: ~n ~p", [Sock]), + {ok, State#{csock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (accept)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, accept), + ok + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close connection socket", + cmd => fun(#{csock := Sock} = State) -> + ok = socket:close(Sock), + {ok, maps:remove(csock, State)} + end}, + #{desc => "close listen socket", + cmd => fun(#{lsock := Sock} = State) -> + ok = socket:close(Sock), + {ok, maps:remove(lsock, State)} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + ClientSeq = + [ + %% *** Wait for start order *** + #{desc => "await start (from tester)", + cmd => fun(State) -> + {Tester, Port} = ?SEV_AWAIT_START(), + {ok, State#{tester => Tester, server_port => Port}} + end}, + #{desc => "monitor tester", + cmd => fun(#{tester := Tester}) -> + _MRef = erlang:monitor(process, Tester), + ok + end}, + + %% *** The init part *** + #{desc => "which server (local) address", + cmd => fun(#{domain := Domain, server_port := Port} = State) -> + LAddr = which_local_addr(Domain), + LSA = #{family => Domain, + addr => LAddr}, + SSA = LSA#{port => Port}, + {ok, State#{local_sa => LSA, server_sa => SSA}} + end}, + #{desc => "create socket", + cmd => fun(#{domain := Domain} = State) -> + case socket:open(Domain, stream, tcp) of + {ok, Sock} -> + {ok, State#{sock => Sock}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "bind to local address", + cmd => fun(#{sock := Sock, local_sa := LSA} = _State) -> + case socket:bind(Sock, LSA) of + {ok, _Port} -> + ok; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (init)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, init), + ok + end}, + + %% *** The actual test *** + #{desc => "await continue (async connect)", + cmd => fun(#{tester := Tester} = _State) -> + ?SEV_AWAIT_CONTINUE(Tester, tester, async_connect) + end}, + #{desc => "connect (async) to server", + cmd => fun(#{sock := Sock, + server_sa := SSA, + connect := Connect} = State) -> + case Connect(Sock, SSA) of + ok -> + {error, unexpected_success}; + {select, {select_info, ST, SR}} -> + ?SEV_IPRINT("select ->" + "~n tag: ~p" + "~n ref: ~p", [ST, SR]), + {ok, State#{connect_stag => ST, + connect_sref => SR}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (connect select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect_select), + ok + end}, + #{desc => "await select message", + cmd => fun(#{sock := Sock, connect_sref := Ref}) -> + receive + {'$socket', Sock, select, Ref} -> + ?SEV_IPRINT("select message ->" + "~n ref: ~p", [Ref]), + ok + after 5000 -> + ?SEV_EPRINT("timeout: " + "~n message queue: ~p", + [mq()]), + {error, timeout} + end + end}, + #{desc => "announce ready (select)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, select), + ok + end}, + #{desc => "connect (async) to server", + cmd => fun(#{sock := Sock, server_sa := SSA, connect := Connect}) -> + case Connect(Sock, SSA) of + ok -> + ok = socket:setopt(Sock, otp, debug, false), + ok; + {select, SelectInfo} -> + {error, {unexpected_select, SelectInfo}}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "announce ready (connect)", + cmd => fun(#{tester := Tester}) -> + ?SEV_ANNOUNCE_READY(Tester, connect), + ok + end}, + #{desc => "get peername", + cmd => fun(#{sock := Sock} = _State) -> + case socket:peername(Sock) of + {ok, SockAddr} -> + ?SEV_IPRINT("Peer Name: ~p", [SockAddr]), + ok; + {error, _} = ERROR -> + ERROR + end + end}, + + %% *** Termination *** + #{desc => "await terminate", + cmd => fun(#{tester := Tester} = State) -> + case ?SEV_AWAIT_TERMINATE(Tester, tester) of + ok -> + {ok, maps:remove(tester, State)}; + {error, _} = ERROR -> + ERROR + end + end}, + #{desc => "close socket", + cmd => fun(#{sock := Sock} = State) -> + ok = socket:close(Sock), + State2 = maps:remove(sock, State), + State3 = maps:remove(connect_stag, State2), + State4 = maps:remove(connect_sref, State3), + {ok, State4} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + TesterSeq = + [ + %% *** Init part *** + #{desc => "monitor server", + cmd => fun(#{server := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + #{desc => "monitor client", + cmd => fun(#{client := Pid} = _State) -> + _MRef = erlang:monitor(process, Pid), + ok + end}, + + %% Start the server + #{desc => "order server start", + cmd => fun(#{server := Pid} = _State) -> + ?SEV_ANNOUNCE_START(Pid), + ok + end}, + #{desc => "await server ready (init)", + cmd => fun(#{server := Pid} = State) -> + {ok, Port} = ?SEV_AWAIT_READY(Pid, server, init), + {ok, State#{server_port => Port}} + end}, + + %% Start the client + #{desc => "order client start", + cmd => fun(#{client := Pid, server_port := Port} = _State) -> + ?SEV_ANNOUNCE_START(Pid, Port), + ok + end}, + #{desc => "await client ready (init)", + cmd => fun(#{client := Pid} = _State) -> + ok = ?SEV_AWAIT_READY(Pid, client, init) + end}, + + + %% *** The actual test *** + #{desc => "order client to continue (async connect)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Client, async_connect), + ok + end}, + #{desc => "await client ready (connect select)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, connect_select) + end}, + + ?SEV_SLEEP(?SECS(1)), + + #{desc => "order server to continue (with accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_CONTINUE(Server, accept), + ok + end}, + #{desc => "await client ready (select)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, select) + end}, + #{desc => "await client ready (connect)", + cmd => fun(#{client := Client} = _State) -> + ?SEV_AWAIT_READY(Client, client, connect) + end}, + #{desc => "await server ready (accept)", + cmd => fun(#{server := Server} = _State) -> + ?SEV_AWAIT_READY(Server, server, accept) + end}, + + + %% *** Termination *** + #{desc => "order client to terminate", + cmd => fun(#{client := Client} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Client), + ok + end}, + #{desc => "await client termination", + cmd => fun(#{client := Client} = State) -> + ?SEV_AWAIT_TERMINATION(Client), + State1 = maps:remove(client, State), + {ok, State1} + end}, + #{desc => "order server to terminate", + cmd => fun(#{server := Server} = _State) -> + ?SEV_ANNOUNCE_TERMINATE(Server), + ok + end}, + #{desc => "await server termination", + cmd => fun(#{server := Server} = State) -> + ?SEV_AWAIT_TERMINATION(Server), + State1 = maps:remove(server, State), + {ok, State1} + end}, + + %% *** We are done *** + ?SEV_FINISH_NORMAL + ], + + i("start server evaluator"), + Server = ?SEV_START("server", ServerSeq, InitState), + + i("start client evaluator"), + Client = ?SEV_START("client", ClientSeq, InitState), + i("await evaluator(s)"), + + i("start tester evaluator"), + TesterInitState = #{server => Server#ev.pid, + client => Client#ev.pid}, + Tester = ?SEV_START("tester", TesterSeq, TesterInitState), + + ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]). + + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + %% Basically send and receive on an IPv4 UDP (dgram) socket using %% sendto and recvfrom. But we try to be async. That is, we use %% the 'nowait' value for the Timeout argument (and await the eventual @@ -2787,7 +3168,7 @@ api_a_send_and_recv_udp(InitState) -> {ok, State#{recv_stag => Tag, recv_sref => RecvRef}}; {ok, X} -> - {error, {unexpected_select_info, X}}; + {error, {unexpected_succes, X}}; {error, _} = ERROR -> ERROR end diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam Binary files differindex 0f438e4ca9..fd8d489f36 100644 --- a/erts/preloaded/ebin/socket.beam +++ b/erts/preloaded/ebin/socket.beam diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 0b1b49653c..c8d044209d 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -1206,18 +1206,24 @@ validate_inet6_addrs(Addrs) -> %% -spec connect(Socket, SockAddr) -> ok | {error, Reason} when - Socket :: socket(), - SockAddr :: sockaddr(), - Reason :: term(). + Socket :: socket(), + SockAddr :: sockaddr(), + Reason :: term(). connect(Socket, SockAddr) -> connect(Socket, SockAddr, infinity). --spec connect(Socket, SockAddr, Timeout) -> ok | {error, Reason} when - Socket :: socket(), - SockAddr :: sockaddr(), - Timeout :: timeout(), - Reason :: term(). +-spec connect(Socket, SockAddr, nowait) -> + ok | {select, SelectInfo} | {error, Reason} when + Socket :: socket(), + SockAddr :: sockaddr(), + SelectInfo :: select_info(), + Reason :: term() + ; (Socket, SockAddr, Timeout) -> ok | {error, Reason} when + Socket :: socket(), + SockAddr :: sockaddr(), + Timeout :: timeout(), + Reason :: term(). %% <KOLLA> %% Is it possible to connect with family = local for the (dest) sockaddr? @@ -1227,12 +1233,18 @@ connect(_Socket, _SockAddr, Timeout) {error, timeout}; connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout) when ((Fam =:= inet) orelse (Fam =:= inet6) orelse (Fam =:= local)) andalso - ((Timeout =:= infinity) orelse is_integer(Timeout)) -> + ((Timeout =:= nowait) orelse + (Timeout =:= infinity) orelse is_integer(Timeout)) -> TS = timestamp(Timeout), case nif_connect(SockRef, SockAddr) of ok -> %% Connected! ok; + + {ok, Ref} when (Timeout =:= nowait) -> + %% Connecting, but the caller does not want to wait... + ?SELECT(connect, Ref); + {ok, Ref} -> %% Connecting... NewTimeout = next_timeout(TS, Timeout), |