aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-09-19 18:21:47 +0200
committerMicael Karlberg <[email protected]>2018-09-19 18:22:41 +0200
commite01a856c993b55c3fbc76fd429783d4aad5bfc80 (patch)
tree5f379d3d987703295133a6489bbbe97ac5a1f9b7 /erts
parenta866bd04c5ce5f418f0e11685713af2992ef0ce8 (diff)
downloadotp-e01a856c993b55c3fbc76fd429783d4aad5bfc80.tar.gz
otp-e01a856c993b55c3fbc76fd429783d4aad5bfc80.tar.bz2
otp-e01a856c993b55c3fbc76fd429783d4aad5bfc80.zip
[socket-nif] Add proper connect and accept timeout handling
Added proper connect and accept timeout handling. Made use of the enif_select(mode = cancel) feature. Each time a timeout expires, the previous operation (connect or accept) has to be cancelled (actually its the select operation that has to be cancelled). Only partial implementation of cancel for now (connect and accept). More to follow... OTP-14831
Diffstat (limited to 'erts')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c400
-rw-r--r--erts/preloaded/ebin/socket.beambin66152 -> 65868 bytes
-rw-r--r--erts/preloaded/src/socket.erl111
3 files changed, 409 insertions, 102 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 5fca0eb58b..fde2349234 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -737,6 +737,7 @@ typedef struct {
SocketAddress remote;
unsigned int addrLen;
+ ErlNifEnv* env;
/* +++ Controller (owner) process +++ */
ErlNifPid ctrlPid;
@@ -777,7 +778,7 @@ typedef struct {
size_t rBufSz; // Read buffer size (when data length = 0 is specified)
size_t rCtrlSz; // Read control buffer size
size_t wCtrlSz; // Write control buffer size
- BOOLEAN_T iow; // Inform On Wrap
+ BOOLEAN_T iow; // Inform On (counter) Wrap
BOOLEAN_T dbg;
/* +++ Close stuff +++ */
@@ -915,11 +916,9 @@ static ERL_NIF_TERM nif_finalize_connection(ErlNifEnv* env,
static ERL_NIF_TERM nif_finalize_close(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
-/*
static ERL_NIF_TERM nif_cancel(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
-*/
static ERL_NIF_TERM nopen(ErlNifEnv* env,
@@ -983,7 +982,6 @@ static ERL_NIF_TERM nclose(ErlNifEnv* env,
static ERL_NIF_TERM nshutdown(ErlNifEnv* env,
SocketDescriptor* descP,
int how);
-
static ERL_NIF_TERM nsetopt(ErlNifEnv* env,
SocketDescriptor* descP,
BOOLEAN_T isEncoded,
@@ -1823,6 +1821,38 @@ static ERL_NIF_TERM nsockname(ErlNifEnv* env,
SocketDescriptor* descP);
static ERL_NIF_TERM npeername(ErlNifEnv* env,
SocketDescriptor* descP);
+static ERL_NIF_TERM ncancel(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM op,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_connect(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_accept(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
+ SocketDescriptor* descP);
+static ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_send(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_recv(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_read_select(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_write_select(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef);
+static ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef,
+ int smode,
+ int rmode);
static ERL_NIF_TERM nsetopt_str_opt(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -2089,6 +2119,9 @@ static BOOLEAN_T acceptor_pop(ErlNifEnv* env,
ErlNifPid* pid,
ErlNifMonitor* mon,
ERL_NIF_TERM* ref);
+static BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid);
static BOOLEAN_T qsearch4pid(ErlNifEnv* env,
SocketRequestQueue* q,
@@ -2241,8 +2274,10 @@ static char str_exsend[] = "exsend"; // failed send
/* *** "Global" Atoms *** */
+ERL_NIF_TERM esock_atom_accept;
ERL_NIF_TERM esock_atom_addr;
ERL_NIF_TERM esock_atom_any;
+ERL_NIF_TERM esock_atom_connect;
ERL_NIF_TERM esock_atom_credentials;
ERL_NIF_TERM esock_atom_ctrl;
ERL_NIF_TERM esock_atom_ctrunc;
@@ -2267,6 +2302,7 @@ ERL_NIF_TERM esock_atom_local;
ERL_NIF_TERM esock_atom_loopback;
ERL_NIF_TERM esock_atom_lowdelay;
ERL_NIF_TERM esock_atom_mincost;
+ERL_NIF_TERM esock_atom_not_found;
ERL_NIF_TERM esock_atom_ok;
ERL_NIF_TERM esock_atom_oob;
ERL_NIF_TERM esock_atom_origdstaddr;
@@ -2276,11 +2312,18 @@ ERL_NIF_TERM esock_atom_port;
ERL_NIF_TERM esock_atom_protocol;
ERL_NIF_TERM esock_atom_raw;
ERL_NIF_TERM esock_atom_rdm;
-ERL_NIF_TERM esock_atom_rights;
+ERL_NIF_TERM esock_atom_recv;
+ERL_NIF_TERM esock_atom_recvfrom;
+ERL_NIF_TERM esock_atom_recvmsg;
ERL_NIF_TERM esock_atom_reliability;
+ERL_NIF_TERM esock_atom_rights;
ERL_NIF_TERM esock_atom_scope_id;
ERL_NIF_TERM esock_atom_sctp;
ERL_NIF_TERM esock_atom_sec;
+ERL_NIF_TERM esock_atom_select_sent;
+ERL_NIF_TERM esock_atom_send;
+ERL_NIF_TERM esock_atom_sendmsg;
+ERL_NIF_TERM esock_atom_sendto;
ERL_NIF_TERM esock_atom_seqpacket;
ERL_NIF_TERM esock_atom_socket;
ERL_NIF_TERM esock_atom_spec_dst;
@@ -3228,7 +3271,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
&descP->currentAcceptor.mon) > 0)
return esock_make_error(env, atom_exmon);
- descP->currentAcceptor.ref = ref;
+ descP->currentAcceptor.ref = enif_make_copy(descP->env, ref);
+ descP->currentAcceptorP = &descP->currentAcceptor;
SELECT(env,
descP->sock,
@@ -3326,7 +3370,8 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
/* *** naccept_accepting ***
* We have an active acceptor and possibly acceptors waiting in queue.
- * At the moment the queue is *not* implemented.
+ * If the pid of the calling process is not the pid of the "current process",
+ * push the requester onto the queue.
*/
static
ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
@@ -3353,11 +3398,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
"\r\n", caller, descP->currentAcceptor.pid) );
if (!compare_pids(env, &descP->currentAcceptor.pid, &caller)) {
- /* This will have to do until we implement the queue.
- * When we have the queue, we should simply push this request,
- * and instead return with eagain (the caller will then wait
- * for the select message).
- */
+
+ /* Not the "current caller", so (maybe) push onto queue */
SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") );
@@ -10420,6 +10462,304 @@ ERL_NIF_TERM npeername(ErlNifEnv* env,
/* ----------------------------------------------------------------------
+ * nif_cancel
+ *
+ * Description:
+ * Cancel a previous select!
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ * Operation (atom) - What kind of operation (accept, send, ...) is to be cancelled
+ * Ref (ref) - Unique id for the operation
+ */
+static
+ERL_NIF_TERM nif_cancel(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+ SocketDescriptor* descP;
+ ERL_NIF_TERM op, opRef, result;
+
+ SGDBG( ("SOCKET", "nif_cancel -> entry with argc: %d\r\n", argc) );
+
+ /* Extract arguments and perform preliminary validation */
+
+ if ((argc != 3) ||
+ !enif_get_resource(env, argv[0], sockets, (void**) &descP)) {
+ return enif_make_badarg(env);
+ }
+ op = argv[1];
+ opRef = argv[2];
+
+ SSDBG( descP,
+ ("SOCKET", "nif_cancel -> args when sock = %d:"
+ "\r\n op: %T"
+ "\r\n opRef: %T"
+ "\r\n", descP->sock, op, opRef) );
+
+ result = ncancel(env, descP, op, opRef);
+
+ SSDBG( descP,
+ ("SOCKET", "nif_cancel -> done with result: "
+ "\r\n %T"
+ "\r\n", result) );
+
+ return result;
+
+}
+
+
+static
+ERL_NIF_TERM ncancel(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM op,
+ ERL_NIF_TERM opRef)
+{
+ /* <KOLLA>
+ *
+ * Do we really need all these variants? Should it not be enough with:
+ *
+ * connect | accept | send | recv
+ *
+ * </KOLLA>
+ */
+ if (COMPARE(op, esock_atom_connect) == 0) {
+ return ncancel_connect(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_accept) == 0) {
+ return ncancel_accept(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_send) == 0) {
+ return ncancel_send(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_sendto) == 0) {
+ return ncancel_send(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_sendmsg) == 0) {
+ return ncancel_send(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_recv) == 0) {
+ return ncancel_recv(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_recvfrom) == 0) {
+ return ncancel_recv(env, descP, opRef);
+ } else if (COMPARE(op, esock_atom_recvmsg) == 0) {
+ return ncancel_recv(env, descP, opRef);
+ } else {
+ return esock_make_error(env, esock_atom_einval);
+ }
+}
+
+
+
+/* *** ncancel_connect ***
+ *
+ *
+ */
+static
+ERL_NIF_TERM ncancel_connect(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ return ncancel_write_select(env, descP, opRef);
+}
+
+
+/* *** ncancel_accept ***
+ *
+ * We have two different cases:
+ * *) Its the current acceptor
+ * Cancel the select!
+ * We need to activate one of the waiting acceptors.
+ * *) Its one of the acceptors ("waiting") in the queue
+ * Simply remove the acceptor from the queue.
+ *
+ */
+static
+ERL_NIF_TERM ncancel_accept(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ ERL_NIF_TERM res;
+
+ SSDBG( descP,
+ ("SOCKET", "ncancel_accept -> entry with"
+ "\r\n opRef: %T"
+ "\r\n %s"
+ "\r\n", opRef,
+ ((descP->currentAcceptorP == NULL) ? "without acceptor" : "with acceptor")) );
+
+ MLOCK(descP->accMtx);
+
+ if (descP->currentAcceptorP != NULL) {
+ if (COMPARE(opRef, descP->currentAcceptor.ref) == 0) {
+ res = ncancel_accept_current(env, descP);
+ } else {
+ res = ncancel_accept_waiting(env, descP, opRef);
+ }
+ } else {
+ /* Or badarg? */
+ res = esock_make_error(env, esock_atom_einval);
+ }
+
+ MUNLOCK(descP->accMtx);
+
+ SSDBG( descP,
+ ("SOCKET", "ncancel_accept -> done with result:"
+ "\r\n %T"
+ "\r\n", res) );
+
+ return res;
+}
+
+
+/* The current process has an ongoing select we first must
+ * cancel. Then we must re-activate the "first" (the first
+ * in the acceptor queue).
+ */
+static
+ERL_NIF_TERM ncancel_accept_current(ErlNifEnv* env,
+ SocketDescriptor* descP)
+{
+ ERL_NIF_TERM res;
+
+ SSDBG( descP, ("SOCKET", "ncancel_accept_current -> entry\r\n") );
+
+ res = ncancel_read_select(env, descP, descP->currentAcceptor.ref);
+
+ SSDBG( descP, ("SOCKET", "ncancel_accept_current -> cancel res: %T\r\n", res) );
+
+ if (acceptor_pop(env, descP,
+ &descP->currentAcceptor.pid,
+ &descP->currentAcceptor.mon,
+ &descP->currentAcceptor.ref)) {
+
+ /* There was another one */
+
+ SSDBG( descP, ("SOCKET", "ncancel_accept_current -> new (active) acceptor: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref) );
+
+ SELECT(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref);
+
+ } else {
+ SSDBG( descP, ("SOCKET", "ncancel_accept_current -> no more acceptors\r\n") );
+ descP->currentAcceptorP = NULL;
+ descP->state = SOCKET_STATE_LISTENING;
+ }
+
+ SSDBG( descP, ("SOCKET", "ncancel_accept_current -> done with result:"
+ "\r\n %T"
+ "\r\n", res) );
+
+ return res;
+}
+
+
+/* These processes have not performed a select, so we can simply
+ * remove them from the acceptor queue.
+ */
+static
+ERL_NIF_TERM ncancel_accept_waiting(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ ErlNifPid caller;
+
+ if (enif_self(env, &caller) == NULL)
+ return esock_make_error(env, atom_exself);
+
+ /* unqueue request from (acceptor) queue */
+
+ if (acceptor_unqueue(env, descP, &caller)) {
+ return esock_atom_ok;
+ } else {
+ /* Race? */
+ return esock_make_error(env, esock_atom_not_found);
+ }
+}
+
+
+
+/* *** ncancel_send ***
+ *
+ *
+ */
+static
+ERL_NIF_TERM ncancel_send(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ return esock_make_error(env, esock_atom_einval);
+}
+
+
+
+/* *** ncancel_recv ***
+ *
+ *
+ */
+static
+ERL_NIF_TERM ncancel_recv(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ return esock_make_error(env, esock_atom_einval);
+}
+
+
+
+static
+ERL_NIF_TERM ncancel_read_select(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ return ncancel_mode_select(env, descP, opRef,
+ ERL_NIF_SELECT_READ,
+ ERL_NIF_SELECT_READ_CANCELLED);
+}
+
+
+static
+ERL_NIF_TERM ncancel_write_select(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef)
+{
+ return ncancel_mode_select(env, descP, opRef,
+ ERL_NIF_SELECT_WRITE,
+ ERL_NIF_SELECT_WRITE_CANCELLED);
+}
+
+
+static
+ERL_NIF_TERM ncancel_mode_select(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM opRef,
+ int smode,
+ int rmode)
+{
+ int selectRes = enif_select(env, descP->sock,
+ (ERL_NIF_SELECT_CANCEL | smode),
+ descP, NULL, opRef);
+
+ if (selectRes & rmode) {
+ /* Was cancelled */
+ return esock_atom_ok;
+ } else if (selectRes > 0) {
+ /* Has already sent the message */
+ return esock_make_error(env, esock_atom_select_sent);
+ } else {
+ /* Stopped? */
+ SSDBG( descP, ("SOCKET", "ncancel_mode_select -> failed: %d (0x%lX)"
+ "\r\n", selectRes, selectRes) );
+ return esock_make_error(env, esock_atom_einval);
+ }
+
+}
+
+
+
+/* ----------------------------------------------------------------------
* U t i l i t y F u n c t i o n s
* ----------------------------------------------------------------------
*/
@@ -10537,8 +10877,9 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
}
/* There is a special case: If the provided 'to read' value is
- * zero (0). That means that we reads as much as we can, using
- * the default read buffer size.
+ * zero (0) (only for type =/= stream).
+ * That means that we reads as much as we can, using the default
+ * read buffer size.
*/
if (bufP->size == read) {
@@ -12537,6 +12878,8 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) {
char buf[64]; /* Buffer used for building the mutex name */
+ descP->env = enif_alloc_env();
+
sprintf(buf, "socket[w,%d]", sock);
descP->writeMtx = MCREATE(buf);
descP->currentWriterP = NULL; // currentWriter not used
@@ -13161,7 +13504,7 @@ ERL_NIF_TERM acceptor_push(ErlNifEnv* env,
SocketRequestor* reqP = &e->data;
reqP->pid = pid;
- reqP->ref = ref;
+ reqP->ref = enif_make_copy(descP->env, ref);
if (MONP(env, descP, &pid, &reqP->mon) > 0) {
FREE(reqP);
@@ -13205,6 +13548,19 @@ BOOLEAN_T acceptor_pop(ErlNifEnv* env,
}
+/* *** acceptor unqueue ***
+ *
+ * Remove an acceptor from the acceptor queue.
+ */
+static
+BOOLEAN_T acceptor_unqueue(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ const ErlNifPid* pid)
+{
+ return qunqueue(env, &descP->acceptorsQ, pid);
+}
+
+
static
BOOLEAN_T qsearch4pid(ErlNifEnv* env,
SocketRequestQueue* q,
@@ -13647,7 +14003,7 @@ void socket_down(ErlNifEnv* env,
"socket_down -> "
"not current acceptor - maybe a waiting acceptor\r\n") );
- qunqueue(env, &descP->acceptorsQ, pid);
+ acceptor_unqueue(env, descP, pid);
}
}
@@ -13697,7 +14053,7 @@ ErlNifFunc socket_funcs[] =
* is called after the connect *select* has "completed".
*/
{"nif_finalize_connection", 1, nif_finalize_connection, 0},
- // {"nif_cancel", 2, nif_cancel, 0},
+ {"nif_cancel", 3, nif_cancel, 0},
{"nif_finalize_close", 1, nif_finalize_close, ERL_NIF_DIRTY_JOB_IO_BOUND}
};
@@ -13817,8 +14173,10 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_want = MKA(env, str_want);
/* Global atom(s) */
+ esock_atom_accept = MKA(env, "accept");
esock_atom_addr = MKA(env, "addr");
esock_atom_any = MKA(env, "any");
+ esock_atom_connect = MKA(env, "connect");
esock_atom_credentials = MKA(env, "credentials");
esock_atom_ctrl = MKA(env, "ctrl");
esock_atom_ctrunc = MKA(env, "ctrunc");
@@ -13843,6 +14201,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
esock_atom_loopback = MKA(env, "loopback");
esock_atom_lowdelay = MKA(env, "lowdelay");
esock_atom_mincost = MKA(env, "mincost");
+ esock_atom_not_found = MKA(env, "not_found");
esock_atom_ok = MKA(env, "ok");
esock_atom_oob = MKA(env, "oob");
esock_atom_origdstaddr = MKA(env, "origdstaddr");
@@ -13852,11 +14211,18 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
esock_atom_protocol = MKA(env, "protocol");
esock_atom_raw = MKA(env, "raw");
esock_atom_rdm = MKA(env, "rdm");
+ esock_atom_recv = MKA(env, "recv");
+ esock_atom_recvfrom = MKA(env, "recvfrom");
+ esock_atom_recvmsg = MKA(env, "recvmsg");
esock_atom_reliability = MKA(env, "reliability");
esock_atom_rights = MKA(env, "rights");
esock_atom_scope_id = MKA(env, "scope_id");
esock_atom_sctp = MKA(env, "sctp");
esock_atom_sec = MKA(env, "sec");
+ esock_atom_select_sent = MKA(env, "select_sent");
+ esock_atom_send = MKA(env, "send");
+ esock_atom_sendmsg = MKA(env, "sendmsg");
+ esock_atom_sendto = MKA(env, "sendto");
esock_atom_seqpacket = MKA(env, "seqpacket");
esock_atom_socket = MKA(env, "socket");
esock_atom_spec_dst = MKA(env, "spec_dst");
diff --git a/erts/preloaded/ebin/socket.beam b/erts/preloaded/ebin/socket.beam
index e6a33337ba..2475dce37b 100644
--- a/erts/preloaded/ebin/socket.beam
+++ b/erts/preloaded/ebin/socket.beam
Binary files differ
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index ad7a35694b..1c16c94711 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -1137,7 +1137,7 @@ connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout)
%% </KOLLA>
nif_finalize_connection(SockRef)
after NewTimeout ->
- nif_cancel(SockRef, connect, Ref),
+ cancel(SockRef, connect, Ref),
{error, timeout}
end;
{error, _} = ERROR ->
@@ -1145,7 +1145,6 @@ connect(#socket{ref = SockRef}, #{family := Fam} = SockAddr, Timeout)
end.
-
%% ===========================================================================
%%
%% listen - listen for connections on a socket
@@ -1227,13 +1226,12 @@ do_accept(LSockRef, Timeout) ->
{error, Reason}
after NewTimeout ->
- nif_cancel(LSockRef, accept, AccRef),
- flush_select_msgs(LSockRef, AccRef),
+ cancel(LSockRef, accept, AccRef),
{error, timeout}
end;
{error, _} = ERROR ->
- nif_cancel(LSockRef, accept, AccRef), % Just to be on the safe side...
+ cancel(LSockRef, accept, AccRef), % Just to be on the safe side...
ERROR
end.
@@ -1305,8 +1303,7 @@ do_send(SockRef, Data, EFlags, Timeout) ->
{error, Reason}
after NewTimeout ->
- nif_cancel(SockRef, send, SendRef),
- flush_select_msgs(SockRef, SendRef),
+ cancel(SockRef, send, SendRef),
{error, {timeout, size(Data)}}
end;
{error, eagain} ->
@@ -1319,8 +1316,7 @@ do_send(SockRef, Data, EFlags, Timeout) ->
{error, Reason}
after Timeout ->
- nif_cancel(SockRef, send, SendRef),
- flush_select_msgs(SockRef, SendRef),
+ cancel(SockRef, send, SendRef),
{error, {timeout, size(Data)}}
end;
@@ -1403,8 +1399,7 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) ->
{error, Reason}
after Timeout ->
- nif_cancel(SockRef, sendto, SendRef),
- flush_select_msgs(SockRef, SendRef),
+ cancel(SockRef, sendto, SendRef),
{error, timeout}
end;
@@ -1414,8 +1409,7 @@ do_sendto(SockRef, Data, Dest, EFlags, Timeout) ->
do_sendto(SockRef, Data, Dest, EFlags,
next_timeout(TS, Timeout))
after Timeout ->
- nif_cancel(SockRef, sendto, SendRef),
- flush_select_msgs(SockRef, SendRef),
+ cancel(SockRef, sendto, SendRef),
{error, timeout}
end;
@@ -1497,8 +1491,7 @@ do_sendmsg(SockRef, MsgHdr, EFlags, Timeout) ->
do_sendmsg(SockRef, MsgHdr, EFlags,
next_timeout(TS, Timeout))
after Timeout ->
- nif_cancel(SockRef, sendmsg, SendRef),
- flush_select_msgs(SockRef, SendRef),
+ cancel(SockRef, sendmsg, SendRef),
{error, timeout}
end;
@@ -1519,62 +1512,6 @@ ensure_msghdr(_) ->
%% writev - write data into multiple buffers
%%
-%% send(Socket, Data, Flags, Timeout)
-%% when (is_list(Data) orelse is_binary(Data)) andalso is_list(Flags) ->
-%% IOVec = erlang:iolist_to_iovec(Data),
-%% EFlags = enc_send_flags(Flags),
-%% send_iovec(Socket, IOVec, EFlags, Timeout).
-
-
-%% %% Iterate over the IO-vector (list of binaries).
-
-%% send_iovec(_Socket, [] = _IOVec, _EFlags, _Timeout) ->
-%% ok;
-%% send_iovec({socket, _, SockRef} = Socket, [Bin|IOVec], EFlags, Timeout) ->
-%% case do_send(SockRef, make_ref(), Bin, EFlags, Timeout) of
-%% {ok, NewTimeout} ->
-%% send_iovec(Socket, IOVec, EFlags, NewTimeout);
-%% {error, _} = ERROR ->
-%% ERROR
-%% end.
-
-
-%% do_send(SockRef, SendRef, Data, _EFlags, Timeout)
-%% when (Timeout < 0) ->
-%% nif_cancel(SockRef, SendRef),
-%% flush_select_msgs(SockRef, SendRef),
-%% {error, {timeout, size(Data)}};
-%% do_send(SockRef, SendRef, Data, EFlags, Timeout) ->
-%% TS = timestamp(Timeout),
-%% case nif_send(SockRef, SendRef, Data, EFlags) of
-%% ok ->
-%% {ok, next_timeout(TS, Timeout)};
-%% {ok, Written} ->
-%% %% We are partially done, wait for continuation
-%% receive
-%% {select, SockRef, SendRef, ready_output} ->
-%% <<_:Written/binary, Rest/binary>> = Data,
-%% do_send(SockRef, make_ref(), Rest, EFlags,
-%% next_timeout(TS, Timeout))
-%% after Timeout ->
-%% nif_cancel(SockRef, SendRef),
-%% flush_select_msgs(SockRef, SendRef),
-%% {error, timeout}
-%% end;
-%% {error, eagain} ->
-%% receive
-%% {select, SockRef, SendRef, ready_output} ->
-%% do_send(SockRef, SendRef, Data, EFlags,
-%% next_timeout(TS, Timeout))
-%% after Timeout ->
-%% nif_cancel(SockRef, SendRef),
-%% flush_select_msgs(SockRef, SendRef),
-%% {error, timeout}
-%% end;
-
-%% {error, _} = ERROR ->
-%% ERROR
-%% end.
%% ===========================================================================
@@ -1695,8 +1632,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
after NewTimeout ->
- nif_cancel(SockRef, recv, RecvRef),
- flush_select_msgs(SockRef, RecvRef),
+ cancel(SockRef, recv, RecvRef),
{error, {timeout, Acc}}
end;
@@ -1715,8 +1651,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
after NewTimeout ->
- nif_cancel(SockRef, recv, RecvRef),
- flush_select_msgs(SockRef, RecvRef),
+ cancel(SockRef, recv, RecvRef),
{error, {timeout, Acc}}
end;
@@ -1739,8 +1674,7 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
{error, Reason}
after NewTimeout ->
- nif_cancel(SockRef, recv, RecvRef),
- flush_select_msgs(SockRef, RecvRef),
+ cancel(SockRef, recv, RecvRef),
{error, timeout}
end;
@@ -1765,7 +1699,7 @@ do_recv(SockRef, RecvRef, 0 = _Length, _Eflags, Acc, _Timeout) ->
%% The current recv operation is to be cancelled, so no need for a ref...
%% The cancel will end our 'read everything you have' and "activate"
%% any waiting reader.
- nif_cancel(SockRef, recv, RecvRef),
+ cancel(SockRef, recv, RecvRef),
{ok, Acc};
do_recv(_SockRef, _RecvRef, _Length, _EFlags, Acc, _Timeout) when (size(Acc) > 0) ->
{error, {timeout, Acc}};
@@ -1878,8 +1812,7 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
{error, Reason}
after NewTimeout ->
- nif_cancel(SockRef, recvfrom, RecvRef),
- flush_select_msgs(SockRef, RecvRef),
+ cancel(SockRef, recvfrom, RecvRef),
{error, timeout}
end;
@@ -1966,8 +1899,7 @@ do_recvmsg(SockRef, BufSz, CtrlSz, EFlags, Timeout) ->
{error, Reason}
after NewTimeout ->
- nif_cancel(SockRef, recvmsg, RecvRef),
- flush_select_msgs(SockRef, RecvRef),
+ cancel(SockRef, recvmsg, RecvRef),
{error, timeout}
end;
@@ -3325,10 +3257,19 @@ ensure_sockaddr(_SockAddr) ->
-flush_select_msgs(LSRef, Ref) ->
+cancel(SockRef, Op, OpRef) ->
+ case nif_cancel(SockRef, Op, OpRef) of
+ %% The select has already completed
+ {error, select_sent} ->
+ flush_select_msgs(SockRef, OpRef);
+ Other ->
+ Other
+ end.
+
+flush_select_msgs(SockRef, Ref) ->
receive
- {select, LSRef, Ref, _} ->
- flush_select_msgs(LSRef, Ref)
+ {select, SockRef, Ref, _} ->
+ flush_select_msgs(SockRef, Ref)
after 0 ->
ok
end.