aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/nifs/common/socket_nif.c
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-04-11 12:38:58 +0200
committerMicael Karlberg <[email protected]>2018-09-18 13:01:37 +0200
commitc5c8da4ecb985837817e60738811793754c679a0 (patch)
tree4833202f8b878a428db794a8004c8dde058844de /erts/emulator/nifs/common/socket_nif.c
parent9e6fda01b1af0c42cee9f983d5bddecc7eb7e240 (diff)
downloadotp-c5c8da4ecb985837817e60738811793754c679a0.tar.gz
otp-c5c8da4ecb985837817e60738811793754c679a0.tar.bz2
otp-c5c8da4ecb985837817e60738811793754c679a0.zip
[socket-nif] Completed accept
Diffstat (limited to 'erts/emulator/nifs/common/socket_nif.c')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c432
1 files changed, 399 insertions, 33 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 88fb2206e4..3e8fe7061a 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -229,6 +229,8 @@ typedef unsigned long long llu_t;
#define MKREF(E) enif_make_ref(E)
#define MKT2(E,E1,E2) enif_make_tuple2(E, E1, E2)
#define MCREATE(N) enif_mutex_create(N)
+#define MLOCK(M) enif_mutex_lock(M)
+#define MUNLOCK(M) enif_mutex_unlock(M)
/* *** Socket state defs *** */
@@ -239,15 +241,13 @@ typedef unsigned long long llu_t;
#define SOCKET_FLAG_CON 0x0010
#define SOCKET_FLAG_ACC 0x0020
#define SOCKET_FLAG_BUSY 0x0040
-#define SOCKET_FLAG_MULTI_CLIENT 0x0100 /* Multiple clients for one descriptor, *
- * i.e. multi-accept */
+
#define SOCKET_STATE_CLOSED (0)
#define SOCKET_STATE_OPEN (SOCKET_FLAG_OPEN)
#define SOCKET_STATE_CONNECTED (SOCKET_STATE_OPEN | SOCKET_FLAG_ACTIVE)
#define SOCKET_STATE_LISTENING (SOCKET_STATE_OPEN | SOCKET_FLAG_LISTEN)
#define SOCKET_STATE_CONNECTING (SOCKET_STATE_OPEN | SOCKET_FLAG_CON)
#define SOCKET_STATE_ACCEPTING (SOCKET_STATE_LISTENING | SOCKET_FLAG_ACC)
-#define SOCKET_STATE_MULTI_ACCEPTING (SOCKET_STATE_ACCEPTING | SOCKET_FLAG_MULTI_CLIENT)
#define IS_OPEN(d) \
(((d)->state & SOCKET_FLAG_OPEN) == SOCKET_FLAG_OPEN)
@@ -317,7 +317,10 @@ typedef unsigned long long llu_t;
/* *** Windown macros *** */
+#define sock_accept(s, addr, len) \
+ make_noninheritable_handle(accept((s), (addr), (len)))
#define sock_bind(s, addr, len) bind((s), (addr), (len))
+#define sock_close(s) closesocket((s))
#define sock_connect(s, addr, len) connect((s), (addr), (len))
#define sock_getopt(s,t,n,v,l) getsockopt((s),(t),(n),(v),(l))
#define sock_htons(x) htons((x))
@@ -337,7 +340,14 @@ static unsigned long one_value = 1;
#else /* !__WIN32__ */
+#ifdef HAS_ACCEPT4
+// We have to figure out what the flags are...
+#define sock_accept(s, addr, len) accept4((s), (addr), (len), (SOCK_CLOEXEC))
+#else
+#define sock_accept(s, addr, len) accept((s), (addr), (len))
+#endif
#define sock_bind(s, addr, len) bind((s), (addr), (len))
+#define sock_close(s) close((s))
#define sock_connect(s, addr, len) connect((s), (addr), (len))
#define sock_getopt(s,t,n,v,l) getsockopt((s),(t),(n),(v),(l))
#define sock_htons(x) htons((x))
@@ -384,6 +394,12 @@ typedef struct {
typedef struct {
+ ErlNifPid pid; // PID of the acceptor
+ ErlNifMonitor mon; // Monitor for the acceptor
+ ERL_NIF_TERM ref; // The (unique) reference of the (accept) request
+} SocketAcceptor;
+
+typedef struct {
// The actual socket
SOCKET sock;
HANDLE event;
@@ -419,6 +435,13 @@ typedef struct {
unsigned int readTries;
unsigned int readWaits;
+ /* Accept
+ * We also need a queue for waiting acceptors...
+ * Lets see if this can be a common "request" queue...
+ */
+ ErlNifMutex* accMtx;
+ SocketAcceptor acceptor;
+
/* We need to keep track of the "request(s)" we have pending.
* If for instance an accept takes to long, the issuer may
@@ -485,9 +508,6 @@ static ERL_NIF_TERM nif_listen(ErlNifEnv* env,
static ERL_NIF_TERM nif_accept(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
-static ERL_NIF_TERM nif_accept4(ErlNifEnv* env,
- int argc,
- const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM nif_send(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
@@ -556,11 +576,26 @@ static ERL_NIF_TERM nconnect(ErlNifEnv* env,
static ERL_NIF_TERM nlisten(ErlNifEnv* env,
SocketDescriptor* descP,
int backlog);
+static ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref);
+static ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref);
+static ERL_NIF_TERM naccept(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref);
static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
SocketDescriptor* descP);
static BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err);
+static SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event);
+
+static int compare_pids(ErlNifEnv* env,
+ const ErlNifPid* pid1,
+ const ErlNifPid* pid2);
+
static BOOLEAN_T edomain2domain(int edomain, int* domain);
static BOOLEAN_T etype2type(int etype, int* type);
static BOOLEAN_T eproto2proto(int eproto, int* proto);
@@ -636,6 +671,7 @@ static char str_einval[] = "einval";
static char str_eisconn[] = "eisconn";
static char str_eisnconn[] = "eisnconn";
static char str_exbadstate[] = "exbadstate";
+static char str_exbusy[] = "exbusy";
static char str_exmon[] = "exmonitor"; // failed monitor
static char str_exself[] = "exself"; // failed self
@@ -653,6 +689,7 @@ static ERL_NIF_TERM atom_einval;
static ERL_NIF_TERM atom_eisconn;
static ERL_NIF_TERM atom_eisnconn;
static ERL_NIF_TERM atom_exbadstate;
+static ERL_NIF_TERM atom_exbusy;
static ERL_NIF_TERM atom_exmon;
static ERL_NIF_TERM atom_exself;
@@ -686,7 +723,6 @@ static SocketData socketData;
* nif_connect(Sock, Addr, Port)
* nif_listen(Sock, Backlog)
* nif_accept(LSock, Ref)
- * nif_accept4(LSock, Ref)
* nif_send(Sock, Data, Flags)
* nif_sendto(Sock, Data, Flags, DstAddr, DstPort)
* nif_recv(Sock, Flags)
@@ -835,7 +871,7 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
if ((event = sock_create_event(sock)) == INVALID_EVENT) {
save_errno = sock_errno();
- while ((close(sock) == INVALID_SOCKET) && (sock_errno() == EINTR));
+ while ((sock_close(sock) == INVALID_SOCKET) && (sock_errno() == EINTR));
return make_error2(env, save_errno);
}
@@ -844,31 +880,16 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
/* Create and initiate the socket "descriptor" */
- descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor));
- {
- char buf[64]; /* Buffer used for building the mutex name */
- sprintf(buf, "socket[w,%d]", sock);
- descP->writeMtx = MCREATE(buf);
- sprintf(buf, "socket[r,%d]", sock);
- descP->readMtx = MCREATE(buf);
+ if ((descP = alloc_descriptor(sock, event)) == NULL) {
+ sock_close(sock);
+ // Not sure if this is really the proper error, but...
+ return enif_make_badarg(env);
}
- descP->isWritable = TRUE;
- descP->isReadable = TRUE;
- descP->writePkgCnt = 0;
- descP->writeByteCnt = 0;
- descP->writeTries = 0;
- descP->writeWaits = 0;
- descP->readPkgCnt = 0;
- descP->readByteCnt = 0;
- descP->readTries = 0;
- descP->readWaits = 0;
- descP->dbg = SOCKET_DEBUG_DEFAULT;
- descP->state = SOCKET_STATE_OPEN;
- descP->domain = domain;
- descP->type = type;
- descP->protocol = protocol;
- descP->sock = sock;
- descP->event = event;
+
+ descP->state = SOCKET_STATE_OPEN;
+ descP->domain = domain;
+ descP->type = type;
+ descP->protocol = protocol;
res = enif_make_resource(env, descP);
enif_release_resource(descP); // We should really store a reference ...
@@ -1669,6 +1690,351 @@ ERL_NIF_TERM nlisten(ErlNifEnv* env,
/* ----------------------------------------------------------------------
+ * nif_accept
+ *
+ * Description:
+ * Accept a connection on a socket.
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ * Request ref - Unique "id" of this request
+ * (used for the select, if none is in queue).
+ */
+static
+ERL_NIF_TERM nif_accept(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+ SocketDescriptor* descP;
+ ERL_NIF_TERM ref;
+
+ /* Extract arguments and perform preliminary validation */
+
+ if ((argc != 2) ||
+ !enif_get_resource(env, argv[0], sockets, (void**) &descP)) {
+ return enif_make_badarg(env);
+ }
+ ref = argv[1];
+
+ return naccept(env, descP, ref);
+}
+
+
+static
+ERL_NIF_TERM naccept(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref)
+{
+ ERL_NIF_TERM res;
+
+ switch (descP->state) {
+ case SOCKET_STATE_LISTENING:
+ MLOCK(descP->accMtx);
+ res = naccept_listening(env, descP, ref);
+ MUNLOCK(descP->accMtx);
+ break;
+
+ case SOCKET_STATE_ACCEPTING:
+ MLOCK(descP->accMtx);
+ res = naccept_accepting(env, descP, ref);
+ MUNLOCK(descP->accMtx);
+ break;
+
+ default:
+ res = make_error(env, atom_einval);
+ break;
+ }
+
+ return res;
+}
+
+
+/* *** naccept_listening ***
+ * We have no active acceptor and no acceptors in queue.
+ */
+static
+ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref)
+{
+ SocketAddress remote;
+ unsigned int n;
+ SOCKET accSock;
+ HANDLE accEvent;
+ int save_errno;
+ ErlNifPid caller;
+
+ if (enif_self(env, &caller) == NULL)
+ return make_error(env, atom_exself);
+
+ n = sizeof(descP->remote.u);
+ sys_memzero((char *) &remote, n);
+ accSock = sock_accept(descP->sock, (struct sockaddr*) &remote, &n);
+ if (accSock == INVALID_SOCKET) {
+ save_errno = sock_errno();
+ if (save_errno == ERRNO_BLOCK) {
+
+ /* *** Try again later *** */
+
+ descP->acceptor.pid = caller;
+ if (enif_monitor_process(env, descP,
+ &descP->acceptor.pid,
+ &descP->acceptor.mon) > 0)
+ return make_error(env, atom_exmon);
+
+ descP->acceptor.ref = ref;
+
+ enif_select(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, ref);
+
+ /* Shall we really change state?
+ * The ready event is sent directly to the calling
+ * process, which simply calls this function again.
+ * Basically, state accepting means that we have
+ * an "outstanding" accept.
+ * Shall we store the pid of the calling process?
+ * And if someone else calls accept, return with ebusy?
+ * Can any process call accept or just the controlling
+ * process?
+ * We also need a monitor it case the calling process is
+ * called before we are done!
+ *
+ * Change state (to accepting) and store pid of the acceptor
+ * (current process). Only accept calls from the acceptor
+ * process (ebusy) and once we have a successful accept,
+ * change state back to listening. If cancel is called instead
+ * (only accepted from the acceptor process), we reset
+ * state to listening and also resets the pid to "null"
+ * (is there such a value?).
+ * Need a mutex to secure that we don't test and change the
+ * pid at the same time.
+ */
+
+ descP->state = SOCKET_STATE_ACCEPTING;
+
+ return make_error(env, atom_eagain);
+
+ } else {
+ return make_error2(env, save_errno);
+ }
+
+ } else {
+ SocketDescriptor* accDescP;
+ ERL_NIF_TERM accRef;
+
+ /*
+ * We got one
+ */
+
+ if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) {
+ save_errno = sock_errno();
+ while ((sock_close(accSock) == INVALID_SOCKET) &&
+ (sock_errno() == EINTR));
+ return make_error2(env, save_errno);
+ }
+
+ if ((accDescP = alloc_descriptor(accSock, accEvent)) == NULL) {
+ sock_close(accSock);
+ return enif_make_badarg(env);
+ }
+
+ accDescP->domain = descP->domain;
+ accDescP->type = descP->type;
+ accDescP->protocol = descP->protocol;
+
+ accRef = enif_make_resource(env, accDescP);
+ enif_release_resource(accDescP); // We should really store a reference ...
+
+ accDescP->ctrlPid = caller;
+ if (enif_monitor_process(env, accDescP,
+ &accDescP->ctrlPid,
+ &accDescP->ctrlMon) > 0) {
+ sock_close(accSock);
+ return make_error(env, atom_exmon);
+ }
+
+ accDescP->remote = remote;
+ SET_NONBLOCKING(accDescP->sock);
+
+#ifdef __WIN32__
+ /* See 'What is the point of this?' above */
+ enif_select(env,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, atom_undefined);
+#endif
+
+ accDescP->state = SOCKET_STATE_CONNECTED;
+
+ return make_ok(env, accRef);
+ }
+}
+
+
+/* *** naccept_accepting ***
+ * We have an active acceptor and possibly acceptors waiting in queue.
+ * At the moment the queue is *not* implemented.
+ */
+static
+ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ ERL_NIF_TERM ref)
+{
+ SocketAddress remote;
+ unsigned int n;
+ SOCKET accSock;
+ HANDLE accEvent;
+ ErlNifPid caller;
+ int save_errno;
+
+ if (enif_self(env, &caller) == NULL)
+ return make_error(env, atom_exself);
+
+ if (compare_pids(env, &descP->acceptor.pid, &caller) != 0) {
+ /* This will have to do until we implement the queue.
+ * When we have the queue, we should simply push this request,
+ * and instead return with eagain (the caller will then wait
+ * for the select message).
+ */
+ return make_error(env, atom_exbusy);
+ }
+
+ n = sizeof(descP->remote.u);
+ sys_memzero((char *) &remote, n);
+ accSock = sock_accept(descP->sock, (struct sockaddr*) &remote, &n);
+ if (accSock == INVALID_SOCKET) {
+ save_errno = sock_errno();
+ if (save_errno == ERRNO_BLOCK) {
+
+ /*
+ * Just try again, no real error, just a ghost trigger from poll,
+ */
+
+ enif_select(env,
+ descP->sock,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, ref);
+
+ return make_error(env, atom_eagain);
+ } else {
+ return make_error2(env, save_errno);
+ }
+ } else {
+ SocketDescriptor* accDescP;
+ ERL_NIF_TERM accRef;
+
+ /*
+ * We got one
+ */
+
+ if ((accEvent = sock_create_event(accSock)) == INVALID_EVENT) {
+ save_errno = sock_errno();
+ while ((sock_close(accSock) == INVALID_SOCKET) &&
+ (sock_errno() == EINTR));
+ return make_error2(env, save_errno);
+ }
+
+ if ((accDescP = alloc_descriptor(accSock, accEvent)) == NULL) {
+ sock_close(accSock);
+ return enif_make_badarg(env);
+ }
+
+ accDescP->domain = descP->domain;
+ accDescP->type = descP->type;
+ accDescP->protocol = descP->protocol;
+
+ accRef = enif_make_resource(env, accDescP);
+ enif_release_resource(accDescP); // We should really store a reference ...
+
+ accDescP->ctrlPid = caller;
+ if (enif_monitor_process(env, accDescP,
+ &accDescP->ctrlPid,
+ &accDescP->ctrlMon) > 0) {
+ sock_close(accSock);
+ return make_error(env, atom_exmon);
+ }
+
+ accDescP->remote = remote;
+ SET_NONBLOCKING(accDescP->sock);
+
+#ifdef __WIN32__
+ /* See 'What is the point of this?' above */
+ enif_select(env,
+ (ERL_NIF_SELECT_READ),
+ descP, NULL, atom_undefined);
+#endif
+
+ accDescP->state = SOCKET_STATE_CONNECTED;
+
+
+ /* Here we should have the test if we have something in the queue.
+ * And if so, pop it and copy the (waiting) acceptor, and then
+ * make a new select with that info).
+ */
+ descP->state = SOCKET_STATE_LISTENING;
+
+ return make_ok(env, accRef);
+ }
+}
+
+
+
+/* *** alloc_descriptor ***
+ * Allocate and perform basic initialization of a socket descriptor.
+ *
+ */
+static
+SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
+{
+ SocketDescriptor* descP;
+
+ if ((descP = enif_alloc_resource(sockets, sizeof(SocketDescriptor))) != NULL) {
+ char buf[64]; /* Buffer used for building the mutex name */
+
+ sprintf(buf, "socket[w,%d]", sock);
+ descP->writeMtx = MCREATE(buf);
+
+ sprintf(buf, "socket[r,%d]", sock);
+ descP->readMtx = MCREATE(buf);
+
+ sprintf(buf, "socket[acc,%d]", sock);
+ descP->accMtx = MCREATE(buf);
+
+ descP->dbg = SOCKET_DEBUG_DEFAULT;
+ descP->isWritable = TRUE;
+ descP->isReadable = TRUE;
+ descP->writePkgCnt = 0;
+ descP->writeByteCnt = 0;
+ descP->writeTries = 0;
+ descP->writeWaits = 0;
+ descP->readPkgCnt = 0;
+ descP->readByteCnt = 0;
+ descP->readTries = 0;
+ descP->readWaits = 0;
+
+ descP->sock = sock;
+ descP->event = event;
+
+ }
+
+ return descP;
+}
+
+
+static
+int compare_pids(ErlNifEnv* env,
+ const ErlNifPid* pid1,
+ const ErlNifPid* pid2)
+{
+ ERL_NIF_TERM p1 = enif_make_pid(env, pid1);
+ ERL_NIF_TERM p2 = enif_make_pid(env, pid2);
+
+ return enif_is_identical(p1, p2);
+}
+
+
+/* ----------------------------------------------------------------------
* U t i l i t y F u n c t i o n s
* ----------------------------------------------------------------------
*/
@@ -1931,7 +2297,6 @@ ErlNifFunc socket_funcs[] =
{"nif_connect", 3, nif_connect},
{"nif_listen", 2, nif_listen},
{"nif_accept", 2, nif_accept},
- {"nif_accept4", 3, nif_accept4},
{"nif_send", 3, nif_send},
{"nif_sendto", 5, nif_sendto},
{"nif_recv", 2, nif_recv},
@@ -2035,6 +2400,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_eisnconn = MKA(env, str_eisnconn);
// atom_exalloc = MKA(env, str_exalloc);
atom_exbadstate = MKA(env, str_exbadstate);
+ atom_exbusy = MKA(env, str_exbusy);
// atom_exnotopen = MKA(env, str_exnotopen);
atom_exmon = MKA(env, str_exmon);
atom_exself = MKA(env, str_exself);