diff options
author | Micael Karlberg <[email protected]> | 2019-02-22 18:53:19 +0100 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2019-02-22 19:45:54 +0100 |
commit | 1a1deb10194c2609fc0a1019518abfad40601cae (patch) | |
tree | 6bec70017cb82c17077c14df7465c26bb25c97a8 /erts | |
parent | 4ff181e69fc129576333c04a9d1d0c97b6770347 (diff) | |
download | otp-1a1deb10194c2609fc0a1019518abfad40601cae.tar.gz otp-1a1deb10194c2609fc0a1019518abfad40601cae.tar.bz2 otp-1a1deb10194c2609fc0a1019518abfad40601cae.zip |
[socket] Cleanup and accept restructure
Some cleanup (of open, bind, connect) and rewrote the
accept code (moved the code into smaller functions).
OTP-14831
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 781 |
1 files changed, 469 insertions, 312 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index d860cb4965..d56b70e3fd 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -18,9 +18,22 @@ * %CopyrightEnd% * * ---------------------------------------------------------------------- - * Purpose : The NIF (C) part of the socket interface + * Purpose : The NIF (C) part of the socket interface + * + * All of the nif-functions which are part of the API has two parts. + * The first function is called 'nif_<something>', e.g. nif_open. + * This does the initial validation and argument processing and then + * calls the function that does the actual work. This is called + * 'n<something>'. * ---------------------------------------------------------------------- * + * + * This is just a code snippet in case there is need of extra debugging + * + * esock_dbg_printf("DEMONP", "[%d] %s: %T\r\n", + * descP->sock, slogan, + * my_make_monitor_term(env, &monP->mon)); + * */ #define STATIC_ERLANG_NIF 1 @@ -745,7 +758,6 @@ typedef struct { typedef struct { ErlNifPid pid; // PID of the requesting process - // ErlNifMonitor mon; Monitor to the requesting process ESockMonitor mon; // Monitor to the requesting process ERL_NIF_TERM ref; // The (unique) reference (ID) of the request } SocketRequestor; @@ -832,7 +844,6 @@ typedef struct { /* +++ Close stuff +++ */ ErlNifMutex* closeMtx; ErlNifPid closerPid; - // ErlNifMonitor closerMon; ESockMonitor closerMon; ErlNifEnv* closeEnv; ERL_NIF_TERM closeRef; @@ -841,42 +852,15 @@ typedef struct { } SocketDescriptor; -#define SOCKET_OPT_VALUE_UNDEF 0 -#define SOCKET_OPT_VALUE_BOOL 1 -#define SOCKET_OPT_VALUE_INT 2 -#define SOCKET_OPT_VALUE_LINGER 3 -#define SOCKET_OPT_VALUE_BIN 4 -#define SOCKET_OPT_VALUE_STR 5 - -typedef struct { - unsigned int tag; - union { - BOOLEAN_T boolVal; - int intVal; - struct linger lingerVal; - ErlNifBinary binVal; - struct { - unsigned int len; - char* str; - } strVal; - } u; - /* - void* optValP; // Points to the actual data (above) - socklen_t optValLen; // The size of the option value - */ -} SocketOptValue; - - -/* Global stuff (do we really need to "collect" - * these things?) +/* Global stuff. */ typedef struct { /* These are for debugging, testing and the like */ - ERL_NIF_TERM version; - ERL_NIF_TERM buildDate; + // ERL_NIF_TERM version; + // ERL_NIF_TERM buildDate; BOOLEAN_T dbg; - BOOLEAN_T iow; + BOOLEAN_T iow; // Where do we send this? Subscription? ErlNifMutex* cntMtx; Uint32 numSockets; Uint32 numTypeStreams; @@ -998,15 +982,51 @@ static ERL_NIF_TERM nconnect(ErlNifEnv* env, static ERL_NIF_TERM nlisten(ErlNifEnv* env, SocketDescriptor* descP, int backlog); +static ERL_NIF_TERM naccept(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref); static ERL_NIF_TERM naccept_listening(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM ref); +static ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ErlNifPid caller, + int save_errno); +static ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env, + SocketDescriptor* descP, + SOCKET accSock, + ErlNifPid caller, + SocketAddress* remote); static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM ref); -static ERL_NIF_TERM naccept(ErlNifEnv* env, - SocketDescriptor* descP, - ERL_NIF_TERM ref); +static ERL_NIF_TERM naccept_accepting_current(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref); +static ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, + SocketDescriptor* descP, + SOCKET accSock, + SocketAddress* remote); +static ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + int save_errno); +static ERL_NIF_TERM naccept_accepting_other(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ErlNifPid caller); +static ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ErlNifPid* pid, + unsigned int nextState); +static BOOLEAN_T naccept_accepted(ErlNifEnv* env, + SocketDescriptor* descP, + SOCKET accSock, + ErlNifPid pid, + SocketAddress* remote, + ERL_NIF_TERM* result); static ERL_NIF_TERM nsend(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM sockRef, @@ -4170,6 +4190,8 @@ ERL_NIF_TERM nsupports_ipv6(ErlNifEnv* env) * Extra - A map with "obscure" options. * Currently the only allowed option is netns (network namespace). * This is *only* allowed on linux! + * We sould also use this for the fd value, in case we should use + * an already existing (file) descriptor. */ static ERL_NIF_TERM nif_open(ErlNifEnv* env, @@ -4206,17 +4228,17 @@ ERL_NIF_TERM nif_open(ErlNifEnv* env, "\r\n", argv[0], argv[1], eproto, emap) ); if (!edomain2domain(edomain, &domain)) { - SGDBG( ("SOCKET", "nif_open -> domain: %d\r\n", domain) ); + SGDBG( ("SOCKET", "nif_open -> invalid domain: %d\r\n", edomain) ); return esock_make_error(env, esock_atom_einval); } if (!etype2type(etype, &type)) { - SGDBG( ("SOCKET", "nif_open -> type: %d\r\n", type) ); + SGDBG( ("SOCKET", "nif_open -> invalid type: %d\r\n", etype) ); return esock_make_error(env, esock_atom_einval); } if (!eproto2proto(env, eproto, &proto)) { - SGDBG( ("SOCKET", "nif_open -> protocol: %d\r\n", proto) ); + SGDBG( ("SOCKET", "nif_open -> invalid protocol: %d\r\n", eproto) ); return esock_make_error(env, esock_atom_einval); } @@ -4230,6 +4252,7 @@ ERL_NIF_TERM nif_open(ErlNifEnv* env, netns = NULL; #endif + result = nopen(env, domain, type, proto, netns); SGDBG( ("SOCKET", "nif_open -> done with result: " @@ -4237,6 +4260,7 @@ ERL_NIF_TERM nif_open(ErlNifEnv* env, "\r\n", result) ); return result; + #endif // if defined(__WIN32__) } @@ -4315,7 +4339,9 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, descP->type = type; descP->protocol = protocol; - /* Does this apply to other types? Such as RAW? */ + /* Does this apply to other types? Such as RAW? + * Also, is this really correct? Should we not wait for bind? + */ if (type == SOCK_DGRAM) { descP->isReadable = TRUE; descP->isWritable = TRUE; @@ -4330,7 +4356,7 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, enif_release_resource(descP); /* Keep track of the creator - * This should not be a problem but just in case + * This should not be a problem, but just in case * the *open* function is used with the wrong kind * of environment... */ @@ -4508,6 +4534,7 @@ ERL_NIF_TERM nif_bind(ErlNifEnv* env, return esock_make_error_str(env, xres); return nbind(env, descP, &sockAddr, addrLen); + #endif // if defined(__WIN32__) } @@ -4574,7 +4601,7 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, return enif_raise_exception(env, MKA(env, "notsup")); #else SocketDescriptor* descP; - ERL_NIF_TERM eSockAddr; + ERL_NIF_TERM res, eSockAddr; char* xres; SGDBG( ("SOCKET", "nif_connect -> entry with argc: %d\r\n", argc) ); @@ -4587,9 +4614,6 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, } eSockAddr = argv[1]; - if (IS_CLOSED(descP) || IS_CLOSING(descP)) - return esock_make_error(env, atom_closed); - SSDBG( descP, ("SOCKET", "nif_connect -> args when sock = %d:" "\r\n Socket: %T" @@ -4601,16 +4625,16 @@ ERL_NIF_TERM nif_connect(ErlNifEnv* env, return esock_make_error_str(env, xres); } - /* - * <KOLLA> - * - * We should lock both the read and write mutex:es... - * - * </KOLLA> - * - */ - return nconnect(env, descP); + MLOCK(descP->readMtx); + MLOCK(descP->writeMtx); + + res = nconnect(env, descP); + + MUNLOCK(descP->writeMtx); + MUNLOCK(descP->readMtx); + + return res; #endif // if !defined(__WIN32__) } @@ -4621,13 +4645,16 @@ static ERL_NIF_TERM nconnect(ErlNifEnv* env, SocketDescriptor* descP) { - ERL_NIF_TERM res; + ERL_NIF_TERM res, ref; int code, sres, save_errno = 0; /* * Verify that we are where in the proper state */ + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + if (!IS_OPEN(descP)) { SSDBG( descP, ("SOCKET", "nif_connect -> not open\r\n") ); return esock_make_error(env, atom_exbadstate); @@ -4643,6 +4670,11 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, return esock_make_error(env, esock_atom_einval); } + + /* + * And attempt to connect + */ + code = sock_connect(descP->sock, (struct sockaddr*) &descP->remote, descP->addrLen); @@ -4654,8 +4686,7 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, if (IS_SOCKET_ERROR(code) && ((save_errno == ERRNO_BLOCK) || /* Winsock2 */ (save_errno == EINPROGRESS))) { /* Unix & OSE!! */ - /* THIS DOES NOT WORK!! WE NEED A "PERISTENT" ENV!! */ - ERL_NIF_TERM ref = MKREF(env); + ref = MKREF(env); descP->state = SOCKET_STATE_CONNECTING; if ((sres = esock_select_write(env, descP->sock, descP, NULL, ref)) < 0) { res = esock_make_error(env, @@ -4666,12 +4697,11 @@ ERL_NIF_TERM nconnect(ErlNifEnv* env, res = esock_make_ok2(env, ref); } } else if (code == 0) { /* ok we are connected */ + descP->state = SOCKET_STATE_CONNECTED; descP->isReadable = TRUE; descP->isWritable = TRUE; - /* Do we need to do somthing for "active" mode? - * Is there even such a thing *here*? - */ + res = esock_atom_ok; } else { res = esock_make_error_errno(env, save_errno); @@ -4828,9 +4858,6 @@ ERL_NIF_TERM nif_listen(ErlNifEnv* env, return enif_make_badarg(env); } - if (IS_CLOSED(descP) || IS_CLOSING(descP)) - return esock_make_error(env, atom_closed); - SSDBG( descP, ("SOCKET", "nif_listen -> args when sock = %d:" "\r\n Socket: %T" @@ -4838,6 +4865,7 @@ ERL_NIF_TERM nif_listen(ErlNifEnv* env, "\r\n", descP->sock, argv[0], backlog) ); return nlisten(env, descP, backlog); + #endif // if defined(__WIN32__) } @@ -4849,18 +4877,32 @@ ERL_NIF_TERM nlisten(ErlNifEnv* env, SocketDescriptor* descP, int backlog) { + + /* + * Verify that we are where in the proper state + */ + + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + if (descP->state == SOCKET_STATE_CLOSED) return esock_make_error(env, atom_exbadstate); if (!IS_OPEN(descP)) return esock_make_error(env, atom_exbadstate); + + /* + * And attempt to make socket listening + */ + if (IS_SOCKET_ERROR(sock_listen(descP->sock, backlog))) return esock_make_error_errno(env, sock_errno()); descP->state = SOCKET_STATE_LISTENING; return esock_atom_ok; + } #endif // if !defined(__WIN32__) @@ -4886,7 +4928,7 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env, return enif_raise_exception(env, MKA(env, "notsup")); #else SocketDescriptor* descP; - ERL_NIF_TERM ref; + ERL_NIF_TERM ref, res; SGDBG( ("SOCKET", "nif_accept -> entry with argc: %d\r\n", argc) ); @@ -4895,19 +4937,23 @@ ERL_NIF_TERM nif_accept(ErlNifEnv* env, if ((argc != 2) || !enif_get_resource(env, argv[0], sockets, (void**) &descP)) { return enif_make_badarg(env); - } + } ref = argv[1]; - if (IS_CLOSED(descP) || IS_CLOSING(descP)) - return esock_make_error(env, atom_closed); - SSDBG( descP, ("SOCKET", "nif_accept -> args when sock = %d:" "\r\n Socket: %T" "\r\n ReqRef: %T" "\r\n", descP->sock, argv[0], ref) ); - return naccept(env, descP, ref); + MLOCK(descP->accMtx); + + res = naccept(env, descP, ref); + + MUNLOCK(descP->accMtx); + + return res; + #endif // if defined(__WIN32__) } @@ -4920,17 +4966,16 @@ ERL_NIF_TERM naccept(ErlNifEnv* env, { ERL_NIF_TERM res; + if (IS_CLOSED(descP) || IS_CLOSING(descP)) + return esock_make_error(env, atom_closed); + switch (descP->state) { case SOCKET_STATE_LISTENING: - MLOCK(descP->accMtx); res = naccept_listening(env, descP, ref); - MUNLOCK(descP->accMtx); break; case SOCKET_STATE_ACCEPTING: - MLOCK(descP->accMtx); res = naccept_accepting(env, descP, ref); - MUNLOCK(descP->accMtx); break; default: @@ -4944,7 +4989,7 @@ ERL_NIF_TERM naccept(ErlNifEnv* env, /* *** naccept_listening *** - * We have no active acceptor and no acceptors in queue. + * We have no active acceptor (and no acceptors in queue). */ #if !defined(__WIN32__) static @@ -4955,9 +5000,9 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, SocketAddress remote; unsigned int n; SOCKET accSock; - HANDLE accEvent; - int sres, save_errno; + int save_errno; ErlNifPid caller; + ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "naccept_listening -> get caller\r\n") ); @@ -4976,124 +5021,92 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, ("SOCKET", "naccept_listening -> accept failed (%d)\r\n", save_errno) ); - if (save_errno == ERRNO_BLOCK) { + res = naccept_listening_error(env, descP, ref, caller, save_errno); - /* *** Try again later *** */ - SSDBG( descP, ("SOCKET", "naccept_listening -> would block\r\n") ); + } else { - descP->currentAcceptor.pid = caller; - if (MONP("naccept_listening -> current acceptor", - env, descP, - &descP->currentAcceptor.pid, - &descP->currentAcceptor.mon) != 0) - return esock_make_error(env, atom_exmon); + /* + * We got one + */ - descP->currentAcceptor.ref = enif_make_copy(descP->env, ref); - descP->currentAcceptorP = &descP->currentAcceptor; + SSDBG( descP, ("SOCKET", "naccept_listening -> success\r\n") ); - if ((sres = esock_select_read(env, descP->sock, descP, - NULL, ref)) < 0) { - return esock_make_error(env, - MKT2(env, - esock_atom_select_failed, - MKI(env, sres))); - } else { + res = naccept_listening_accept(env, descP, accSock, caller, &remote); - /* Shall we really change state? - * The ready event is sent directly to the calling - * process, which simply calls this function again. - * Basically, state accepting means that we have - * an "outstanding" accept. - * Shall we store the pid of the calling process? - * And if someone else calls accept, return with ebusy? - * Can any process call accept or just the controlling - * process? - * We also need a monitor it case the calling process is - * called before we are done! - * - * Change state (to accepting) and store pid of the acceptor - * (current process). Only accept calls from the acceptor - * process (ebusy) and once we have a successful accept, - * change state back to listening. If cancel is called instead - * (only accepted from the acceptor process), we reset - * state to listening and also resets the pid to "null" - * (is there such a value?). - * Need a mutex to secure that we don't test and change the - * pid at the same time. - */ + } - descP->state = SOCKET_STATE_ACCEPTING; + return res; +} - return esock_make_error(env, esock_atom_eagain); - } - } else { - SSDBG( descP, - ("SOCKET", - "naccept_listening -> errno: %d\r\n", save_errno) ); - return esock_make_error_errno(env, save_errno); - } +/* *** naccept_listening_error *** + * The accept call resultet in an error - handle it. + * There are only two cases: + * 1) BLOCK => Attempt a "retry" + * 2) Other => Return the value (converted to an atom) + */ +static +ERL_NIF_TERM naccept_listening_error(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ErlNifPid caller, + int save_errno) +{ + ERL_NIF_TERM res; - } else { - SocketDescriptor* accDescP; - ERL_NIF_TERM accRef; + if (save_errno == ERRNO_BLOCK) { - /* - * We got one - */ + /* *** Try again later *** */ + SSDBG( descP, ("SOCKET", "naccept_listening_error -> would block\r\n") ); - SSDBG( descP, ("SOCKET", "naccept_listening -> accept success\r\n") ); + descP->currentAcceptor.pid = caller; + if (MONP("naccept_listening -> current acceptor", + env, descP, + &descP->currentAcceptor.pid, + &descP->currentAcceptor.mon) != 0) + return esock_make_error(env, atom_exmon); - if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) { - save_errno = sock_errno(); - while ((sock_close(accSock) == INVALID_SOCKET) && - (sock_errno() == EINTR)); - return esock_make_error_errno(env, save_errno); - } + descP->currentAcceptor.ref = enif_make_copy(descP->env, ref); + descP->currentAcceptorP = &descP->currentAcceptor; - if ((accDescP = alloc_descriptor(accSock, accEvent)) == NULL) { - sock_close(accSock); - return enif_make_badarg(env); - } + res = naccept_busy_retry(env, descP, ref, NULL, SOCKET_STATE_ACCEPTING); + - accDescP->domain = descP->domain; - accDescP->type = descP->type; - accDescP->protocol = descP->protocol; - accDescP->rBufSz = descP->rBufSz; // Inherit buffer size - accDescP->rNum = descP->rNum; // Inherit buffer uses - accDescP->rNumCnt = 0; - accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer siez - accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size - - accRef = enif_make_resource(env, accDescP); - enif_release_resource(accDescP); - - accDescP->ctrlPid = caller; - if (MONP("naccept_listening -> ctrl", - env, accDescP, - &accDescP->ctrlPid, - &accDescP->ctrlMon) != 0) { - sock_close(accSock); - return esock_make_error(env, atom_exmon); - } + } else { + SSDBG( descP, + ("SOCKET", + "naccept_listening -> errno: %d\r\n", save_errno) ); + res = esock_make_error_errno(env, save_errno); + } + + return res; +} - accDescP->remote = remote; - SET_NONBLOCKING(accDescP->sock); - accDescP->state = SOCKET_STATE_CONNECTED; - accDescP->isReadable = TRUE; - accDescP->isWritable = TRUE; +/* *** naccept_listening_accept *** + * The accept call was successful (accepted) - handle the new connection. + */ +static +ERL_NIF_TERM naccept_listening_accept(ErlNifEnv* env, + SocketDescriptor* descP, + SOCKET accSock, + ErlNifPid caller, + SocketAddress* remote) +{ + ERL_NIF_TERM res; - return esock_make_ok2(env, accRef); - } + naccept_accepted(env, descP, accSock, caller, remote, &res); + + return res; } #endif // if !defined(__WIN32__) + /* *** naccept_accepting *** * We have an active acceptor and possibly acceptors waiting in queue. * If the pid of the calling process is not the pid of the "current process", - * push the requester onto the queue. + * push the requester onto the (acceptor) queue. */ #if !defined(__WIN32__) static @@ -5101,13 +5114,8 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, SocketDescriptor* descP, ERL_NIF_TERM ref) { - SocketAddress remote; - unsigned int n; - SOCKET accSock; - HANDLE accEvent; ErlNifPid caller; - int save_errno, sres; - ERL_NIF_TERM result; + ERL_NIF_TERM res; SSDBG( descP, ("SOCKET", "naccept_accepting -> get caller\r\n") ); @@ -5120,113 +5128,97 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, "\r\n Current: %T" "\r\n", caller, descP->currentAcceptor.pid) ); - if (!compare_pids(env, &descP->currentAcceptor.pid, &caller)) { - /* Not the "current acceptor", so (maybe) push onto queue */ - SSDBG( descP, ("SOCKET", "naccept_accepting -> not (active) acceptor\r\n") ); - if (!acceptor_search4pid(env, descP, &caller)) - result = acceptor_push(env, descP, caller, ref); - else - result = esock_make_error(env, esock_atom_eagain); - + if (compare_pids(env, &descP->currentAcceptor.pid, &caller)) { + SSDBG( descP, - ("SOCKET", - "naccept_accepting -> queue (push) result: %T\r\n", result) ); + ("SOCKET", "naccept_accepting -> current acceptor\r\n") ); - return result; - } + res = naccept_accepting_current(env, descP, ref); - n = sizeof(descP->remote); - sys_memzero((char *) &remote, n); - SSDBG( descP, ("SOCKET", "naccept_accepting -> try accept\r\n") ); - accSock = sock_accept(descP->sock, (struct sockaddr*) &remote, &n); - if (accSock == INVALID_SOCKET) { + } else { - save_errno = sock_errno(); + /* Not the "current acceptor", so (maybe) push onto queue */ SSDBG( descP, - ("SOCKET", - "naccept_accepting -> accept failed (%d)\r\n", save_errno) ); + ("SOCKET", "naccept_accepting -> *not* current acceptor\r\n") ); - if (save_errno == ERRNO_BLOCK) { + res = naccept_accepting_other(env, descP, ref, caller); - /* - * Just try again, no real error, just a ghost trigger from poll, - */ + } - SSDBG( descP, - ("SOCKET", - "naccept_accepting -> would block: try again\r\n") ); + return res; - if ((sres = esock_select_read(env, descP->sock, descP, - NULL, ref)) < 0) { - result = esock_make_error(env, - MKT2(env, - esock_atom_select_failed, - MKI(env, sres))); - } else { - result = esock_make_error(env, esock_atom_eagain); - } +} - return result; - } else { - SSDBG( descP, - ("SOCKET", - "naccept_accepting -> errno: %d\r\n", save_errno) ); - return esock_make_error_errno(env, save_errno); - } - } else { - SocketDescriptor* accDescP; - ERL_NIF_TERM accRef; - /* - * We got one - */ +/* *** naccept_accepting_current *** + * Handles when the current acceptor makes another attempt. + */ +static +ERL_NIF_TERM naccept_accepting_current(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref) +{ + SocketAddress remote; + unsigned int n; + SOCKET accSock; + int save_errno; + ERL_NIF_TERM res; - SSDBG( descP, ("SOCKET", "naccept_accepting -> accept success\r\n") ); + SSDBG( descP, ("SOCKET", "naccept_accepting_current -> try accept\r\n") ); + n = sizeof(descP->remote); + sys_memzero((char *) &remote, n); + accSock = sock_accept(descP->sock, (struct sockaddr*) &remote, &n); + if (accSock == INVALID_SOCKET) { - DEMONP("naccept_accepting -> current acceptor", - env, descP, &descP->currentAcceptor.mon); + save_errno = sock_errno(); - if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) { - save_errno = sock_errno(); - while ((sock_close(accSock) == INVALID_SOCKET) && - (sock_errno() == EINTR)); - return esock_make_error_errno(env, save_errno); - } + SSDBG( descP, + ("SOCKET", + "naccept_accepting_current -> accept failed: %d\r\n", + save_errno) ); - if ((accDescP = alloc_descriptor(accSock, accEvent)) == NULL) { - sock_close(accSock); - return enif_make_badarg(env); - } + res = naccept_accepting_current_error(env, descP, ref, save_errno); + + } else { - accDescP->domain = descP->domain; - accDescP->type = descP->type; - accDescP->protocol = descP->protocol; + SSDBG( descP, ("SOCKET", "naccept_accepting_current -> accepted\r\n") ); + + res = naccept_accepting_current_accept(env, descP, accSock, &remote); - accRef = enif_make_resource(env, accDescP); - enif_release_resource(accDescP); // We should really store a reference ... + } - accDescP->ctrlPid = caller; - if (MONP("naccept_accepting -> ctrl", - env, accDescP, - &accDescP->ctrlPid, - &accDescP->ctrlMon) != 0) { - sock_close(accSock); - return esock_make_error(env, atom_exmon); - } + return res; +} - accDescP->remote = remote; - SET_NONBLOCKING(accDescP->sock); - accDescP->state = SOCKET_STATE_CONNECTED; - accDescP->isReadable = TRUE; - accDescP->isWritable = TRUE; +/* *** naccept_accepting_current_accept *** + * Handles when the current acceptor succeeded in its accept call - + * handle the new connection. + */ +static +ERL_NIF_TERM naccept_accepting_current_accept(ErlNifEnv* env, + SocketDescriptor* descP, + SOCKET accSock, + SocketAddress* remote) +{ + int sres; + ERL_NIF_TERM res; + + if (naccept_accepted(env, descP, accSock, + descP->currentAcceptor.pid, remote, &res)) { - /* Check if there are waiting acceptors (popping the acceptor queue) */ + /* We should really go through the queue until we succeed to activate + * a waiting acceptor. For now we just pop once and hope for the best... + * This will leave any remaining acceptors *hanging*... + * + * We need a "activate-next" function. + * + */ if (acceptor_pop(env, descP, &descP->currentAcceptor.pid, @@ -5235,28 +5227,186 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, /* There was another one */ - SSDBG( descP, ("SOCKET", "naccept_accepting -> new (active) acceptor: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", - descP->currentAcceptor.pid, - descP->currentAcceptor.ref) ); + SSDBG( descP, + ("SOCKET", + "naccept_accepting_current_accept -> new (active) acceptor: " + "\r\n pid: %T" + "\r\n ref: %T" + "\r\n", + descP->currentAcceptor.pid, + descP->currentAcceptor.ref) ); if ((sres = esock_select_read(env, descP->sock, descP, &descP->currentAcceptor.pid, descP->currentAcceptor.ref)) < 0) { - return esock_make_error(env, - MKT2(env, - esock_atom_select_failed, - MKI(env, sres))); + esock_warning_msg("Failed select (%d) for new acceptor " + "after current (%T) died\r\n", + sres, descP->currentAcceptor.pid); } } else { descP->currentAcceptorP = NULL; descP->state = SOCKET_STATE_LISTENING; } - - return esock_make_ok2(env, accRef); } + + return res; +} + + +/* *** naccept_accepting_current_error *** + * The accept call of current acceptor resultet in an error - handle it. + * There are only two cases: + * 1) BLOCK => Attempt a "retry" + * 2) Other => Return the value (converted to an atom) + */ +static +ERL_NIF_TERM naccept_accepting_current_error(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + int save_errno) +{ + ERL_NIF_TERM res; + + if (save_errno == ERRNO_BLOCK) { + + /* + * Just try again, no real error, just a ghost trigger from poll, + */ + + SSDBG( descP, + ("SOCKET", + "naccept_accepting_current_error -> would block: try again\r\n") ); + + res = naccept_busy_retry(env, descP, ref, &descP->currentAcceptor.pid, + /* No state change */ + descP->state); + + } else { + res = esock_make_error_errno(env, save_errno); + } + + return res; +} + + +/* *** naccept_accepting_other *** + * Handles when the another acceptor makes an attempt, which + * results (maybe) in the request beeing pushed onto the + * acceptor queue. + */ +static +ERL_NIF_TERM naccept_accepting_other(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ErlNifPid caller) +{ + ERL_NIF_TERM result; + + if (!acceptor_search4pid(env, descP, &caller)) // Ugh! (&caller) + result = acceptor_push(env, descP, caller, ref); + else + result = esock_make_error(env, esock_atom_eagain); + + return result; +} +#endif // if !defined(__WIN32__) + + + +/* *** naccept_busy_retry *** + * Perform a retry select. If successful, set nextState. + */ +#if !defined(__WIN32__) +static +ERL_NIF_TERM naccept_busy_retry(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref, + ErlNifPid* pid, + unsigned int nextState) +{ + int sres; + ERL_NIF_TERM res, reason; + + if ((sres = esock_select_read(env, descP->sock, descP, pid, ref)) < 0) { + reason = MKT2(env, esock_atom_select_failed, MKI(env, sres)); + res = esock_make_error(env, reason); + } else { + descP->state = nextState; + res = esock_make_error(env, esock_atom_eagain); + } + + return res; +} + + + +/* *** naccept_accepted *** + * Generic function handling a successful accept. + */ +static +BOOLEAN_T naccept_accepted(ErlNifEnv* env, + SocketDescriptor* descP, + SOCKET accSock, + ErlNifPid pid, + SocketAddress* remote, + ERL_NIF_TERM* result) +{ + SocketDescriptor* accDescP; + HANDLE accEvent; + ERL_NIF_TERM accRef; + int save_errno; + + /* + * We got one + */ + + if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) { + save_errno = sock_errno(); + while ((sock_close(accSock) == INVALID_SOCKET) && + (sock_errno() == EINTR)); + *result = esock_make_error_errno(env, save_errno); + return FALSE; + } + + if ((accDescP = alloc_descriptor(accSock, accEvent)) == NULL) { + sock_close(accSock); + *result = enif_make_badarg(env); + return FALSE; + } + + accDescP->domain = descP->domain; + accDescP->type = descP->type; + accDescP->protocol = descP->protocol; + accDescP->rBufSz = descP->rBufSz; // Inherit buffer size + accDescP->rNum = descP->rNum; // Inherit buffer uses + accDescP->rNumCnt = 0; + accDescP->rCtrlSz = descP->rCtrlSz; // Inherit buffer siez + accDescP->wCtrlSz = descP->wCtrlSz; // Inherit buffer size + + accRef = enif_make_resource(env, accDescP); + enif_release_resource(accDescP); + + accDescP->ctrlPid = pid; + if (MONP("naccept_accepted -> ctrl", + env, accDescP, + &accDescP->ctrlPid, + &accDescP->ctrlMon) != 0) { + sock_close(accSock); + *result = esock_make_error(env, atom_exmon); + return FALSE; + } + + accDescP->remote = *remote; + SET_NONBLOCKING(accDescP->sock); + + accDescP->state = SOCKET_STATE_CONNECTED; + accDescP->isReadable = TRUE; + accDescP->isWritable = TRUE; + + *result = esock_make_ok2(env, accRef); + + return TRUE; + } #endif // if !defined(__WIN32__) @@ -17393,11 +17543,6 @@ int esock_demonitor(const char* slogan, SSDBG( descP, ("SOCKET", "[%d] %s: try demonitor\r\n", descP->sock, slogan) ); - /* - esock_dbg_printf("DEMONP", "[%d] %s: %T\r\n", - descP->sock, slogan, - my_make_monitor_term(env, &monP->mon));*/ - res = enif_demonitor_process(env, descP, &monP->mon); if (res == 0) { @@ -17405,9 +17550,6 @@ int esock_demonitor(const char* slogan, } else { SSDBG( descP, ("SOCKET", "[%d] demonitor failed: %d\r\n", descP->sock, res) ); - /* - esock_dbg_printf("DEMONP", "[%d] failed: %d\r\n", descP->sock, res); - */ } return res; @@ -17487,6 +17629,8 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) "\r\n sock: %d (%d)" "\r\n", ((is_direct_call) ? "called" : "scheduled"), descP->sock, fd) ); + + /* +++ Lock it down +++ */ MLOCK(descP->writeMtx); MLOCK(descP->readMtx); @@ -17525,11 +17669,20 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * This *should* be done with a "NULL" monitor value, * which there currently is none... * If we got here because the controlling process died, - * its no point in demonitor. Also, we not actually have - * a monitor in that case... + * there is no point to demonitor. Also, we do not actually + * have a monitor in that case... */ DEMONP("socket_stop -> ctrl", env, descP, &descP->ctrlMon); + + + /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + * Check current and waiting Writers + * + * +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + */ + if (descP->currentWriterP != NULL) { /* We have a (current) writer and *may* therefor also have * writers waiting. @@ -17565,6 +17718,14 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed); } + + /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + * Check current and waiting Readers + * + * +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + */ + if (descP->currentReaderP != NULL) { /* We have a (current) reader and *may* therefor also have @@ -17601,6 +17762,15 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed); } + + + /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + * Check current and waiting Acceptors + * + * +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + */ + if (descP->currentAcceptorP != NULL) { /* We have a (current) acceptor and *may* therefor also have * acceptors waiting. @@ -17637,18 +17807,16 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) } + + /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + * + * Maybe inform waiting closer + * + * +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ + */ + if (descP->sock != INVALID_SOCKET) { - /* - * <KOLLA> - * - * WE NEED TO CHECK IF THIS OPERATION IS TRIGGERED - * LOCALLY (VIA A CALL TO CLOSE) OR REMOTELLY - * (VIA I.E. ECONRESET). - * - * </KOLLA> - */ - if (descP->closeLocal) { if (!is_direct_call) { @@ -17685,6 +17853,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) } + /* This function traverse the queue and sends the specified * nif_abort message with the specified reason to each member, * and if the 'free' argument is TRUE, the queue will be emptied. @@ -17748,6 +17917,7 @@ void socket_down(ErlNifEnv* env, { #if !defined(__WIN32__) SocketDescriptor* descP = (SocketDescriptor*) obj; + int sres; SSDBG( descP, ("SOCKET", "socket_down -> entry with" "\r\n sock: %d" @@ -17759,8 +17929,8 @@ void socket_down(ErlNifEnv* env, B2S(IS_CLOSING(descP))) ); if (!IS_CLOSED(descP)) { + if (compare_pids(env, &descP->ctrlPid, pid)) { - int selectRes; /* We don't bother with the queue cleanup here - * we leave it to the stop callback function. @@ -17774,13 +17944,15 @@ void socket_down(ErlNifEnv* env, descP->closerPid = *pid; MON_INIT(&descP->closerMon); - selectRes = esock_select_stop(env, descP->sock, descP); + sres = esock_select_stop(env, descP->sock, descP); + + if (sres & ERL_NIF_SELECT_STOP_CALLED) { - if (selectRes & ERL_NIF_SELECT_STOP_CALLED) { /* We are done - we can finalize (socket close) directly */ SSDBG( descP, ("SOCKET", "socket_down -> [%d] stop called\r\n", descP->sock) ); + dec_socket(descP->domain, descP->type, descP->protocol); descP->state = SOCKET_STATE_CLOSED; @@ -17807,7 +17979,8 @@ void socket_down(ErlNifEnv* env, descP->state = SOCKET_STATE_CLOSED; - } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) { + } else if (sres & ERL_NIF_SELECT_STOP_SCHEDULED) { + /* The stop callback function has been *scheduled* which means * that "should" wait for it to complete. But since we are in * a callback (down) function, we cannot... @@ -17817,6 +17990,7 @@ void socket_down(ErlNifEnv* env, ("SOCKET", "socket_down -> [%d] stop scheduled\r\n", descP->sock) ); + dec_socket(descP->domain, descP->type, descP->protocol); /* And now what? We can't wait for the stop function here... @@ -17838,23 +18012,13 @@ void socket_down(ErlNifEnv* env, } else { - /* - * <KOLLA> - * - * WE SHOULD REALLY HAVE A WAY TO CLOBBER THE SOCKET, - * SO WE DON'T LET STUFF LEAK. - * NOW, BECAUSE WE FAILED TO SELECT, WE CANNOT FINISH - * THE CLOSE, WHAT TO DO? ABORT? - * - * </KOLLA> - */ esock_warning_msg("Failed selecting stop when handling down " "of controlling process: " "\r\n Select Res: %d" "\r\n Controlling Process: %T" "\r\n Descriptor: %d" "\r\n Monitor: %T" - "\r\n", selectRes, pid, descP->sock, + "\r\n", sres, pid, descP->sock, my_make_monitor_term(env, mon)); } @@ -17886,13 +18050,6 @@ void socket_down(ErlNifEnv* env, } } - /* - esock_dbg_printf("DOWN", - "[%d] end %T\r\n", - descP->sock, - my_make_monitor_term(env, mon)); - */ - SSDBG( descP, ("SOCKET", "socket_down -> done\r\n") ); #endif // if !defined(__WIN32__) |