aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--erts/doc/src/socket.xml37
-rw-r--r--erts/doc/src/socket_usage.xml29
-rw-r--r--erts/emulator/nifs/common/socket_nif.c73
-rw-r--r--erts/emulator/test/socket_SUITE.erl383
-rw-r--r--erts/preloaded/ebin/socket.beambin74796 -> 75044 bytes
-rw-r--r--erts/preloaded/src/socket.erl30
6 files changed, 505 insertions, 47 deletions
diff --git a/erts/doc/src/socket.xml b/erts/doc/src/socket.xml
index 47e84090ee..e5cf77663a 100644
--- a/erts/doc/src/socket.xml
+++ b/erts/doc/src/socket.xml
@@ -324,7 +324,7 @@
<p>In the case when there is no connections waiting, the function
will return with the <c>SelectInfo</c>. The caller can then await a
- select message, <c>{'$socket', Sock, select, Info}</c> (where
+ select message, <c>{'$socket', Socket, select, Info}</c> (where
<c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>),
when a client connects (a subsequent call to accept will then return
the socket). </p>
@@ -377,7 +377,7 @@
<func>
<name name="connect" arity="2" since="OTP 22.0"/>
- <name name="connect" arity="3" since="OTP 22.0"/>
+ <name name="connect" arity="3" clause_i="2" since="OTP 22.0"/>
<fsummary>Initiate a connection on a socket.</fsummary>
<desc>
<p>This function connects the socket to the address
@@ -386,6 +386,24 @@
</func>
<func>
+ <name name="connect" arity="3" clause_i="1" anchor="connect_async" since="OTP @OTP-15731@"/>
+ <fsummary>Initiate a connection on a socket.</fsummary>
+ <desc>
+ <p>This function connects the socket to the address
+ specied by the <c>SockAddr</c> argument.</p>
+
+ <p>In the case when its not possible to immediately establish a
+ connection, the function will return with the
+ <seealso marker="#type-select_info"><c>SelectInfo</c></seealso>.
+ The caller can then await a
+ select message, <c>{'$socket', Socket, select, Info}</c> (where
+ <c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>,
+ a subsequent call to connect will then
+ establish the connection). </p>
+ </desc>
+ </func>
+
+ <func>
<name name="getopt" arity="3" clause_i="1" since="OTP 22.0"/>
<name name="getopt" arity="3" clause_i="2" since="OTP 22.0"/>
<name name="getopt" arity="3" clause_i="3" since="OTP 22.0"/>
@@ -505,7 +523,7 @@
<p>In the case when there is no data waiting, the function
will return with the <c>SelectInfo</c>. The caller can then await a
- select message, <c>{'$socket', Sock, select, Info}</c> (where
+ select message, <c>{'$socket', Socket, select, Info}</c> (where
<c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>),
when data has arrived (a subsequent call to recv will then return
the data). </p>
@@ -564,7 +582,7 @@
<p>In the case when there is no data waiting, the function
will return with the <c>SelectInfo</c>. The caller can then await a
- select message, <c>{'$socket', Sock, select, Info}</c> (where
+ select message, <c>{'$socket', Socket, select, Info}</c> (where
<c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>),
when data has arrived (a subsequent call to recvfrom will then return
the data). </p>
@@ -640,7 +658,7 @@
<p>In the case when there is no data waiting, the function
will return with the <c>SelectInfo</c>. The caller can then await a
- select message, <c>{'$socket', Sock, select, Info}</c> (where
+ select message, <c>{'$socket', Socket, select, Info}</c> (where
<c>Info</c> is the <c>select_ref()</c> from the <c>SelectInfo</c>),
when data has arrived (a subsequent call to recvmsg will then return
the data). </p>
@@ -667,7 +685,8 @@
<p>In the case when there is no room in the (system-) buffers,
the function will return with the <c>SelectInfo</c>. The caller
- can then await a select message, <c>{'$socket', Sock, select, Info}</c>
+ can then await a select message,
+ <c>{'$socket', Socket, select, Info}</c>
(where <c>Info</c> is the <c>select_ref()</c> from the
<c>SelectInfo</c>), when there is room for more data (a subsequent
call to send will then send the data). </p>
@@ -724,7 +743,8 @@
<p>In the case when there is no room in the (system-) buffers,
the function will return with the <c>SelectInfo</c>. The caller
- can then await a select message, <c>{'$socket', Sock, select, Info}</c>
+ can then await a select message,
+ <c>{'$socket', Socket, select, Info}</c>
(where <c>Info</c> is the <c>select_ref()</c> from the
<c>SelectInfo</c>), when there is room for more data (a subsequent
call to sendmsg will then send the data). </p>
@@ -751,7 +771,8 @@
<p>In the case when there is no room in the (system-) buffers,
the function will return with the <c>SelectInfo</c>. The caller
- can then await a select message, <c>{'$socket', Sock, select, Info}</c>
+ can then await a select message,
+ <c>{'$socket', Socket, select, Info}</c>
(where <c>Info</c> is the <c>select_ref()</c> from the
<c>SelectInfo</c>), when there is room for more data (a subsequent
call to sendto will then send the data). </p>
diff --git a/erts/doc/src/socket_usage.xml b/erts/doc/src/socket_usage.xml
index 9d8141b822..7e65bcbf70 100644
--- a/erts/doc/src/socket_usage.xml
+++ b/erts/doc/src/socket_usage.xml
@@ -43,6 +43,7 @@
<title>Asynchronous calls</title>
<p>Some functions allow for an <i>asynchronous</i> call
(<seealso marker="socket#accept_async"><c>accept/2</c></seealso>,
+ <seealso marker="socket#connect_async"><c>connect/3</c></seealso>,
<seealso marker="socket#recv_async"><c>recv/3,4</c></seealso>,
<seealso marker="socket#recvfrom_async"><c>recvfrom/3,4</c></seealso>,
<seealso marker="socket#recvmsg_async"><c>recvmsg/2,3,5</c></seealso>,
@@ -52,21 +53,17 @@
This is achieved by setting the <c>Timeout</c> argument to
<c>nowait</c>. For instance, if calling the
<seealso marker="socket#recv_async"><c>recv/3</c></seealso>
- function with Timeout set to <c>nowait</c> (i.e. <c>recv(Sock, 0, nowait)</c>)
+ function with Timeout set to <c>nowait</c> (i.e.
+ <c>recv(Sock, 0, nowait)</c>)
when there is actually nothing to read, it will return with
<c>{ok, </c>
<seealso marker="socket#type-select_info"><c>SelectInfo</c></seealso><c>}</c>.
When data eventually arrives a 'select message'
will be sent to the caller:</p>
- <!--
- <list type="ordered">
- <item><c>{'$socket', Sock, select, Info}</c></item>
- </list>
- -->
<taglist>
<!-- NOTE THAT THE EMPTY TAG IS INTENTIONAL -->
<tag></tag>
- <item><c>{'$socket', Sock, select, Info}</c></item>
+ <item><c>{'$socket', socket(), select, select_ref()}</c></item>
</taglist>
<p>The caller can then make another
call to the recv function and now expect data.</p>
@@ -79,24 +76,18 @@
<taglist>
<!-- NOTE THAT THE EMPTY TAG IS INTENTIONAL -->
<tag></tag>
- <item><c>{'$socket', Sock, abort, Info}</c></item>
+ <item><c>{'$socket', socket(), abort, Info}</c></item>
</taglist>
<p>If the operation is aborted
for whatever reason (e.g. if the socket is closed "by someone else").
The <c>Info</c> part contains the abort reason (in this case that
- the socket has been closed <c>Info = {Ref, closed}</c>). </p>
+ the socket has been closed <c>Info = {select_ref(), closed}</c>). </p>
<p>Note that all other users are <em>locked out</em> until the
'current user' has called the function (recv in this case). So either
immediately call the function or
<seealso marker="socket#cancel/2"><c>cancel</c></seealso>. </p>
+
<p>The general form of the 'socket' message is: </p>
- <!--
- <list type="ordered">
- <item>
- <c>{'$socket', Socket :: socket(), Tag :: atom(), Info :: term()}</c>
- </item>
- </list>
- -->
<taglist>
<!-- NOTE THAT THE EMPTY TAG IS INTENTIONAL -->
<tag></tag>
@@ -110,15 +101,15 @@
</row>
<row>
<cell>select</cell>
- <cell>reference()</cell>
+ <cell>select_ref()</cell>
</row>
<row>
<cell>abort</cell>
- <cell>{reference(), Reason :: term()}</cell>
+ <cell>{select_ref(), Reason :: term()}</cell>
</row>
<tcaption>socket message info value type</tcaption>
</table>
- <p>The <c>reference()</c> is the same as was received in the
+ <p>The <c>select_ref()</c> is the same as was received in the
<seealso marker="socket#type-select_info"><c>SelectInfo</c></seealso>. </p>
</section>
</section>
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 25bc712949..1a8ce89b7b 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -825,6 +825,10 @@ typedef struct {
ErlNifPid ctrlPid;
ESockMonitor ctrlMon;
+ /* +++ Connector process +++ */
+ ErlNifPid connPid;
+ ESockMonitor connMon;
+
/* +++ Write stuff +++ */
ErlNifMutex* writeMtx;
ESockRequestor currentWriter;
@@ -2377,6 +2381,8 @@ static int socket_setopt(int sock,
const void* optVal,
const socklen_t optLen);
+static BOOLEAN_T is_connector(ErlNifEnv* env,
+ ESockDescriptor* descP);
static BOOLEAN_T verify_is_connected(ESockDescriptor* descP, int* err);
static ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event);
@@ -4904,7 +4910,7 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
return esock_make_error(env, atom_eisconn);
}
- if (IS_CONNECTING(descP)) {
+ if (IS_CONNECTING(descP) && !is_connector(env, descP)) {
SSDBG( descP, ("SOCKET", "nif_connect -> already connecting\r\n") );
return esock_make_error(env, esock_atom_einval);
}
@@ -4926,19 +4932,41 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
((save_errno == ERRNO_BLOCK) || /* Winsock2 */
(save_errno == EINPROGRESS))) { /* Unix & OSE!! */
ref = MKREF(env);
- descP->state = SOCKET_STATE_CONNECTING;
- if ((sres = esock_select_write(env, descP->sock, descP, NULL,
- sockRef, ref)) < 0) {
- res = esock_make_error(env,
- MKT2(env,
- esock_atom_select_failed,
- MKI(env, sres)));
- } else {
+
+ if (IS_CONNECTING(descP)) {
+ /* Glitch */
res = esock_make_ok2(env, ref);
+ } else {
+
+ /* First time here */
+
+ if (enif_self(env, &descP->connPid) == NULL)
+ return esock_make_error(env, atom_exself);
+
+ if (MONP("nconnect -> conn",
+ env, descP,
+ &descP->connPid,
+ &descP->connMon) != 0)
+ return esock_make_error(env, atom_exmon);
+
+ descP->state = SOCKET_STATE_CONNECTING;
+
+ if ((sres = esock_select_write(env, descP->sock, descP, NULL,
+ sockRef, ref)) < 0) {
+ res = esock_make_error(env,
+ MKT2(env,
+ esock_atom_select_failed,
+ MKI(env, sres)));
+ } else {
+ res = esock_make_ok2(env, ref);
+ }
}
+
} else if (code == 0) { /* ok we are connected */
descP->state = SOCKET_STATE_CONNECTED;
+ enif_set_pid_undefined(&descP->connPid);
+ MON_INIT(&descP->connMon);
descP->isReadable = TRUE;
descP->isWritable = TRUE;
@@ -5069,6 +5097,29 @@ BOOLEAN_T verify_is_connected(ESockDescriptor* descP, int* err)
+/* *** is_connector ***
+ * Check if the current process is the connector process.
+ */
+#if !defined(__WIN32__)
+static
+BOOLEAN_T is_connector(ErlNifEnv* env,
+ ESockDescriptor* descP)
+{
+ ErlNifPid caller;
+
+ if (enif_self(env, &caller) == NULL)
+ return FALSE;
+
+ if (COMPARE_PIDS(&descP->connPid, &caller) == 0)
+ return TRUE;
+ else
+ return FALSE;
+
+}
+#endif
+
+
+
/* ----------------------------------------------------------------------
* nif_listen
*
@@ -16873,7 +16924,9 @@ ESockDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
char buf[64]; /* Buffer used for building the mutex name */
// This needs to be released when the socket is closed!
- // descP->env = enif_alloc_env();
+
+ enif_set_pid_undefined(&descP->connPid);
+ MON_INIT(&descP->connMon);
sprintf(buf, "esock[w,%d]", sock);
descP->writeMtx = MCREATE(buf);
diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl
index 2d7d9c4e08..0c29e3288c 100644
--- a/erts/emulator/test/socket_SUITE.erl
+++ b/erts/emulator/test/socket_SUITE.erl
@@ -82,6 +82,7 @@
api_b_sendmsg_and_recvmsg_tcpL/1,
%% *** API async ***
+ api_a_connect_tcp4/1,
api_a_sendto_and_recvfrom_udp4/1,
api_a_sendmsg_and_recvmsg_udp4/1,
api_a_send_and_recv_tcp4/1,
@@ -680,6 +681,7 @@ api_basic_cases() ->
api_async_cases() ->
[
+ api_a_connect_tcp4,
api_a_sendto_and_recvfrom_udp4,
api_a_sendmsg_and_recvmsg_udp4,
api_a_send_and_recv_tcp4,
@@ -2653,6 +2655,385 @@ api_b_send_and_recv_tcp(InitState) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+%% Basically establish a TCP connection via an async connect.
+
+api_a_connect_tcp4(suite) ->
+ [];
+api_a_connect_tcp4(doc) ->
+ [];
+api_a_connect_tcp4(_Config) when is_list(_Config) ->
+ ?TT(?SECS(10)),
+ tc_try(api_a_connect_tcp4,
+ fun() ->
+ Connect = fun(Sock, SockAddr) ->
+ socket:connect(Sock, SockAddr, nowait)
+ end,
+ InitState = #{domain => inet,
+ connect => Connect},
+ ok = api_a_connect_tcp(InitState)
+ end).
+
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
+api_a_connect_tcp(InitState) ->
+ process_flag(trap_exit, true),
+ ServerSeq =
+ [
+ %% *** Wait for start order ***
+ #{desc => "await start (from tester)",
+ cmd => fun(State) ->
+ Tester = ?SEV_AWAIT_START(),
+ {ok, State#{tester => Tester}}
+ end},
+ #{desc => "monitor tester",
+ cmd => fun(#{tester := Tester}) ->
+ _MRef = erlang:monitor(process, Tester),
+ ok
+ end},
+
+ %% *** Init part ***
+ #{desc => "which local address",
+ cmd => fun(#{domain := Domain} = State) ->
+ LAddr = which_local_addr(Domain),
+ LSA = #{family => Domain, addr => LAddr},
+ {ok, State#{lsa => LSA}}
+ end},
+ #{desc => "create listen socket",
+ cmd => fun(#{domain := Domain} = State) ->
+ case socket:open(Domain, stream, tcp) of
+ {ok, Sock} ->
+ {ok, State#{lsock => Sock}};
+ {error, _} = ERROR ->
+ ERROR
+ end
+ end},
+ #{desc => "bind to local address",
+ cmd => fun(#{lsock := LSock, lsa := LSA} = State) ->
+ case socket:bind(LSock, LSA) of
+ {ok, Port} ->
+ {ok, State#{lport => Port}};
+ {error, _} = ERROR ->
+ ERROR
+ end
+ end},
+ #{desc => "make listen socket",
+ cmd => fun(#{lsock := LSock}) ->
+ socket:listen(LSock)
+ end},
+ #{desc => "announce ready (init)",
+ cmd => fun(#{tester := Tester, lport := Port}) ->
+ ?SEV_ANNOUNCE_READY(Tester, init, Port),
+ ok
+ end},
+
+ %% The actual test
+ #{desc => "await continue (accept)",
+ cmd => fun(#{tester := Tester}) ->
+ ?SEV_AWAIT_CONTINUE(Tester, tester, accept)
+ end},
+ #{desc => "await connection",
+ cmd => fun(#{lsock := LSock} = State) ->
+ case socket:accept(LSock) of
+ {ok, Sock} ->
+ ?SEV_IPRINT("accepted: ~n ~p", [Sock]),
+ {ok, State#{csock => Sock}};
+ {error, _} = ERROR ->
+ ERROR
+ end
+ end},
+ #{desc => "announce ready (accept)",
+ cmd => fun(#{tester := Tester}) ->
+ ?SEV_ANNOUNCE_READY(Tester, accept),
+ ok
+ end},
+
+ %% *** Termination ***
+ #{desc => "await terminate",
+ cmd => fun(#{tester := Tester} = State) ->
+ case ?SEV_AWAIT_TERMINATE(Tester, tester) of
+ ok ->
+ {ok, maps:remove(tester, State)};
+ {error, _} = ERROR ->
+ ERROR
+ end
+ end},
+ #{desc => "close connection socket",
+ cmd => fun(#{csock := Sock} = State) ->
+ ok = socket:close(Sock),
+ {ok, maps:remove(csock, State)}
+ end},
+ #{desc => "close listen socket",
+ cmd => fun(#{lsock := Sock} = State) ->
+ ok = socket:close(Sock),
+ {ok, maps:remove(lsock, State)}
+ end},
+
+ %% *** We are done ***
+ ?SEV_FINISH_NORMAL
+ ],
+
+ ClientSeq =
+ [
+ %% *** Wait for start order ***
+ #{desc => "await start (from tester)",
+ cmd => fun(State) ->
+ {Tester, Port} = ?SEV_AWAIT_START(),
+ {ok, State#{tester => Tester, server_port => Port}}
+ end},
+ #{desc => "monitor tester",
+ cmd => fun(#{tester := Tester}) ->
+ _MRef = erlang:monitor(process, Tester),
+ ok
+ end},
+
+ %% *** The init part ***
+ #{desc => "which server (local) address",
+ cmd => fun(#{domain := Domain, server_port := Port} = State) ->
+ LAddr = which_local_addr(Domain),
+ LSA = #{family => Domain,
+ addr => LAddr},
+ SSA = LSA#{port => Port},
+ {ok, State#{local_sa => LSA, server_sa => SSA}}
+ end},
+ #{desc => "create socket",
+ cmd => fun(#{domain := Domain} = State) ->
+ case socket:open(Domain, stream, tcp) of
+ {ok, Sock} ->
+ {ok, State#{sock => Sock}};
+ {error, _} = ERROR ->
+ ERROR
+ end
+ end},
+ #{desc => "bind to local address",
+ cmd => fun(#{sock := Sock, local_sa := LSA} = _State) ->
+ case socket:bind(Sock, LSA) of
+ {ok, _Port} ->
+ ok;
+ {error, _} = ERROR ->
+ ERROR
+ end
+ end},
+ #{desc => "announce ready (init)",
+ cmd => fun(#{tester := Tester}) ->
+ ?SEV_ANNOUNCE_READY(Tester, init),
+ ok
+ end},
+
+ %% *** The actual test ***
+ #{desc => "await continue (async connect)",
+ cmd => fun(#{tester := Tester} = _State) ->
+ ?SEV_AWAIT_CONTINUE(Tester, tester, async_connect)
+ end},
+ #{desc => "connect (async) to server",
+ cmd => fun(#{sock := Sock,
+ server_sa := SSA,
+ connect := Connect} = State) ->
+ case Connect(Sock, SSA) of
+ ok ->
+ {error, unexpected_success};
+ {select, {select_info, ST, SR}} ->
+ ?SEV_IPRINT("select ->"
+ "~n tag: ~p"
+ "~n ref: ~p", [ST, SR]),
+ {ok, State#{connect_stag => ST,
+ connect_sref => SR}};
+ {error, _} = ERROR ->
+ ERROR
+ end
+ end},
+ #{desc => "announce ready (connect select)",
+ cmd => fun(#{tester := Tester}) ->
+ ?SEV_ANNOUNCE_READY(Tester, connect_select),
+ ok
+ end},
+ #{desc => "await select message",
+ cmd => fun(#{sock := Sock, connect_sref := Ref}) ->
+ receive
+ {'$socket', Sock, select, Ref} ->
+ ?SEV_IPRINT("select message ->"
+ "~n ref: ~p", [Ref]),
+ ok
+ after 5000 ->
+ ?SEV_EPRINT("timeout: "
+ "~n message queue: ~p",
+ [mq()]),
+ {error, timeout}
+ end
+ end},
+ #{desc => "announce ready (select)",
+ cmd => fun(#{tester := Tester}) ->
+ ?SEV_ANNOUNCE_READY(Tester, select),
+ ok
+ end},
+ #{desc => "connect (async) to server",
+ cmd => fun(#{sock := Sock, server_sa := SSA, connect := Connect}) ->
+ case Connect(Sock, SSA) of
+ ok ->
+ ok = socket:setopt(Sock, otp, debug, false),
+ ok;
+ {select, SelectInfo} ->
+ {error, {unexpected_select, SelectInfo}};
+ {error, _} = ERROR ->
+ ERROR
+ end
+ end},
+ #{desc => "announce ready (connect)",
+ cmd => fun(#{tester := Tester}) ->
+ ?SEV_ANNOUNCE_READY(Tester, connect),
+ ok
+ end},
+ #{desc => "get peername",
+ cmd => fun(#{sock := Sock} = _State) ->
+ case socket:peername(Sock) of
+ {ok, SockAddr} ->
+ ?SEV_IPRINT("Peer Name: ~p", [SockAddr]),
+ ok;
+ {error, _} = ERROR ->
+ ERROR
+ end
+ end},
+
+ %% *** Termination ***
+ #{desc => "await terminate",
+ cmd => fun(#{tester := Tester} = State) ->
+ case ?SEV_AWAIT_TERMINATE(Tester, tester) of
+ ok ->
+ {ok, maps:remove(tester, State)};
+ {error, _} = ERROR ->
+ ERROR
+ end
+ end},
+ #{desc => "close socket",
+ cmd => fun(#{sock := Sock} = State) ->
+ ok = socket:close(Sock),
+ State2 = maps:remove(sock, State),
+ State3 = maps:remove(connect_stag, State2),
+ State4 = maps:remove(connect_sref, State3),
+ {ok, State4}
+ end},
+
+ %% *** We are done ***
+ ?SEV_FINISH_NORMAL
+ ],
+
+ TesterSeq =
+ [
+ %% *** Init part ***
+ #{desc => "monitor server",
+ cmd => fun(#{server := Pid} = _State) ->
+ _MRef = erlang:monitor(process, Pid),
+ ok
+ end},
+ #{desc => "monitor client",
+ cmd => fun(#{client := Pid} = _State) ->
+ _MRef = erlang:monitor(process, Pid),
+ ok
+ end},
+
+ %% Start the server
+ #{desc => "order server start",
+ cmd => fun(#{server := Pid} = _State) ->
+ ?SEV_ANNOUNCE_START(Pid),
+ ok
+ end},
+ #{desc => "await server ready (init)",
+ cmd => fun(#{server := Pid} = State) ->
+ {ok, Port} = ?SEV_AWAIT_READY(Pid, server, init),
+ {ok, State#{server_port => Port}}
+ end},
+
+ %% Start the client
+ #{desc => "order client start",
+ cmd => fun(#{client := Pid, server_port := Port} = _State) ->
+ ?SEV_ANNOUNCE_START(Pid, Port),
+ ok
+ end},
+ #{desc => "await client ready (init)",
+ cmd => fun(#{client := Pid} = _State) ->
+ ok = ?SEV_AWAIT_READY(Pid, client, init)
+ end},
+
+
+ %% *** The actual test ***
+ #{desc => "order client to continue (async connect)",
+ cmd => fun(#{client := Client} = _State) ->
+ ?SEV_ANNOUNCE_CONTINUE(Client, async_connect),
+ ok
+ end},
+ #{desc => "await client ready (connect select)",
+ cmd => fun(#{client := Client} = _State) ->
+ ?SEV_AWAIT_READY(Client, client, connect_select)
+ end},
+
+ ?SEV_SLEEP(?SECS(1)),
+
+ #{desc => "order server to continue (with accept)",
+ cmd => fun(#{server := Server} = _State) ->
+ ?SEV_ANNOUNCE_CONTINUE(Server, accept),
+ ok
+ end},
+ #{desc => "await client ready (select)",
+ cmd => fun(#{client := Client} = _State) ->
+ ?SEV_AWAIT_READY(Client, client, select)
+ end},
+ #{desc => "await client ready (connect)",
+ cmd => fun(#{client := Client} = _State) ->
+ ?SEV_AWAIT_READY(Client, client, connect)
+ end},
+ #{desc => "await server ready (accept)",
+ cmd => fun(#{server := Server} = _State) ->
+ ?SEV_AWAIT_READY(Server, server, accept)
+ end},
+
+
+ %% *** Termination ***
+ #{desc => "order client to terminate",
+ cmd => fun(#{client := Client} = _State) ->
+ ?SEV_ANNOUNCE_TERMINATE(Client),
+ ok
+ end},
+ #{desc => "await client termination",
+ cmd => fun(#{client := Client} = State) ->
+ ?SEV_AWAIT_TERMINATION(Client),
+ State1 = maps:remove(client, State),
+ {ok, State1}
+ end},
+ #{desc => "order server to terminate",
+ cmd => fun(#{server := Server} = _State) ->
+ ?SEV_ANNOUNCE_TERMINATE(Server),
+ ok
+ end},
+ #{desc => "await server termination",
+ cmd => fun(#{server := Server} = State) ->
+ ?SEV_AWAIT_TERMINATION(Server),
+ State1 = maps:remove(server, State),
+ {ok, State1}
+ end},
+
+ %% *** We are done ***
+ ?SEV_FINISH_NORMAL
+ ],
+
+ i("start server evaluator"),
+ Server = ?SEV_START("server", ServerSeq, InitState),
+
+ i("start client evaluator"),
+ Client = ?SEV_START("client", ClientSeq, InitState),
+ i("await evaluator(s)"),
+
+ i("start tester evaluator"),
+ TesterInitState = #{server => Server#ev.pid,
+ client => Client#ev.pid},
+ Tester = ?SEV_START("tester", TesterSeq, TesterInitState),
+
+ ok = ?SEV_AWAIT_FINISH([Server, Client, Tester]).
+
+
+
+
+%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
+
%% Basically send and receive on an IPv4 UDP (dgram) socket using
%% sendto and recvfrom. But we try to be async. That is, we use
%% the 'nowait' value for the Timeout argument (and await the eventual
@@ -2787,7 +3168,7 @@ api_a_send_and_recv_udp(InitState) ->
{ok, State#{recv_stag => Tag,
recv_sref => RecvRef}};
{ok, X} ->
- {error, {unexpected_select_info, X}};
+ {error, {unexpected_succes, X}};
{error, _} = ERROR ->
ERROR
end
diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam
index 0f438e4ca9..fd8d489f36 100644
--- a/erts/preloaded/ebin/socket.beam
+++ b/erts/preloaded/ebin/socket.beam
Binary files differ
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 0b1b49653c..c8d044209d 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -1206,18 +1206,24 @@ validate_inet6_addrs(Addrs) ->
%%
-spec connect(Socket, SockAddr) -> ok | {error, Reason} when
- Socket :: socket(),
- SockAddr :: sockaddr(),
- Reason :: term().
+ Socket :: socket(),
+ SockAddr :: sockaddr(),
+ Reason :: term().
connect(Socket, SockAddr) ->
connect(Socket, SockAddr, infinity).
--spec connect(Socket, SockAddr, Timeout) -> ok | {error, Reason} when
- Socket :: socket(),
- SockAddr :: sockaddr(),
- Timeout :: timeout(),
- Reason :: term().
+-spec connect(Socket, SockAddr, nowait) ->
+ ok | {select, SelectInfo} | {error, Reason} when
+ Socket :: socket(),
+ SockAddr :: sockaddr(),
+ SelectInfo :: select_info(),
+ Reason :: term()
+ ; (Socket, SockAddr, Timeout) -> ok | {error, Reason} when
+ Socket :: socket(),
+ SockAddr :: sockaddr(),
+ Timeout :: timeout(),
+ Reason :: term().
%% <KOLLA>
%% Is it possible to connect with family = local for the (dest) sockaddr?
@@ -1227,12 +1233,18 @@ connect(_Socket, _SockAddr, Timeout)
{error, timeout};
connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout)
when ((Fam =:= inet) orelse (Fam =:= inet6) orelse (Fam =:= local)) andalso
- ((Timeout =:= infinity) orelse is_integer(Timeout)) ->
+ ((Timeout =:= nowait) orelse
+ (Timeout =:= infinity) orelse is_integer(Timeout)) ->
TS = timestamp(Timeout),
case nif_connect(SockRef, SockAddr) of
ok ->
%% Connected!
ok;
+
+ {ok, Ref} when (Timeout =:= nowait) ->
+ %% Connecting, but the caller does not want to wait...
+ ?SELECT(connect, Ref);
+
{ok, Ref} ->
%% Connecting...
NewTimeout = next_timeout(TS, Timeout),