diff options
author | Micael Karlberg <[email protected]> | 2018-04-11 12:38:58 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-09-18 13:01:37 +0200 |
commit | c5c8da4ecb985837817e60738811793754c679a0 (patch) | |
tree | 4833202f8b878a428db794a8004c8dde058844de /erts/emulator/nifs | |
parent | 9e6fda01b1af0c42cee9f983d5bddecc7eb7e240 (diff) | |
download | otp-c5c8da4ecb985837817e60738811793754c679a0.tar.gz otp-c5c8da4ecb985837817e60738811793754c679a0.tar.bz2 otp-c5c8da4ecb985837817e60738811793754c679a0.zip |
[socket-nif] Completed accept
Diffstat (limited to 'erts/emulator/nifs')
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 432 |
1 files changed, 399 insertions, 33 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 88fb2206e4..3e8fe7061a 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -229,6 +229,8 @@ typedef unsigned long long llu_t; #define MKREF(E) enif_make_ref(E) #define MKT2(E,E1,E2) enif_make_tuple2(E, E1, E2) #define MCREATE(N) enif_mutex_create(N) +#define MLOCK(M) enif_mutex_lock(M) +#define MUNLOCK(M) enif_mutex_unlock(M) /* *** Socket state defs *** */ @@ -239,15 +241,13 @@ typedef unsigned long long llu_t; #define SOCKET_FLAG_CON 0x0010 #define SOCKET_FLAG_ACC 0x0020 #define SOCKET_FLAG_BUSY 0x0040 -#define SOCKET_FLAG_MULTI_CLIENT 0x0100 /* Multiple clients for one descriptor, * - * i.e. multi-accept */ + #define SOCKET_STATE_CLOSED (0) #define SOCKET_STATE_OPEN (SOCKET_FLAG_OPEN) #define SOCKET_STATE_CONNECTED (SOCKET_STATE_OPEN | SOCKET_FLAG_ACTIVE) #define SOCKET_STATE_LISTENING (SOCKET_STATE_OPEN | SOCKET_FLAG_LISTEN) #define SOCKET_STATE_CONNECTING (SOCKET_STATE_OPEN | SOCKET_FLAG_CON) #define SOCKET_STATE_ACCEPTING (SOCKET_STATE_LISTENING | SOCKET_FLAG_ACC) -#define SOCKET_STATE_MULTI_ACCEPTING (SOCKET_STATE_ACCEPTING | SOCKET_FLAG_MULTI_CLIENT) #define IS_OPEN(d) \ (((d)->state & SOCKET_FLAG_OPEN) == SOCKET_FLAG_OPEN) @@ -317,7 +317,10 @@ typedef unsigned long long llu_t; /* *** Windown macros *** */ +#define sock_accept(s, addr, len) \ + make_noninheritable_handle(accept((s), (addr), (len))) #define sock_bind(s, addr, len) bind((s), (addr), (len)) +#define sock_close(s) closesocket((s)) #define sock_connect(s, addr, len) connect((s), (addr), (len)) #define sock_getopt(s,t,n,v,l) getsockopt((s),(t),(n),(v),(l)) #define sock_htons(x) htons((x)) @@ -337,7 +340,14 @@ static unsigned long one_value = 1; #else /* !__WIN32__ */ +#ifdef HAS_ACCEPT4 +// We have to figure out what the flags are... +#define sock_accept(s, addr, len) accept4((s), (addr), (len), (SOCK_CLOEXEC)) +#else +#define sock_accept(s, addr, len) accept((s), (addr), (len)) +#endif #define sock_bind(s, addr, len) bind((s), (addr), (len)) +#define sock_close(s) close((s)) #define sock_connect(s, addr, len) connect((s), (addr), (len)) #define sock_getopt(s,t,n,v,l) getsockopt((s),(t),(n),(v),(l)) #define sock_htons(x) htons((x)) @@ -384,6 +394,12 @@ typedef struct { typedef struct { + ErlNifPid pid; // PID of the acceptor + ErlNifMonitor mon; // Monitor for the acceptor + ERL_NIF_TERM ref; // The (unique) reference of the (accept) request +} SocketAcceptor; + +typedef struct { // The actual socket SOCKET sock; HANDLE event; @@ -419,6 +435,13 @@ typedef struct { unsigned int readTries; unsigned int readWaits; + /* Accept + * We also need a queue for waiting acceptors... + * Lets see if this can be a common "request" queue... + */ + ErlNifMutex* accMtx; + SocketAcceptor acceptor; + /* We need to keep track of the "request(s)" we have pending. * If for instance an accept takes to long, the issuer may @@ -485,9 +508,6 @@ static ERL_NIF_TERM nif_listen(ErlNifEnv* env, static ERL_NIF_TERM nif_accept(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); -static ERL_NIF_TERM nif_accept4(ErlNifEnv* env, - int argc, - const ERL_NIF_TERM argv[]); static ERL_NIF_TERM nif_send(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); @@ -556,11 +576,26 @@ static ERL_NIF_TERM nconnect(ErlNifEnv* env, static ERL_NIF_TERM nlisten(ErlNifEnv* env, SocketDescriptor* descP, int backlog); +static ERL_NIF_TERM naccept_listening(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref); +static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref); +static ERL_NIF_TERM naccept(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref); static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, SocketDescriptor* descP); static BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err); +static SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event); + +static int compare_pids(ErlNifEnv* env, + const ErlNifPid* pid1, + const ErlNifPid* pid2); + static BOOLEAN_T edomain2domain(int edomain, int* domain); static BOOLEAN_T etype2type(int etype, int* type); static BOOLEAN_T eproto2proto(int eproto, int* proto); @@ -636,6 +671,7 @@ static char str_einval[] = "einval"; static char str_eisconn[] = "eisconn"; static char str_eisnconn[] = "eisnconn"; static char str_exbadstate[] = "exbadstate"; +static char str_exbusy[] = "exbusy"; static char str_exmon[] = "exmonitor"; // failed monitor static char str_exself[] = "exself"; // failed self @@ -653,6 +689,7 @@ static ERL_NIF_TERM atom_einval; static ERL_NIF_TERM atom_eisconn; static ERL_NIF_TERM atom_eisnconn; static ERL_NIF_TERM atom_exbadstate; +static ERL_NIF_TERM atom_exbusy; static ERL_NIF_TERM atom_exmon; static ERL_NIF_TERM atom_exself; @@ -686,7 +723,6 @@ static SocketData socketData; * nif_connect(Sock, Addr, Port) * nif_listen(Sock, Backlog) * nif_accept(LSock, Ref) - * nif_accept4(LSock, Ref) * nif_send(Sock, Data, Flags) * nif_sendto(Sock, Data, Flags, DstAddr, DstPort) * nif_recv(Sock, Flags) @@ -835,7 +871,7 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, if ((event = sock_create_event(sock)) == INVALID_EVENT) { save_errno = sock_errno(); - while ((close(sock) == INVALID_SOCKET) && (sock_errno() == EINTR)); + while ((sock_close(sock) == INVALID_SOCKET) && (sock_errno() == EINTR)); return make_error2(env, save_errno); } @@ -844,31 +880,16 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, /* Create and initiate the socket "descriptor" */ - descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor)); - { - char buf[64]; /* Buffer used for building the mutex name */ - sprintf(buf, "socket[w,%d]", sock); - descP->writeMtx = MCREATE(buf); - sprintf(buf, "socket[r,%d]", sock); - descP->readMtx = MCREATE(buf); + if ((descP = alloc_descriptor(sock, event)) == NULL) { + sock_close(sock); + // Not sure if this is really the proper error, but... + return enif_make_badarg(env); } - descP->isWritable = TRUE; - descP->isReadable = TRUE; - descP->writePkgCnt = 0; - descP->writeByteCnt = 0; - descP->writeTries = 0; - descP->writeWaits = 0; - descP->readPkgCnt = 0; - descP->readByteCnt = 0; - descP->readTries = 0; - descP->readWaits = 0; - descP->dbg = SOCKET_DEBUG_DEFAULT; - descP->state = SOCKET_STATE_OPEN; - descP->domain = domain; - descP->type = type; - descP->protocol = protocol; - descP->sock = sock; - descP->event = event; + + descP->state = SOCKET_STATE_OPEN; + descP->domain = domain; + descP->type = type; + descP->protocol = protocol; res = enif_make_resource(env, descP); enif_release_resource(descP); // We should really store a reference ... @@ -1669,6 +1690,351 @@ ERL_NIF_TERM nlisten(ErlNifEnv* env, /* ---------------------------------------------------------------------- + * nif_accept + * + * Description: + * Accept a connection on a socket. + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + * Request ref - Unique "id" of this request + * (used for the select, if none is in queue). + */ +static +ERL_NIF_TERM nif_accept(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + SocketDescriptor* descP; + ERL_NIF_TERM ref; + + /* Extract arguments and perform preliminary validation */ + + if ((argc != 2) || + !enif_get_resource(env, argv[0], sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + ref = argv[1]; + + return naccept(env, descP, ref); +} + + +static +ERL_NIF_TERM naccept(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref) +{ + ERL_NIF_TERM res; + + switch (descP->state) { + case SOCKET_STATE_LISTENING: + MLOCK(descP->accMtx); + res = naccept_listening(env, descP, ref); + MUNLOCK(descP->accMtx); + break; + + case SOCKET_STATE_ACCEPTING: + MLOCK(descP->accMtx); + res = naccept_accepting(env, descP, ref); + MUNLOCK(descP->accMtx); + break; + + default: + res = make_error(env, atom_einval); + break; + } + + return res; +} + + +/* *** naccept_listening *** + * We have no active acceptor and no acceptors in queue. + */ +static +ERL_NIF_TERM naccept_listening(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref) +{ + SocketAddress remote; + unsigned int n; + SOCKET accSock; + HANDLE accEvent; + int save_errno; + ErlNifPid caller; + + if (enif_self(env, &caller) == NULL) + return make_error(env, atom_exself); + + n = sizeof(descP->remote.u); + sys_memzero((char *) &remote, n); + accSock = sock_accept(descP->sock, (struct sockaddr*) &remote, &n); + if (accSock == INVALID_SOCKET) { + save_errno = sock_errno(); + if (save_errno == ERRNO_BLOCK) { + + /* *** Try again later *** */ + + descP->acceptor.pid = caller; + if (enif_monitor_process(env, descP, + &descP->acceptor.pid, + &descP->acceptor.mon) > 0) + return make_error(env, atom_exmon); + + descP->acceptor.ref = ref; + + enif_select(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, NULL, ref); + + /* Shall we really change state? + * The ready event is sent directly to the calling + * process, which simply calls this function again. + * Basically, state accepting means that we have + * an "outstanding" accept. + * Shall we store the pid of the calling process? + * And if someone else calls accept, return with ebusy? + * Can any process call accept or just the controlling + * process? + * We also need a monitor it case the calling process is + * called before we are done! + * + * Change state (to accepting) and store pid of the acceptor + * (current process). Only accept calls from the acceptor + * process (ebusy) and once we have a successful accept, + * change state back to listening. If cancel is called instead + * (only accepted from the acceptor process), we reset + * state to listening and also resets the pid to "null" + * (is there such a value?). + * Need a mutex to secure that we don't test and change the + * pid at the same time. + */ + + descP->state = SOCKET_STATE_ACCEPTING; + + return make_error(env, atom_eagain); + + } else { + return make_error2(env, save_errno); + } + + } else { + SocketDescriptor* accDescP; + ERL_NIF_TERM accRef; + + /* + * We got one + */ + + if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) { + save_errno = sock_errno(); + while ((sock_close(accSock) == INVALID_SOCKET) && + (sock_errno() == EINTR)); + return make_error2(env, save_errno); + } + + if ((accDescP = alloc_descriptor(accSock, accEvent)) == NULL) { + sock_close(accSock); + return enif_make_badarg(env); + } + + accDescP->domain = descP->domain; + accDescP->type = descP->type; + accDescP->protocol = descP->protocol; + + accRef = enif_make_resource(env, accDescP); + enif_release_resource(accDescP); // We should really store a reference ... + + accDescP->ctrlPid = caller; + if (enif_monitor_process(env, accDescP, + &accDescP->ctrlPid, + &accDescP->ctrlMon) > 0) { + sock_close(accSock); + return make_error(env, atom_exmon); + } + + accDescP->remote = remote; + SET_NONBLOCKING(accDescP->sock); + +#ifdef __WIN32__ + /* See 'What is the point of this?' above */ + enif_select(env, + (ERL_NIF_SELECT_READ), + descP, NULL, atom_undefined); +#endif + + accDescP->state = SOCKET_STATE_CONNECTED; + + return make_ok(env, accRef); + } +} + + +/* *** naccept_accepting *** + * We have an active acceptor and possibly acceptors waiting in queue. + * At the moment the queue is *not* implemented. + */ +static +ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, + SocketDescriptor* descP, + ERL_NIF_TERM ref) +{ + SocketAddress remote; + unsigned int n; + SOCKET accSock; + HANDLE accEvent; + ErlNifPid caller; + int save_errno; + + if (enif_self(env, &caller) == NULL) + return make_error(env, atom_exself); + + if (compare_pids(env, &descP->acceptor.pid, &caller) != 0) { + /* This will have to do until we implement the queue. + * When we have the queue, we should simply push this request, + * and instead return with eagain (the caller will then wait + * for the select message). + */ + return make_error(env, atom_exbusy); + } + + n = sizeof(descP->remote.u); + sys_memzero((char *) &remote, n); + accSock = sock_accept(descP->sock, (struct sockaddr*) &remote, &n); + if (accSock == INVALID_SOCKET) { + save_errno = sock_errno(); + if (save_errno == ERRNO_BLOCK) { + + /* + * Just try again, no real error, just a ghost trigger from poll, + */ + + enif_select(env, + descP->sock, + (ERL_NIF_SELECT_READ), + descP, NULL, ref); + + return make_error(env, atom_eagain); + } else { + return make_error2(env, save_errno); + } + } else { + SocketDescriptor* accDescP; + ERL_NIF_TERM accRef; + + /* + * We got one + */ + + if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) { + save_errno = sock_errno(); + while ((sock_close(accSock) == INVALID_SOCKET) && + (sock_errno() == EINTR)); + return make_error2(env, save_errno); + } + + if ((accDescP = alloc_descriptor(accSock, accEvent)) == NULL) { + sock_close(accSock); + return enif_make_badarg(env); + } + + accDescP->domain = descP->domain; + accDescP->type = descP->type; + accDescP->protocol = descP->protocol; + + accRef = enif_make_resource(env, accDescP); + enif_release_resource(accDescP); // We should really store a reference ... + + accDescP->ctrlPid = caller; + if (enif_monitor_process(env, accDescP, + &accDescP->ctrlPid, + &accDescP->ctrlMon) > 0) { + sock_close(accSock); + return make_error(env, atom_exmon); + } + + accDescP->remote = remote; + SET_NONBLOCKING(accDescP->sock); + +#ifdef __WIN32__ + /* See 'What is the point of this?' above */ + enif_select(env, + (ERL_NIF_SELECT_READ), + descP, NULL, atom_undefined); +#endif + + accDescP->state = SOCKET_STATE_CONNECTED; + + + /* Here we should have the test if we have something in the queue. + * And if so, pop it and copy the (waiting) acceptor, and then + * make a new select with that info). + */ + descP->state = SOCKET_STATE_LISTENING; + + return make_ok(env, accRef); + } +} + + + +/* *** alloc_descriptor *** + * Allocate and perform basic initialization of a socket descriptor. + * + */ +static +SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) +{ + SocketDescriptor* descP; + + if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) { + char buf[64]; /* Buffer used for building the mutex name */ + + sprintf(buf, "socket[w,%d]", sock); + descP->writeMtx = MCREATE(buf); + + sprintf(buf, "socket[r,%d]", sock); + descP->readMtx = MCREATE(buf); + + sprintf(buf, "socket[acc,%d]", sock); + descP->accMtx = MCREATE(buf); + + descP->dbg = SOCKET_DEBUG_DEFAULT; + descP->isWritable = TRUE; + descP->isReadable = TRUE; + descP->writePkgCnt = 0; + descP->writeByteCnt = 0; + descP->writeTries = 0; + descP->writeWaits = 0; + descP->readPkgCnt = 0; + descP->readByteCnt = 0; + descP->readTries = 0; + descP->readWaits = 0; + + descP->sock = sock; + descP->event = event; + + } + + return descP; +} + + +static +int compare_pids(ErlNifEnv* env, + const ErlNifPid* pid1, + const ErlNifPid* pid2) +{ + ERL_NIF_TERM p1 = enif_make_pid(env, pid1); + ERL_NIF_TERM p2 = enif_make_pid(env, pid2); + + return enif_is_identical(p1, p2); +} + + +/* ---------------------------------------------------------------------- * U t i l i t y F u n c t i o n s * ---------------------------------------------------------------------- */ @@ -1931,7 +2297,6 @@ ErlNifFunc socket_funcs[] = {"nif_connect", 3, nif_connect}, {"nif_listen", 2, nif_listen}, {"nif_accept", 2, nif_accept}, - {"nif_accept4", 3, nif_accept4}, {"nif_send", 3, nif_send}, {"nif_sendto", 5, nif_sendto}, {"nif_recv", 2, nif_recv}, @@ -2035,6 +2400,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_eisnconn = MKA(env, str_eisnconn); // atom_exalloc = MKA(env, str_exalloc); atom_exbadstate = MKA(env, str_exbadstate); + atom_exbusy = MKA(env, str_exbusy); // atom_exnotopen = MKA(env, str_exnotopen); atom_exmon = MKA(env, str_exmon); atom_exself = MKA(env, str_exself); |