aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2019-02-22 18:53:19 +0100
committerMicael Karlberg <[email protected]>2019-02-22 19:45:54 +0100
commit1a1deb10194c2609fc0a1019518abfad40601cae (patch)
tree6bec70017cb82c17077c14df7465c26bb25c97a8 /erts
parent4ff181e69fc129576333c04a9d1d0c97b6770347 (diff)
downloadotp-1a1deb10194c2609fc0a1019518abfad40601cae.tar.gz
otp-1a1deb10194c2609fc0a1019518abfad40601cae.tar.bz2
otp-1a1deb10194c2609fc0a1019518abfad40601cae.zip
[socket] Cleanup and accept restructure
Some cleanup (of open, bind, connect) and rewrote the accept code (moved the code into smaller functions). OTP-14831
Diffstat (limited to 'erts')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c781
1 files changed, 469 insertions, 312 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index d860cb4965..d56b70e3fd 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -18,9 +18,22 @@
* %CopyrightEnd%
*
* ----------------------------------------------------------------------
- * Purpose : The NIF (C) part of the socket interface
+ * Purpose : The NIF (C) part of the socket interface
+ *
+ * All of the nif-functions which are part of the API has two parts.
+ * The first function is called 'nif_<something>', e.g. nif_open.
+ * This does the initial validation and argument processing and then
+ * calls the function that does the actual work. This is called
+ * 'n<something>'.
* ----------------------------------------------------------------------
*
+ *
+ * This is just a code snippet in case there is need of extra debugging
+ *
+ * esock_dbg_printf("DEMONP", "[%d] %s: %T\r\n",
+ * descP->sock, slogan,
+ * my_make_monitor_term(env, &monP->mon));
+ *
*/
#define STATIC_ERLANG_NIF 1
@@ -745,7 +758,6 @@ typedef struct {
typedef struct {
ErlNifPid pid; // PID of the requesting process
- // ErlNifMonitor mon; Monitor to the requesting process
ESockMonitor mon; // Monitor to the requesting process
ERL_NIF_TERM ref; // The (unique) reference (ID) of the request
} SocketRequestor;
@@ -832,7 +844,6 @@ typedef struct {
/* +++ Close stuff +++ */
ErlNifMutex* closeMtx;
ErlNifPid closerPid;
- // ErlNifMonitor closerMon;
ESockMonitor closerMon;
ErlNifEnv* closeEnv;
ERL_NIF_TERM closeRef;
@@ -841,42 +852,15 @@ typedef struct {
} SocketDescriptor;
-#define SOCKET_OPT_VALUE_UNDEF 0
-#define SOCKET_OPT_VALUE_BOOL 1
-#define SOCKET_OPT_VALUE_INT 2
-#define SOCKET_OPT_VALUE_LINGER 3
-#define SOCKET_OPT_VALUE_BIN 4
-#define SOCKET_OPT_VALUE_STR 5
-
-typedef struct {
- unsigned int tag;
- union {
- BOOLEAN_T boolVal;
- int intVal;
- struct linger lingerVal;
- ErlNifBinary binVal;
- struct {
- unsigned int len;
- char* str;
- } strVal;
- } u;
- /*
- void* optValP; // Points to the actual data (above)
- socklen_t optValLen; // The size of the option value
- */
-} SocketOptValue;
-
-
-/* Global stuff (do we really need to "collect"
- * these things?)
+/* Global stuff.
*/
typedef struct {
/* These are for debugging, testing and the like */
- ERL_NIF_TERM version;
- ERL_NIF_TERM buildDate;
+ // ERL_NIF_TERM version;
+ // ERL_NIF_TERM buildDate;
BOOLEAN_T dbg;
- BOOLEAN_T iow;
+ BOOLEAN_T iow; // Where do we send this? Subscription?
ErlNifMutex* cntMtx;
Uint32 numSockets;
Uint32 numTypeStreams;
@@ -998,15 +982,51 @@ static ERL_NIF_TERM nconnect(ErlNifEnv* env,
static ERL_NIF_TERM nlisten(ErlNifEnv* env,
SocketDescriptor* descP,
int backlog);
+static ERL_NIF_TERM naccept(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref);
static ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM ref);
+static ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ ErlNifPid caller,
+ int save_errno);
+static ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ SOCKET accSock,
+ ErlNifPid caller,
+ SocketAddress* remote);
static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM ref);
-static ERL_NIF_TERM naccept(ErlNifEnv* env,
- SocketDescriptor* descP,
- ERL_NIF_TERM ref);
+static ERL_NIF_TERM naccept_accepting_current(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref);
+static ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ SOCKET accSock,
+ SocketAddress* remote);
+static ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ int save_errno);
+static ERL_NIF_TERM naccept_accepting_other(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ ErlNifPid caller);
+static ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ ErlNifPid* pid,
+ unsigned int nextState);
+static BOOLEAN_T naccept_accepted(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ SOCKET accSock,
+ ErlNifPid pid,
+ SocketAddress* remote,
+ ERL_NIF_TERM* result);
static ERL_NIF_TERM nsend(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM sockRef,
@@ -4170,6 +4190,8 @@ ERL_NIF_TERM nsupports_ipv6(ErlNifEnv* env)
* Extra - A map with "obscure" options.
* Currently the only allowed option is netns (network namespace).
* This is *only* allowed on linux!
+ * We sould also use this for the fd value, in case we should use
+ * an already existing (file) descriptor.
*/
static
ERL_NIF_TERM nif_open(ErlNifEnv* env,
@@ -4206,17 +4228,17 @@ ERL_NIF_TERM nif_open(ErlNifEnv* env,
"\r\n", argv[0], argv[1], eproto, emap) );
if (!edomain2domain(edomain, &domain)) {
- SGDBG( ("SOCKET", "nif_open -> domain: %d\r\n", domain) );
+ SGDBG( ("SOCKET", "nif_open -> invalid domain: %d\r\n", edomain) );
return esock_make_error(env, esock_atom_einval);
}
if (!etype2type(etype, &type)) {
- SGDBG( ("SOCKET", "nif_open -> type: %d\r\n", type) );
+ SGDBG( ("SOCKET", "nif_open -> invalid type: %d\r\n", etype) );
return esock_make_error(env, esock_atom_einval);
}
if (!eproto2proto(env, eproto, &proto)) {
- SGDBG( ("SOCKET", "nif_open -> protocol: %d\r\n", proto) );
+ SGDBG( ("SOCKET", "nif_open -> invalid protocol: %d\r\n", eproto) );
return esock_make_error(env, esock_atom_einval);
}
@@ -4230,6 +4252,7 @@ ERL_NIF_TERM nif_open(ErlNifEnv* env,
netns = NULL;
#endif
+
result = nopen(env, domain, type, proto, netns);
SGDBG( ("SOCKET", "nif_open -> done with result: "
@@ -4237,6 +4260,7 @@ ERL_NIF_TERM nif_open(ErlNifEnv* env,
"\r\n", result) );
return result;
+
#endif // if defined(__WIN32__)
}
@@ -4315,7 +4339,9 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
descP->type = type;
descP->protocol = protocol;
- /* Does this apply to other types? Such as RAW? */
+ /* Does this apply to other types? Such as RAW?
+ * Also, is this really correct? Should we not wait for bind?
+ */
if (type == SOCK_DGRAM) {
descP->isReadable = TRUE;
descP->isWritable = TRUE;
@@ -4330,7 +4356,7 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
enif_release_resource(descP);
/* Keep track of the creator
- * This should not be a problem but just in case
+ * This should not be a problem, but just in case
* the *open* function is used with the wrong kind
* of environment...
*/
@@ -4508,6 +4534,7 @@ ERL_NIF_TERM nif_bind(ErlNifEnv* env,
return esock_make_error_str(env, xres);
return nbind(env, descP, &sockAddr, addrLen);
+
#endif // if defined(__WIN32__)
}
@@ -4574,7 +4601,7 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env,
return enif_raise_exception(env, MKA(env, "notsup"));
#else
SocketDescriptor* descP;
- ERL_NIF_TERM eSockAddr;
+ ERL_NIF_TERM res, eSockAddr;
char* xres;
SGDBG( ("SOCKET", "nif_connect -> entry with argc: %d\r\n", argc) );
@@ -4587,9 +4614,6 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env,
}
eSockAddr = argv[1];
- if (IS_CLOSED(descP) || IS_CLOSING(descP))
- return esock_make_error(env, atom_closed);
-
SSDBG( descP,
("SOCKET", "nif_connect -> args when sock = %d:"
"\r\n Socket: %T"
@@ -4601,16 +4625,16 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env,
return esock_make_error_str(env, xres);
}
- /*
- * <KOLLA>
- *
- * We should lock both the read and write mutex:es...
- *
- * </KOLLA>
- *
- */
- return nconnect(env, descP);
+ MLOCK(descP->readMtx);
+ MLOCK(descP->writeMtx);
+
+ res = nconnect(env, descP);
+
+ MUNLOCK(descP->writeMtx);
+ MUNLOCK(descP->readMtx);
+
+ return res;
#endif // if !defined(__WIN32__)
}
@@ -4621,13 +4645,16 @@ static
ERL_NIF_TERM nconnect(ErlNifEnv* env,
SocketDescriptor* descP)
{
- ERL_NIF_TERM res;
+ ERL_NIF_TERM res, ref;
int code, sres, save_errno = 0;
/*
* Verify that we are where in the proper state
*/
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
if (!IS_OPEN(descP)) {
SSDBG( descP, ("SOCKET", "nif_connect -> not open\r\n") );
return esock_make_error(env, atom_exbadstate);
@@ -4643,6 +4670,11 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
return esock_make_error(env, esock_atom_einval);
}
+
+ /*
+ * And attempt to connect
+ */
+
code = sock_connect(descP->sock,
(struct sockaddr*) &descP->remote,
descP->addrLen);
@@ -4654,8 +4686,7 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
if (IS_SOCKET_ERROR(code) &&
((save_errno == ERRNO_BLOCK) || /* Winsock2 */
(save_errno == EINPROGRESS))) { /* Unix & OSE!! */
- /* THIS DOES NOT WORK!! WE NEED A "PERISTENT" ENV!! */
- ERL_NIF_TERM ref = MKREF(env);
+ ref = MKREF(env);
descP->state = SOCKET_STATE_CONNECTING;
if ((sres = esock_select_write(env, descP->sock, descP, NULL, ref)) < 0) {
res = esock_make_error(env,
@@ -4666,12 +4697,11 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env,
res = esock_make_ok2(env, ref);
}
} else if (code == 0) { /* ok we are connected */
+
descP->state = SOCKET_STATE_CONNECTED;
descP->isReadable = TRUE;
descP->isWritable = TRUE;
- /* Do we need to do somthing for "active" mode?
- * Is there even such a thing *here*?
- */
+
res = esock_atom_ok;
} else {
res = esock_make_error_errno(env, save_errno);
@@ -4828,9 +4858,6 @@ ERL_NIF_TERM nif_listen(ErlNifEnv* env,
return enif_make_badarg(env);
}
- if (IS_CLOSED(descP) || IS_CLOSING(descP))
- return esock_make_error(env, atom_closed);
-
SSDBG( descP,
("SOCKET", "nif_listen -> args when sock = %d:"
"\r\n Socket: %T"
@@ -4838,6 +4865,7 @@ ERL_NIF_TERM nif_listen(ErlNifEnv* env,
"\r\n", descP->sock, argv[0], backlog) );
return nlisten(env, descP, backlog);
+
#endif // if defined(__WIN32__)
}
@@ -4849,18 +4877,32 @@ ERL_NIF_TERM nlisten(ErlNifEnv* env,
SocketDescriptor* descP,
int backlog)
{
+
+ /*
+ * Verify that we are where in the proper state
+ */
+
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
if (descP->state == SOCKET_STATE_CLOSED)
return esock_make_error(env, atom_exbadstate);
if (!IS_OPEN(descP))
return esock_make_error(env, atom_exbadstate);
+
+ /*
+ * And attempt to make socket listening
+ */
+
if (IS_SOCKET_ERROR(sock_listen(descP->sock, backlog)))
return esock_make_error_errno(env, sock_errno());
descP->state = SOCKET_STATE_LISTENING;
return esock_atom_ok;
+
}
#endif // if !defined(__WIN32__)
@@ -4886,7 +4928,7 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env,
return enif_raise_exception(env, MKA(env, "notsup"));
#else
SocketDescriptor* descP;
- ERL_NIF_TERM ref;
+ ERL_NIF_TERM ref, res;
SGDBG( ("SOCKET", "nif_accept -> entry with argc: %d\r\n", argc) );
@@ -4895,19 +4937,23 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env,
if ((argc != 2) ||
!enif_get_resource(env, argv[0], sockets, (void**) &descP)) {
return enif_make_badarg(env);
- }
+ }
ref = argv[1];
- if (IS_CLOSED(descP) || IS_CLOSING(descP))
- return esock_make_error(env, atom_closed);
-
SSDBG( descP,
("SOCKET", "nif_accept -> args when sock = %d:"
"\r\n Socket: %T"
"\r\n ReqRef: %T"
"\r\n", descP->sock, argv[0], ref) );
- return naccept(env, descP, ref);
+ MLOCK(descP->accMtx);
+
+ res = naccept(env, descP, ref);
+
+ MUNLOCK(descP->accMtx);
+
+ return res;
+
#endif // if defined(__WIN32__)
}
@@ -4920,17 +4966,16 @@ ERL_NIF_TERM naccept(ErlNifEnv* env,
{
ERL_NIF_TERM res;
+ if (IS_CLOSED(descP) || IS_CLOSING(descP))
+ return esock_make_error(env, atom_closed);
+
switch (descP->state) {
case SOCKET_STATE_LISTENING:
- MLOCK(descP->accMtx);
res = naccept_listening(env, descP, ref);
- MUNLOCK(descP->accMtx);
break;
case SOCKET_STATE_ACCEPTING:
- MLOCK(descP->accMtx);
res = naccept_accepting(env, descP, ref);
- MUNLOCK(descP->accMtx);
break;
default:
@@ -4944,7 +4989,7 @@ ERL_NIF_TERM naccept(ErlNifEnv* env,
/* *** naccept_listening ***
- * We have no active acceptor and no acceptors in queue.
+ * We have no active acceptor (and no acceptors in queue).
*/
#if !defined(__WIN32__)
static
@@ -4955,9 +5000,9 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
SocketAddress remote;
unsigned int n;
SOCKET accSock;
- HANDLE accEvent;
- int sres, save_errno;
+ int save_errno;
ErlNifPid caller;
+ ERL_NIF_TERM res;
SSDBG( descP, ("SOCKET", "naccept_listening -> get caller\r\n") );
@@ -4976,124 +5021,92 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
("SOCKET",
"naccept_listening -> accept failed (%d)\r\n", save_errno) );
- if (save_errno == ERRNO_BLOCK) {
+ res = naccept_listening_error(env, descP, ref, caller, save_errno);
- /* *** Try again later *** */
- SSDBG( descP, ("SOCKET", "naccept_listening -> would block\r\n") );
+ } else {
- descP->currentAcceptor.pid = caller;
- if (MONP("naccept_listening -> current acceptor",
- env, descP,
- &descP->currentAcceptor.pid,
- &descP->currentAcceptor.mon) != 0)
- return esock_make_error(env, atom_exmon);
+ /*
+ * We got one
+ */
- descP->currentAcceptor.ref = enif_make_copy(descP->env, ref);
- descP->currentAcceptorP = &descP->currentAcceptor;
+ SSDBG( descP, ("SOCKET", "naccept_listening -> success\r\n") );
- if ((sres = esock_select_read(env, descP->sock, descP,
- NULL, ref)) < 0) {
- return esock_make_error(env,
- MKT2(env,
- esock_atom_select_failed,
- MKI(env, sres)));
- } else {
+ res = naccept_listening_accept(env, descP, accSock, caller, &remote);
- /* Shall we really change state?
- * The ready event is sent directly to the calling
- * process, which simply calls this function again.
- * Basically, state accepting means that we have
- * an "outstanding" accept.
- * Shall we store the pid of the calling process?
- * And if someone else calls accept, return with ebusy?
- * Can any process call accept or just the controlling
- * process?
- * We also need a monitor it case the calling process is
- * called before we are done!
- *
- * Change state (to accepting) and store pid of the acceptor
- * (current process). Only accept calls from the acceptor
- * process (ebusy) and once we have a successful accept,
- * change state back to listening. If cancel is called instead
- * (only accepted from the acceptor process), we reset
- * state to listening and also resets the pid to "null"
- * (is there such a value?).
- * Need a mutex to secure that we don't test and change the
- * pid at the same time.
- */
+ }
- descP->state = SOCKET_STATE_ACCEPTING;
+ return res;
+}
- return esock_make_error(env, esock_atom_eagain);
- }
- } else {
- SSDBG( descP,
- ("SOCKET",
- "naccept_listening -> errno: %d\r\n", save_errno) );
- return esock_make_error_errno(env, save_errno);
- }
+/* *** naccept_listening_error ***
+ * The accept call resultet in an error - handle it.
+ * There are only two cases:
+ * 1) BLOCK => Attempt a "retry"
+ * 2) Other => Return the value (converted to an atom)
+ */
+static
+ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ ErlNifPid caller,
+ int save_errno)
+{
+ ERL_NIF_TERM res;
- } else {
- SocketDescriptor* accDescP;
- ERL_NIF_TERM accRef;
+ if (save_errno == ERRNO_BLOCK) {
- /*
- * We got one
- */
+ /* *** Try again later *** */
+ SSDBG( descP, ("SOCKET", "naccept_listening_error -> would block\r\n") );
- SSDBG( descP, ("SOCKET", "naccept_listening -> accept success\r\n") );
+ descP->currentAcceptor.pid = caller;
+ if (MONP("naccept_listening -> current acceptor",
+ env, descP,
+ &descP->currentAcceptor.pid,
+ &descP->currentAcceptor.mon) != 0)
+ return esock_make_error(env, atom_exmon);
- if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) {
- save_errno = sock_errno();
- while ((sock_close(accSock) == INVALID_SOCKET) &&
- (sock_errno() == EINTR));
- return esock_make_error_errno(env, save_errno);
- }
+ descP->currentAcceptor.ref = enif_make_copy(descP->env, ref);
+ descP->currentAcceptorP = &descP->currentAcceptor;
- if ((accDescP = alloc_descriptor(accSock, accEvent)) == NULL) {
- sock_close(accSock);
- return enif_make_badarg(env);
- }
+ res = naccept_busy_retry(env, descP, ref, NULL, SOCKET_STATE_ACCEPTING);
+
- accDescP->domain = descP->domain;
- accDescP->type = descP->type;
- accDescP->protocol = descP->protocol;
- accDescP->rBufSz = descP->rBufSz; // Inherit buffer size
- accDescP->rNum = descP->rNum; // Inherit buffer uses
- accDescP->rNumCnt = 0;
- accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer siez
- accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size
-
- accRef = enif_make_resource(env, accDescP);
- enif_release_resource(accDescP);
-
- accDescP->ctrlPid = caller;
- if (MONP("naccept_listening -> ctrl",
- env, accDescP,
- &accDescP->ctrlPid,
- &accDescP->ctrlMon) != 0) {
- sock_close(accSock);
- return esock_make_error(env, atom_exmon);
- }
+ } else {
+ SSDBG( descP,
+ ("SOCKET",
+ "naccept_listening -> errno: %d\r\n", save_errno) );
+ res = esock_make_error_errno(env, save_errno);
+ }
+
+ return res;
+}
- accDescP->remote = remote;
- SET_NONBLOCKING(accDescP->sock);
- accDescP->state = SOCKET_STATE_CONNECTED;
- accDescP->isReadable = TRUE;
- accDescP->isWritable = TRUE;
+/* *** naccept_listening_accept ***
+ * The accept call was successful (accepted) - handle the new connection.
+ */
+static
+ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ SOCKET accSock,
+ ErlNifPid caller,
+ SocketAddress* remote)
+{
+ ERL_NIF_TERM res;
- return esock_make_ok2(env, accRef);
- }
+ naccept_accepted(env, descP, accSock, caller, remote, &res);
+
+ return res;
}
#endif // if !defined(__WIN32__)
+
/* *** naccept_accepting ***
* We have an active acceptor and possibly acceptors waiting in queue.
* If the pid of the calling process is not the pid of the "current process",
- * push the requester onto the queue.
+ * push the requester onto the (acceptor) queue.
*/
#if !defined(__WIN32__)
static
@@ -5101,13 +5114,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
SocketDescriptor* descP,
ERL_NIF_TERM ref)
{
- SocketAddress remote;
- unsigned int n;
- SOCKET accSock;
- HANDLE accEvent;
ErlNifPid caller;
- int save_errno, sres;
- ERL_NIF_TERM result;
+ ERL_NIF_TERM res;
SSDBG( descP, ("SOCKET", "naccept_accepting -> get caller\r\n") );
@@ -5120,113 +5128,97 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
"\r\n Current: %T"
"\r\n", caller, descP->currentAcceptor.pid) );
- if (!compare_pids(env, &descP->currentAcceptor.pid, &caller)) {
- /* Not the "current acceptor", so (maybe) push onto queue */
- SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") );
- if (!acceptor_search4pid(env, descP, &caller))
- result = acceptor_push(env, descP, caller, ref);
- else
- result = esock_make_error(env, esock_atom_eagain);
-
+ if (compare_pids(env, &descP->currentAcceptor.pid, &caller)) {
+
SSDBG( descP,
- ("SOCKET",
- "naccept_accepting -> queue (push) result: %T\r\n", result) );
+ ("SOCKET", "naccept_accepting -> current acceptor\r\n") );
- return result;
- }
+ res = naccept_accepting_current(env, descP, ref);
- n = sizeof(descP->remote);
- sys_memzero((char *) &remote, n);
- SSDBG( descP, ("SOCKET", "naccept_accepting -> try accept\r\n") );
- accSock = sock_accept(descP->sock, (struct sockaddr*) &remote, &n);
- if (accSock == INVALID_SOCKET) {
+ } else {
- save_errno = sock_errno();
+ /* Not the "current acceptor", so (maybe) push onto queue */
SSDBG( descP,
- ("SOCKET",
- "naccept_accepting -> accept failed (%d)\r\n", save_errno) );
+ ("SOCKET", "naccept_accepting -> *not* current acceptor\r\n") );
- if (save_errno == ERRNO_BLOCK) {
+ res = naccept_accepting_other(env, descP, ref, caller);
- /*
- * Just try again, no real error, just a ghost trigger from poll,
- */
+ }
- SSDBG( descP,
- ("SOCKET",
- "naccept_accepting -> would block: try again\r\n") );
+ return res;
- if ((sres = esock_select_read(env, descP->sock, descP,
- NULL, ref)) < 0) {
- result = esock_make_error(env,
- MKT2(env,
- esock_atom_select_failed,
- MKI(env, sres)));
- } else {
- result = esock_make_error(env, esock_atom_eagain);
- }
+}
- return result;
- } else {
- SSDBG( descP,
- ("SOCKET",
- "naccept_accepting -> errno: %d\r\n", save_errno) );
- return esock_make_error_errno(env, save_errno);
- }
- } else {
- SocketDescriptor* accDescP;
- ERL_NIF_TERM accRef;
- /*
- * We got one
- */
+/* *** naccept_accepting_current ***
+ * Handles when the current acceptor makes another attempt.
+ */
+static
+ERL_NIF_TERM naccept_accepting_current(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref)
+{
+ SocketAddress remote;
+ unsigned int n;
+ SOCKET accSock;
+ int save_errno;
+ ERL_NIF_TERM res;
- SSDBG( descP, ("SOCKET", "naccept_accepting -> accept success\r\n") );
+ SSDBG( descP, ("SOCKET", "naccept_accepting_current -> try accept\r\n") );
+ n = sizeof(descP->remote);
+ sys_memzero((char *) &remote, n);
+ accSock = sock_accept(descP->sock, (struct sockaddr*) &remote, &n);
+ if (accSock == INVALID_SOCKET) {
- DEMONP("naccept_accepting -> current acceptor",
- env, descP, &descP->currentAcceptor.mon);
+ save_errno = sock_errno();
- if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) {
- save_errno = sock_errno();
- while ((sock_close(accSock) == INVALID_SOCKET) &&
- (sock_errno() == EINTR));
- return esock_make_error_errno(env, save_errno);
- }
+ SSDBG( descP,
+ ("SOCKET",
+ "naccept_accepting_current -> accept failed: %d\r\n",
+ save_errno) );
- if ((accDescP = alloc_descriptor(accSock, accEvent)) == NULL) {
- sock_close(accSock);
- return enif_make_badarg(env);
- }
+ res = naccept_accepting_current_error(env, descP, ref, save_errno);
+
+ } else {
- accDescP->domain = descP->domain;
- accDescP->type = descP->type;
- accDescP->protocol = descP->protocol;
+ SSDBG( descP, ("SOCKET", "naccept_accepting_current -> accepted\r\n") );
+
+ res = naccept_accepting_current_accept(env, descP, accSock, &remote);
- accRef = enif_make_resource(env, accDescP);
- enif_release_resource(accDescP); // We should really store a reference ...
+ }
- accDescP->ctrlPid = caller;
- if (MONP("naccept_accepting -> ctrl",
- env, accDescP,
- &accDescP->ctrlPid,
- &accDescP->ctrlMon) != 0) {
- sock_close(accSock);
- return esock_make_error(env, atom_exmon);
- }
+ return res;
+}
- accDescP->remote = remote;
- SET_NONBLOCKING(accDescP->sock);
- accDescP->state = SOCKET_STATE_CONNECTED;
- accDescP->isReadable = TRUE;
- accDescP->isWritable = TRUE;
+/* *** naccept_accepting_current_accept ***
+ * Handles when the current acceptor succeeded in its accept call -
+ * handle the new connection.
+ */
+static
+ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ SOCKET accSock,
+ SocketAddress* remote)
+{
+ int sres;
+ ERL_NIF_TERM res;
+
+ if (naccept_accepted(env, descP, accSock,
+ descP->currentAcceptor.pid, remote, &res)) {
- /* Check if there are waiting acceptors (popping the acceptor queue) */
+ /* We should really go through the queue until we succeed to activate
+ * a waiting acceptor. For now we just pop once and hope for the best...
+ * This will leave any remaining acceptors *hanging*...
+ *
+ * We need a "activate-next" function.
+ *
+ */
if (acceptor_pop(env, descP,
&descP->currentAcceptor.pid,
@@ -5235,28 +5227,186 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
/* There was another one */
- SSDBG( descP, ("SOCKET", "naccept_accepting -> new (active) acceptor: "
- "\r\n pid: %T"
- "\r\n ref: %T"
- "\r\n",
- descP->currentAcceptor.pid,
- descP->currentAcceptor.ref) );
+ SSDBG( descP,
+ ("SOCKET",
+ "naccept_accepting_current_accept -> new (active) acceptor: "
+ "\r\n pid: %T"
+ "\r\n ref: %T"
+ "\r\n",
+ descP->currentAcceptor.pid,
+ descP->currentAcceptor.ref) );
if ((sres = esock_select_read(env, descP->sock, descP,
&descP->currentAcceptor.pid,
descP->currentAcceptor.ref)) < 0) {
- return esock_make_error(env,
- MKT2(env,
- esock_atom_select_failed,
- MKI(env, sres)));
+ esock_warning_msg("Failed select (%d) for new acceptor "
+ "after current (%T) died\r\n",
+ sres, descP->currentAcceptor.pid);
}
} else {
descP->currentAcceptorP = NULL;
descP->state = SOCKET_STATE_LISTENING;
}
-
- return esock_make_ok2(env, accRef);
}
+
+ return res;
+}
+
+
+/* *** naccept_accepting_current_error ***
+ * The accept call of current acceptor resultet in an error - handle it.
+ * There are only two cases:
+ * 1) BLOCK => Attempt a "retry"
+ * 2) Other => Return the value (converted to an atom)
+ */
+static
+ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ int save_errno)
+{
+ ERL_NIF_TERM res;
+
+ if (save_errno == ERRNO_BLOCK) {
+
+ /*
+ * Just try again, no real error, just a ghost trigger from poll,
+ */
+
+ SSDBG( descP,
+ ("SOCKET",
+ "naccept_accepting_current_error -> would block: try again\r\n") );
+
+ res = naccept_busy_retry(env, descP, ref, &descP->currentAcceptor.pid,
+ /* No state change */
+ descP->state);
+
+ } else {
+ res = esock_make_error_errno(env, save_errno);
+ }
+
+ return res;
+}
+
+
+/* *** naccept_accepting_other ***
+ * Handles when the another acceptor makes an attempt, which
+ * results (maybe) in the request beeing pushed onto the
+ * acceptor queue.
+ */
+static
+ERL_NIF_TERM naccept_accepting_other(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ ErlNifPid caller)
+{
+ ERL_NIF_TERM result;
+
+ if (!acceptor_search4pid(env, descP, &caller)) // Ugh! (&caller)
+ result = acceptor_push(env, descP, caller, ref);
+ else
+ result = esock_make_error(env, esock_atom_eagain);
+
+ return result;
+}
+#endif // if !defined(__WIN32__)
+
+
+
+/* *** naccept_busy_retry ***
+ * Perform a retry select. If successful, set nextState.
+ */
+#if !defined(__WIN32__)
+static
+ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref,
+ ErlNifPid* pid,
+ unsigned int nextState)
+{
+ int sres;
+ ERL_NIF_TERM res, reason;
+
+ if ((sres = esock_select_read(env, descP->sock, descP, pid, ref)) < 0) {
+ reason = MKT2(env, esock_atom_select_failed, MKI(env, sres));
+ res = esock_make_error(env, reason);
+ } else {
+ descP->state = nextState;
+ res = esock_make_error(env, esock_atom_eagain);
+ }
+
+ return res;
+}
+
+
+
+/* *** naccept_accepted ***
+ * Generic function handling a successful accept.
+ */
+static
+BOOLEAN_T naccept_accepted(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ SOCKET accSock,
+ ErlNifPid pid,
+ SocketAddress* remote,
+ ERL_NIF_TERM* result)
+{
+ SocketDescriptor* accDescP;
+ HANDLE accEvent;
+ ERL_NIF_TERM accRef;
+ int save_errno;
+
+ /*
+ * We got one
+ */
+
+ if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) {
+ save_errno = sock_errno();
+ while ((sock_close(accSock) == INVALID_SOCKET) &&
+ (sock_errno() == EINTR));
+ *result = esock_make_error_errno(env, save_errno);
+ return FALSE;
+ }
+
+ if ((accDescP = alloc_descriptor(accSock, accEvent)) == NULL) {
+ sock_close(accSock);
+ *result = enif_make_badarg(env);
+ return FALSE;
+ }
+
+ accDescP->domain = descP->domain;
+ accDescP->type = descP->type;
+ accDescP->protocol = descP->protocol;
+ accDescP->rBufSz = descP->rBufSz; // Inherit buffer size
+ accDescP->rNum = descP->rNum; // Inherit buffer uses
+ accDescP->rNumCnt = 0;
+ accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer siez
+ accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size
+
+ accRef = enif_make_resource(env, accDescP);
+ enif_release_resource(accDescP);
+
+ accDescP->ctrlPid = pid;
+ if (MONP("naccept_accepted -> ctrl",
+ env, accDescP,
+ &accDescP->ctrlPid,
+ &accDescP->ctrlMon) != 0) {
+ sock_close(accSock);
+ *result = esock_make_error(env, atom_exmon);
+ return FALSE;
+ }
+
+ accDescP->remote = *remote;
+ SET_NONBLOCKING(accDescP->sock);
+
+ accDescP->state = SOCKET_STATE_CONNECTED;
+ accDescP->isReadable = TRUE;
+ accDescP->isWritable = TRUE;
+
+ *result = esock_make_ok2(env, accRef);
+
+ return TRUE;
+
}
#endif // if !defined(__WIN32__)
@@ -17393,11 +17543,6 @@ int esock_demonitor(const char* slogan,
SSDBG( descP, ("SOCKET", "[%d] %s: try demonitor\r\n", descP->sock, slogan) );
- /*
- esock_dbg_printf("DEMONP", "[%d] %s: %T\r\n",
- descP->sock, slogan,
- my_make_monitor_term(env, &monP->mon));*/
-
res = enif_demonitor_process(env, descP, &monP->mon);
if (res == 0) {
@@ -17405,9 +17550,6 @@ int esock_demonitor(const char* slogan,
} else {
SSDBG( descP,
("SOCKET", "[%d] demonitor failed: %d\r\n", descP->sock, res) );
- /*
- esock_dbg_printf("DEMONP", "[%d] failed: %d\r\n", descP->sock, res);
- */
}
return res;
@@ -17487,6 +17629,8 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
"\r\n sock: %d (%d)"
"\r\n",
((is_direct_call) ? "called" : "scheduled"), descP->sock, fd) );
+
+ /* +++ Lock it down +++ */
MLOCK(descP->writeMtx);
MLOCK(descP->readMtx);
@@ -17525,11 +17669,20 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* This *should* be done with a "NULL" monitor value,
* which there currently is none...
* If we got here because the controlling process died,
- * its no point in demonitor. Also, we not actually have
- * a monitor in that case...
+ * there is no point to demonitor. Also, we do not actually
+ * have a monitor in that case...
*/
DEMONP("socket_stop -> ctrl", env, descP, &descP->ctrlMon);
+
+
+ /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ *
+ * Check current and waiting Writers
+ *
+ * +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ */
+
if (descP->currentWriterP != NULL) {
/* We have a (current) writer and *may* therefor also have
* writers waiting.
@@ -17565,6 +17718,14 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed);
}
+
+ /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ *
+ * Check current and waiting Readers
+ *
+ * +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ */
+
if (descP->currentReaderP != NULL) {
/* We have a (current) reader and *may* therefor also have
@@ -17601,6 +17762,15 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed);
}
+
+
+ /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ *
+ * Check current and waiting Acceptors
+ *
+ * +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ */
+
if (descP->currentAcceptorP != NULL) {
/* We have a (current) acceptor and *may* therefor also have
* acceptors waiting.
@@ -17637,18 +17807,16 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
}
+
+ /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ *
+ * Maybe inform waiting closer
+ *
+ * +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
+ */
+
if (descP->sock != INVALID_SOCKET) {
- /*
- * <KOLLA>
- *
- * WE NEED TO CHECK IF THIS OPERATION IS TRIGGERED
- * LOCALLY (VIA A CALL TO CLOSE) OR REMOTELLY
- * (VIA I.E. ECONRESET).
- *
- * </KOLLA>
- */
-
if (descP->closeLocal) {
if (!is_direct_call) {
@@ -17685,6 +17853,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
}
+
/* This function traverse the queue and sends the specified
* nif_abort message with the specified reason to each member,
* and if the 'free' argument is TRUE, the queue will be emptied.
@@ -17748,6 +17917,7 @@ void socket_down(ErlNifEnv* env,
{
#if !defined(__WIN32__)
SocketDescriptor* descP = (SocketDescriptor*) obj;
+ int sres;
SSDBG( descP, ("SOCKET", "socket_down -> entry with"
"\r\n sock: %d"
@@ -17759,8 +17929,8 @@ void socket_down(ErlNifEnv* env,
B2S(IS_CLOSING(descP))) );
if (!IS_CLOSED(descP)) {
+
if (compare_pids(env, &descP->ctrlPid, pid)) {
- int selectRes;
/* We don't bother with the queue cleanup here -
* we leave it to the stop callback function.
@@ -17774,13 +17944,15 @@ void socket_down(ErlNifEnv* env,
descP->closerPid = *pid;
MON_INIT(&descP->closerMon);
- selectRes = esock_select_stop(env, descP->sock, descP);
+ sres = esock_select_stop(env, descP->sock, descP);
+
+ if (sres & ERL_NIF_SELECT_STOP_CALLED) {
- if (selectRes & ERL_NIF_SELECT_STOP_CALLED) {
/* We are done - we can finalize (socket close) directly */
SSDBG( descP,
("SOCKET",
"socket_down -> [%d] stop called\r\n", descP->sock) );
+
dec_socket(descP->domain, descP->type, descP->protocol);
descP->state = SOCKET_STATE_CLOSED;
@@ -17807,7 +17979,8 @@ void socket_down(ErlNifEnv* env,
descP->state = SOCKET_STATE_CLOSED;
- } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) {
+ } else if (sres & ERL_NIF_SELECT_STOP_SCHEDULED) {
+
/* The stop callback function has been *scheduled* which means
* that "should" wait for it to complete. But since we are in
* a callback (down) function, we cannot...
@@ -17817,6 +17990,7 @@ void socket_down(ErlNifEnv* env,
("SOCKET",
"socket_down -> [%d] stop scheduled\r\n",
descP->sock) );
+
dec_socket(descP->domain, descP->type, descP->protocol);
/* And now what? We can't wait for the stop function here...
@@ -17838,23 +18012,13 @@ void socket_down(ErlNifEnv* env,
} else {
- /*
- * <KOLLA>
- *
- * WE SHOULD REALLY HAVE A WAY TO CLOBBER THE SOCKET,
- * SO WE DON'T LET STUFF LEAK.
- * NOW, BECAUSE WE FAILED TO SELECT, WE CANNOT FINISH
- * THE CLOSE, WHAT TO DO? ABORT?
- *
- * </KOLLA>
- */
esock_warning_msg("Failed selecting stop when handling down "
"of controlling process: "
"\r\n Select Res: %d"
"\r\n Controlling Process: %T"
"\r\n Descriptor: %d"
"\r\n Monitor: %T"
- "\r\n", selectRes, pid, descP->sock,
+ "\r\n", sres, pid, descP->sock,
my_make_monitor_term(env, mon));
}
@@ -17886,13 +18050,6 @@ void socket_down(ErlNifEnv* env,
}
}
- /*
- esock_dbg_printf("DOWN",
- "[%d] end %T\r\n",
- descP->sock,
- my_make_monitor_term(env, mon));
- */
-
SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") );
#endif // if !defined(__WIN32__)