aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-04-24 17:38:52 +0200
committerMicael Karlberg <[email protected]>2018-09-18 13:01:37 +0200
commit599a320f630991823fc28b6a8a9f09851e261fed (patch)
tree4052c4bacabd1b2258f0a2ddf8427155589d4208
parent04335ca6aedfc5ad9f0d6a8d193dfd76a222291c (diff)
downloadotp-599a320f630991823fc28b6a8a9f09851e261fed.tar.gz
otp-599a320f630991823fc28b6a8a9f09851e261fed.tar.bz2
otp-599a320f630991823fc28b6a8a9f09851e261fed.zip
[socket-nif] "Completed" the close function
There is probably a lot of things left to be here. For instance the handling of ECONNRESET when reading (recv and recvfrom). Also some stuff about setopt and getopt.
-rw-r--r--erts/emulator/nifs/common/socket_nif.c542
-rw-r--r--erts/preloaded/src/socket.erl123
2 files changed, 597 insertions, 68 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index 6e6851a608..d55de9ff4e 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -185,6 +185,10 @@ typedef unsigned int BOOLEAN_T;
#define get_int16(s) ((((unsigned char*) (s))[0] << 8) | \
(((unsigned char*) (s))[1]))
+#define SASSERT(e) \
+ ((void) ((e) ? 1 : (xabort(#e, __func__, __FILE__, __LINE__), 0)))
+
+
/* Debug stuff... */
#define SOCKET_NIF_DEBUG_DEFAULT TRUE
#define SOCKET_DEBUG_DEFAULT TRUE
@@ -235,6 +239,7 @@ typedef unsigned long long llu_t;
#define SOCKET_FLAG_CON 0x0010
#define SOCKET_FLAG_ACC 0x0020
#define SOCKET_FLAG_BUSY 0x0040
+#define SOCKET_FLAG_CLOSE 0x0080
#define SOCKET_STATE_CLOSED (0)
#define SOCKET_STATE_OPEN (SOCKET_FLAG_OPEN)
@@ -242,6 +247,7 @@ typedef unsigned long long llu_t;
#define SOCKET_STATE_LISTENING (SOCKET_STATE_OPEN | SOCKET_FLAG_LISTEN)
#define SOCKET_STATE_CONNECTING (SOCKET_STATE_OPEN | SOCKET_FLAG_CON)
#define SOCKET_STATE_ACCEPTING (SOCKET_STATE_LISTENING | SOCKET_FLAG_ACC)
+#define SOCKET_STATE_CLOSING (SOCKET_FLAG_CLOSE)
#define IS_OPEN(d) \
(((d)->state & SOCKET_FLAG_OPEN) == SOCKET_FLAG_OPEN)
@@ -331,6 +337,7 @@ typedef union {
#define MALLOC(SZ) enif_alloc((SZ))
#define FREE(P) enif_free((P))
+
#define MKA(E,S) enif_make_atom((E), (S))
#define MKBIN(E,B) enif_make_binary((E), (B))
#define MKI(E,I) enif_make_int((E), (I))
@@ -343,9 +350,15 @@ typedef union {
#define MKT4(E,E1,E2,E3,E4) enif_make_tuple4((E), (E1), (E2), (E3), (E4))
#define MKT8(E,E1,E2,E3,E4,E5,E6,E7,E8) \
enif_make_tuple8((E), (E1), (E2), (E3), (E4), (E5), (E6), (E7), (E8))
+
#define MCREATE(N) enif_mutex_create((N))
+#define MDESTROY(M) enif_mutex_destroy((M))
#define MLOCK(M) enif_mutex_lock((M))
#define MUNLOCK(M) enif_mutex_unlock((M))
+
+#define MONP(E,D,P,M) enif_monitor_process((E), (D), (P), (M))
+#define DEMONP(E,D,M) enif_demonitor_process((E), (D), (M))
+
#define SELECT(E,FD,M,O,P,R) \
if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \
return enif_make_badarg((E));
@@ -375,13 +388,16 @@ typedef union {
#ifdef __WIN32__
-/* *** Windown macros *** */
+/* *** Windows macros *** */
#define sock_accept(s, addr, len) \
make_noninheritable_handle(accept((s), (addr), (len)))
#define sock_bind(s, addr, len) bind((s), (addr), (len))
#define sock_close(s) closesocket((s))
+#define sock_close_event(e) WSACloseEvent(e)
#define sock_connect(s, addr, len) connect((s), (addr), (len))
+#define sock_create_event(s) WSACreateEvent()
+#define sock_errno() WSAGetLastError()
#define sock_getopt(s,t,n,v,l) getsockopt((s),(t),(n),(v),(l))
#define sock_htons(x) htons((x))
#define sock_htonl(x) htonl((x))
@@ -397,16 +413,16 @@ typedef union {
#define sock_sendto(s,buf,blen,flag,addr,alen) \
sendto((s),(buf),(blen),(flag),(addr),(alen))
-#define sock_errno() WSAGetLastError()
-#define sock_create_event(s) WSACreateEvent()
#define SET_BLOCKING(s) ioctlsocket(s, FIONBIO, &zero_value)
#define SET_NONBLOCKING(s) ioctlsocket(s, FIONBIO, &one_value)
static unsigned long zero_value = 0;
static unsigned long one_value = 1;
+
#else /* !__WIN32__ */
+
#ifdef HAS_ACCEPT4
// We have to figure out what the flags are...
#define sock_accept(s, addr, len) accept4((s), (addr), (len), (SOCK_CLOEXEC))
@@ -415,7 +431,10 @@ static unsigned long one_value = 1;
#endif
#define sock_bind(s, addr, len) bind((s), (addr), (len))
#define sock_close(s) close((s))
+#define sock_close_event(e) /* do nothing */
#define sock_connect(s, addr, len) connect((s), (addr), (len))
+#define sock_create_event(s) (s) /* return file descriptor */
+#define sock_errno() errno
#define sock_getopt(s,t,n,v,l) getsockopt((s),(t),(n),(v),(l))
#define sock_htons(x) htons((x))
#define sock_htonl(x) htonl((x))
@@ -430,10 +449,6 @@ static unsigned long one_value = 1;
#define sock_sendto(s,buf,blen,flag,addr,alen) \
sendto((s),(buf),(blen),(flag),(addr),(alen))
-#define sock_errno() errno
-#define sock_create_event(s) (s) /* return file descriptor */
-
-
#endif /* !__WIN32__ */
#ifdef HAVE_SOCKLEN_T
@@ -470,7 +485,7 @@ typedef struct {
} SocketRequestor;
typedef struct socket_request_queue_element {
- struct socket_request_queue_element* next;
+ struct socket_request_queue_element* nextP;
SocketRequestor data;
} SocketRequestQueueElement;
@@ -534,6 +549,12 @@ typedef struct {
BOOLEAN_T iow; // Inform On Wrap
BOOLEAN_T dbg;
+ /* +++ Close stuff +++ */
+ ErlNifMutex* closeMtx;
+ ErlNifPid closerPid;
+ ErlNifMonitor closerMon;
+ ERL_NIF_TERM closeRef;
+
} SocketDescriptor;
@@ -609,6 +630,9 @@ static ERL_NIF_TERM nif_getsockopt(ErlNifEnv* env,
static ERL_NIF_TERM nif_finalize_connection(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
+static ERL_NIF_TERM nif_finalize_close(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[]);
static ERL_NIF_TERM nif_cancel(ErlNifEnv* env,
int argc,
const ERL_NIF_TERM argv[]);
@@ -660,6 +684,8 @@ static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
ERL_NIF_TERM recvRef,
uint16_t bufSz,
int flags);
+static ERL_NIF_TERM nclose(ErlNifEnv* env,
+ SocketDescriptor* descP);
static ERL_NIF_TERM send_check_result(ErlNifEnv* env,
SocketDescriptor* descP,
@@ -682,6 +708,8 @@ static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env,
SocketDescriptor* descP);
+static ERL_NIF_TERM nfinalize_close(ErlNifEnv* env,
+ SocketDescriptor* descP);
static char* decode_laddress(ErlNifEnv* env,
@@ -730,6 +758,12 @@ static void encode_address(ErlNifEnv* env,
ERL_NIF_TERM* fromDomainT,
ERL_NIF_TERM* fromSourceT);
+static void inform_waiting_procs(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ SocketRequestQueue* q,
+ BOOLEAN_T free,
+ ERL_NIF_TERM msg);
+
static BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err);
static SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event);
@@ -773,6 +807,19 @@ static ERL_NIF_TERM make_error(ErlNifEnv* env, ERL_NIF_TERM reason);
static ERL_NIF_TERM make_error1(ErlNifEnv* env, char* reason);
static ERL_NIF_TERM make_error2(ErlNifEnv* env, int err);
+static char* send_msg_error_closed(ErlNifEnv* env,
+ ErlNifPid* pid);
+static char* send_msg_error(ErlNifEnv* env,
+ ERL_NIF_TERM reason,
+ ErlNifPid* pid);
+static char* send_msg(ErlNifEnv* env,
+ ERL_NIF_TERM msg,
+ ErlNifPid* pid);
+
+static void xabort(const char* expr,
+ const char* func,
+ const char* file,
+ int line);
static BOOLEAN_T extract_item_on_load(ErlNifEnv* env,
ERL_NIF_TERM map,
@@ -810,10 +857,14 @@ static const struct in6_addr in6addr_loopback =
/* *** String constants *** */
+static char str_close[] = "close";
static char str_closed[] = "closed";
+static char str_closing[] = "closing";
static char str_error[] = "error";
static char str_false[] = "false";
static char str_ok[] = "ok";
+static char str_select[] = "select";
+static char str_timeout[] = "timeout";
static char str_true[] = "true";
static char str_undefined[] = "undefined";
@@ -822,19 +873,25 @@ static char str_eagain[] = "eagain";
static char str_eafnosupport[] = "eafnosupport";
static char str_einval[] = "einval";
static char str_eisconn[] = "eisconn";
+static char str_enotclosing[] = "enotclosing";
static char str_enotconn[] = "enotconn";
static char str_exalloc[] = "exalloc";
static char str_exbadstate[] = "exbadstate";
static char str_exbusy[] = "exbusy";
static char str_exmon[] = "exmonitor"; // failed monitor
static char str_exself[] = "exself"; // failed self
+static char str_exsend[] = "exsend"; // failed send
/* *** Atoms *** */
+static ERL_NIF_TERM atom_close;
static ERL_NIF_TERM atom_closed;
+static ERL_NIF_TERM atom_closing;
static ERL_NIF_TERM atom_error;
static ERL_NIF_TERM atom_false;
static ERL_NIF_TERM atom_ok;
+static ERL_NIF_TERM atom_select;
+static ERL_NIF_TERM atom_timeout;
static ERL_NIF_TERM atom_true;
static ERL_NIF_TERM atom_undefined;
@@ -842,12 +899,14 @@ static ERL_NIF_TERM atom_eagain;
static ERL_NIF_TERM atom_eafnosupport;
static ERL_NIF_TERM atom_einval;
static ERL_NIF_TERM atom_eisconn;
+static ERL_NIF_TERM atom_enotclosing;
static ERL_NIF_TERM atom_enotconn;
static ERL_NIF_TERM atom_exalloc;
static ERL_NIF_TERM atom_exbadstate;
static ERL_NIF_TERM atom_exbusy;
static ERL_NIF_TERM atom_exmon;
static ERL_NIF_TERM atom_exself;
+static ERL_NIF_TERM atom_exsend;
/* *** Sockets *** */
@@ -1059,9 +1118,9 @@ ERL_NIF_TERM nopen(ErlNifEnv* env,
if (enif_self(env, &descP->ctrlPid) == NULL)
return make_error(env, atom_exself);
- if (enif_monitor_process(env, descP,
- &descP->ctrlPid,
- &descP->ctrlMon) > 0)
+ if (MONP(env, descP,
+ &descP->ctrlPid,
+ &descP->ctrlMon) > 0)
return make_error(env, atom_exmon);
@@ -1794,9 +1853,9 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
/* *** Try again later *** */
descP->currentAcceptor.pid = caller;
- if (enif_monitor_process(env, descP,
- &descP->currentAcceptor.pid,
- &descP->currentAcceptor.mon) > 0)
+ if (MONP(env, descP,
+ &descP->currentAcceptor.pid,
+ &descP->currentAcceptor.mon) > 0)
return make_error(env, atom_exmon);
descP->currentAcceptor.ref = ref;
@@ -1865,9 +1924,9 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env,
enif_release_resource(accDescP); // We should really store a reference ...
accDescP->ctrlPid = caller;
- if (enif_monitor_process(env, accDescP,
- &accDescP->ctrlPid,
- &accDescP->ctrlMon) > 0) {
+ if (MONP(env, accDescP,
+ &accDescP->ctrlPid,
+ &accDescP->ctrlMon) > 0) {
sock_close(accSock);
return make_error(env, atom_exmon);
}
@@ -1966,9 +2025,9 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env,
enif_release_resource(accDescP); // We should really store a reference ...
accDescP->ctrlPid = caller;
- if (enif_monitor_process(env, accDescP,
- &accDescP->ctrlPid,
- &accDescP->ctrlMon) > 0) {
+ if (MONP(env, accDescP,
+ &accDescP->ctrlPid,
+ &accDescP->ctrlMon) > 0) {
sock_close(accSock);
return make_error(env, atom_exmon);
}
@@ -2481,6 +2540,189 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env,
/* ----------------------------------------------------------------------
+ * nif_close
+ *
+ * Description:
+ * Close a (socket) file descriptor.
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ */
+
+static
+ERL_NIF_TERM nif_close(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+ SocketDescriptor* descP;
+
+ if ((argc != 1) ||
+ !enif_get_resource(env, argv[0], sockets, (void**) &descP)) {
+ return enif_make_badarg(env);
+ }
+
+ return nclose(env, descP);
+}
+
+
+static
+ERL_NIF_TERM nclose(ErlNifEnv* env,
+ SocketDescriptor* descP)
+{
+ ERL_NIF_TERM reply, reason;
+ BOOLEAN_T doClose;
+ int selectRes;
+
+ MLOCK(descP->closeMtx);
+
+ if (descP->state == SOCKET_STATE_CLOSED) {
+ reason = atom_closed;
+ doClose = FALSE;
+ } else if (descP->state == SOCKET_STATE_CLOSING) {
+ reason = atom_closing;
+ doClose = FALSE;
+ } else {
+
+ /* Store the PID of the caller,
+ * since we need to inform it when we
+ * (that is, the stop callback function)
+ * completes.
+ */
+
+ if (enif_self(env, &descP->closerPid) == NULL) {
+ MUNLOCK(descP->closeMtx);
+ return make_error(env, atom_exself);
+ }
+
+ /* Monitor the caller, since we should complete this operation even if
+ * the caller dies (for whatever reason).
+ */
+
+ if (MONP(env, descP,
+ &descP->closerPid,
+ &descP->closerMon) > 0) {
+ MUNLOCK(descP->closeMtx);
+ return make_error(env, atom_exmon);
+ }
+
+ descP->state = SOCKET_STATE_CLOSING;
+ doClose = TRUE;
+ }
+
+ MUNLOCK(descP->closeMtx);
+
+ if (doClose) {
+ descP->closeRef = MKREF(env);
+ selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP),
+ descP, NULL, descP->closeRef);
+ if (selectRes & ERL_NIF_SELECT_STOP_CALLED) {
+ /* Prep done - inform the caller it can finalize (close) directly */
+ reply = atom_ok;
+ } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) {
+ /* The stop callback function has been *scheduled* which means that we
+ * have to wait for it to complete. */
+ reply = make_ok2(env, descP->closeRef);
+ } else {
+ /* <KOLLA>
+ *
+ * WE SHOULD REALLY HAVE A WAY TO CLOBBER THE SOCKET,
+ * SO WE DON'T LET STUFF LEAK.
+ * NOW, BECAUSE WE FAILED TO SELECT, WE CANNOT FINISH
+ * THE CLOSE, WHAT TO DO? ABORT?
+ *
+ * </KOLLA>
+ */
+ reason = MKT2(env, atom_select, MKI(env, selectRes));
+ reply = make_error(env, reason);
+ }
+ } else {
+ reply = make_error(env, reason);
+ }
+
+ return reply;
+}
+
+
+
+/* ----------------------------------------------------------------------
+ * nif_finalize_close
+ *
+ * Description:
+ * Perform the actual socket close!
+ * Note that this function is executed in a dirfty scheduler.
+ *
+ * Arguments:
+ * Socket (ref) - Points to the socket descriptor.
+ */
+static
+ERL_NIF_TERM nif_finalize_close(ErlNifEnv* env,
+ int argc,
+ const ERL_NIF_TERM argv[])
+{
+ SocketDescriptor* descP;
+
+ /* Extract arguments and perform preliminary validation */
+
+ if ((argc != 1) ||
+ !enif_get_resource(env, argv[0], sockets, (void**) &descP)) {
+ return enif_make_badarg(env);
+ }
+
+ return nfinalize_close(env, descP);
+
+}
+
+
+/* *** nfinalize_close ***
+ * Perform the final step in the socket close.
+ */
+static
+ERL_NIF_TERM nfinalize_close(ErlNifEnv* env,
+ SocketDescriptor* descP)
+{
+ ERL_NIF_TERM reply;
+
+ if (descP->state == SOCKET_STATE_CLOSED)
+ return atom_ok;
+
+ if (descP->state != SOCKET_STATE_CLOSING)
+ return make_error(env, atom_enotclosing);
+
+ /* This nif is executed in a dirty scheduler just so that
+ * it can "hang" (whith minumum effect on the VM) while the
+ * kernel writes our buffers. IF we have set the linger option
+ * for this ({true, integer() > 0}). For this to work we must
+ * be blocking...
+ */
+ SET_BLOCKING(descP->sock);
+
+ if (sock_close(descP->sock) != 0) {
+ int save_errno = sock_errno();
+
+ if (save_errno != ERRNO_BLOCK) {
+ /* Not all data in the buffers where sent,
+ * make sure the caller gets this.
+ */
+ reply = make_error(env, atom_timeout);
+ } else {
+ reply = make_error2(env, save_errno);
+ }
+ } else {
+ reply = atom_ok;
+ }
+ sock_close_event(descP->event);
+
+ descP->sock = INVALID_SOCKET;
+ descP->event = INVALID_EVENT;
+
+ descP->state = SOCKET_STATE_CLOSED;
+
+ return reply;
+}
+
+
+
+/* ----------------------------------------------------------------------
* U t i l i t y F u n c t i o n s
* ----------------------------------------------------------------------
*/
@@ -2610,10 +2852,16 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
* PROCESS, WE NEED TO INFORM IT!!!
*
* ALL WAITING PROCESSES MUST ALSO GET THE ERROR!!
+ * HANDLED BY THE STOP (CALLBACK) FUNCTION?
+ *
+ * WE DON'T NEED TO WAIT FOR OUTPUT TO BE WRITTEN,
+ * JUST ABORT THE SOCKET!!!
*
* </KOLLA>
*/
+ descP->state = SOCKET_STATE_CLOSING;
+
SELECT(env,
descP->sock,
(ERL_NIF_SELECT_STOP),
@@ -3150,6 +3398,9 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event)
descP->acceptorsQ.first = NULL;
descP->acceptorsQ.last = NULL;
+ sprintf(buf, "socket[close,%d]", sock);
+ descP->closeMtx = MCREATE(buf);
+
descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT;
descP->iow = FALSE;
descP->dbg = SOCKET_DEBUG_DEFAULT;
@@ -3491,6 +3742,65 @@ ERL_NIF_TERM make_error2(ErlNifEnv* env, int err)
}
+/* Send an error closed message to the specified process:
+ *
+ * This message is for processes that are waiting in the
+ * erlang API functions for a select message.
+ */
+static
+char* send_msg_error_closed(ErlNifEnv* env,
+ ErlNifPid* pid)
+{
+ return send_msg_error(env, atom_closed, pid);
+}
+
+
+/* Send an error message to the specified process:
+ * A message in the form:
+ *
+ * {error, Reason}
+ *
+ * This message is for processes that are waiting in the
+ * erlang API functions for a select message.
+ */
+static
+char* send_msg_error(ErlNifEnv* env,
+ ERL_NIF_TERM reason,
+ ErlNifPid* pid)
+{
+ ERL_NIF_TERM msg = enif_make_tuple2(env, atom_error, reason);
+
+ return send_msg(env, msg, pid);
+}
+
+
+/* Send a message to the specified process.
+ */
+static
+char* send_msg(ErlNifEnv* env,
+ ERL_NIF_TERM msg,
+ ErlNifPid* pid)
+{
+ if (!enif_send(env, pid, NULL, msg))
+ return str_exsend;
+ else
+ return NULL;
+}
+
+
+static
+void xabort(const char* expr,
+ const char* func,
+ const char* file,
+ int line)
+{
+ fflush(stdout);
+ fprintf(stderr, "%s:%d:%s() Assertion failed: %s\n",
+ file, line, func, expr);
+ fflush(stderr);
+ abort();
+}
+
/* ----------------------------------------------------------------------
* C o u n t e r F u n c t i o n s
@@ -3529,19 +3839,153 @@ BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc)
static
void socket_dtor(ErlNifEnv* env, void* obj)
{
+ SocketDescriptor* descP = (SocketDescriptor*) obj;
+
+ MDESTROY(descP->writeMtx);
+ MDESTROY(descP->readMtx);
+ MDESTROY(descP->accMtx);
+ MDESTROY(descP->closeMtx);
}
/* =========================================================================
* socket_stop - Callback function for resource stop
*
+ * When the socket is stopped, we need to inform:
+ *
+ * * the controlling process
+ * * the current writer and any waiting writers
+ * * the current reader and any waiting readers
+ * * the current acceptor and any waiting acceptor
+ *
+ * Also, make sure no process gets the message twice
+ * (in case it is, for instance, both controlling process
+ * and a writer).
+ *
+ * <KOLLA>
+ * We do not handle linger-issues yet! So anything in the out
+ * buffers will be left for the OS to solve...
+ * Do we need a special "close"-thread? Dirty scheduler?
+ * </KOLLA>
*/
static
void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
{
+ SocketDescriptor* descP = (SocketDescriptor*) obj;
+ ERL_NIF_TERM errClosed = MKT2(env, atom_error, atom_closed);
+
+ MLOCK(descP->writeMtx);
+ MLOCK(descP->readMtx);
+ MLOCK(descP->accMtx);
+ MLOCK(descP->closeMtx);
+
+
+ descP->state = SOCKET_STATE_CLOSING;
+ descP->isReadable = FALSE;
+ descP->isWritable = FALSE;
+
+
+ /* We should check that we actually have a monitor.
+ * This *should* be done with a "NULL" monitor value,
+ * which there currently is none...
+ */
+ DEMONP(env, descP, &descP->ctrlMon);
+
+ if (descP->currentWriterP != NULL) {
+ /* We have a (current) writer and *may* therefor also have
+ * writers waiting.
+ */
+
+ SASSERT( (NULL == send_msg_error_closed(env, &descP->currentWriter.pid)) );
+
+ /* And also deal with the waiting writers (in the same way) */
+ inform_waiting_procs(env, descP, &descP->writersQ, TRUE, errClosed);
+ }
+
+ if (descP->currentReaderP != NULL) {
+
+ /* We have a (current) reader and *may* therefor also have
+ * readers waiting.
+ */
+
+ SASSERT( (NULL == send_msg_error_closed(env, &descP->currentReader.pid)) );
+
+ /* And also deal with the waiting readers (in the same way) */
+ inform_waiting_procs(env, descP, &descP->readersQ, TRUE, errClosed);
+ }
+
+ if (descP->currentAcceptorP != NULL) {
+ /* We have a (current) acceptor and *may* therefor also have
+ * acceptors waiting.
+ */
+
+ SASSERT( (NULL == send_msg_error_closed(env, &descP->currentAcceptor.pid)) );
+
+ /* And also deal with the waiting acceptors (in the same way) */
+ inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, errClosed);
+ }
+
+
+ if (descP->sock != INVALID_SOCKET) {
+
+ /* +++ send close message to the waiting process +++
+ *
+ * {close, CloseRef}
+ *
+ */
+
+ send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
+
+ DEMONP(env, descP, &descP->closerMon);
+ }
+
+
+ MUNLOCK(descP->closeMtx);
+ MUNLOCK(descP->accMtx);
+ MUNLOCK(descP->readMtx);
+ MUNLOCK(descP->writeMtx);
+
}
+/* This function traverse the queue and sends the specified
+ * message to each member, and if the 'free' argument is TRUE,
+ * the queue will be emptied.
+ */
+static
+void inform_waiting_procs(ErlNifEnv* env,
+ SocketDescriptor* descP,
+ SocketRequestQueue* q,
+ BOOLEAN_T free,
+ ERL_NIF_TERM msg)
+{
+ SocketRequestQueueElement* currentP = q->first;
+ SocketRequestQueueElement* nextP;
+
+ while (currentP != NULL) {
+
+ /* <KOLL>
+ * Should we inform anyone if we fail to demonitor?
+ * NOT SURE WHAT THAT WOULD REPRESENT AND IT IS NOT
+ * IMPORTANT IN *THIS* CASE, BUT ITS A FUNDAMENTAL OP...
+ * </KOLLA>
+ */
+
+ SASSERT( (NULL == send_msg(env, msg, &currentP->data.pid)) );
+ DEMONP(env, descP, &currentP->data.mon);
+ nextP = currentP->nextP;
+ if (free) FREE(currentP);
+ currentP = nextP;
+ }
+
+ if (free) {
+ q->first = NULL;
+ q->last = NULL;
+ }
+}
+
+
+
/* =========================================================================
* socket_down - Callback function for resource down (monitored processes)
*
@@ -3564,31 +4008,32 @@ void socket_down(ErlNifEnv* env,
static
ErlNifFunc socket_funcs[] =
{
- // Some utility functions
- {"nif_is_loaded", 0, nif_is_loaded},
- {"nif_info", 0, nif_info},
- // {"nif_debug", 1, nif_debug_},
-
- // The proper "socket" interface
- {"nif_open", 4, nif_open},
- {"nif_bind", 3, nif_bind},
- {"nif_connect", 3, nif_connect},
- {"nif_listen", 2, nif_listen},
- {"nif_accept", 2, nif_accept},
- {"nif_send", 4, nif_send},
- {"nif_sendto", 6, nif_sendto},
- {"nif_recv", 4, nif_recv},
- {"nif_recvfrom", 2, nif_recvfrom},
- {"nif_close", 1, nif_close},
- {"nif_setsockopt", 3, nif_setsockopt},
- {"nif_getsockopt", 2, nif_getsockopt},
-
- /* "Extra" functions to "complete" the socket interface.
- * For instance, the function nif_finalize_connection
- * is called after the connect *select* has "completed".
- */
- {"nif_finalize_connection", 1, nif_finalize_connection},
- {"nif_cancel", 2, nif_cancel},
+ // Some utility functions
+ {"nif_is_loaded", 0, nif_is_loaded, 0},
+ {"nif_info", 0, nif_info, 0},
+ // {"nif_debug", 1, nif_debug_, 0},
+
+ // The proper "socket" interface
+ {"nif_open", 4, nif_open, 0},
+ {"nif_bind", 3, nif_bind, 0},
+ {"nif_connect", 3, nif_connect, 0},
+ {"nif_listen", 2, nif_listen, 0},
+ {"nif_accept", 2, nif_accept, 0},
+ {"nif_send", 4, nif_send, 0},
+ {"nif_sendto", 6, nif_sendto, 0},
+ {"nif_recv", 4, nif_recv, 0},
+ {"nif_recvfrom", 2, nif_recvfrom, 0},
+ {"nif_close", 1, nif_close, 0},
+ {"nif_setsockopt", 3, nif_setsockopt, 0},
+ {"nif_getsockopt", 2, nif_getsockopt, 0},
+
+ /* "Extra" functions to "complete" the socket interface.
+ * For instance, the function nif_finalize_connection
+ * is called after the connect *select* has "completed".
+ */
+ {"nif_finalize_connection", 1, nif_finalize_connection, 0},
+ {"nif_cancel", 2, nif_cancel, 0},
+ {"nif_finalize_close", 1, nif_finalize_close, ERL_NIF_DIRTY_JOB_IO_BOUND}
};
@@ -3656,7 +4101,9 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
// atom_active_once = MKA(env, str_active_once);
// atom_binary = MKA(env, str_binary);
// atom_buildDate = MKA(env, str_buildDate);
+ atom_close = MKA(env, str_close);
atom_closed = MKA(env, str_closed);
+ atom_closing = MKA(env, str_closing);
atom_error = MKA(env, str_error);
atom_false = MKA(env, str_false);
// atom_list = MKA(env, str_list);
@@ -3665,7 +4112,9 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
// atom_once = MKA(env, str_once);
// atom_passive = MKA(env, str_passive);
// atom_receiver = MKA(env, str_receiver);
+ atom_select = MKA(env, str_select);
// atom_tcp_closed = MKA(env, str_tcp_closed);
+ atom_timeout = MKA(env, str_timeout);
atom_true = MKA(env, str_true);
atom_undefined = MKA(env, str_undefined);
// atom_version = MKA(env, str_version);
@@ -3675,6 +4124,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_eafnosupport = MKA(env, str_eafnosupport);
atom_einval = MKA(env, str_einval);
atom_eisconn = MKA(env, str_eisconn);
+ atom_enotclosing = MKA(env, str_enotclosing);
atom_enotconn = MKA(env, str_enotconn);
atom_exalloc = MKA(env, str_exalloc);
atom_exbadstate = MKA(env, str_exbadstate);
@@ -3682,7 +4132,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
// atom_exnotopen = MKA(env, str_exnotopen);
atom_exmon = MKA(env, str_exmon);
atom_exself = MKA(env, str_exself);
- // atom_exsend = MKA(env, str_exsend);
+ atom_exsend = MKA(env, str_exsend);
// For storing "global" things...
// socketData.env = enif_alloc_env(); // We should really check
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index bae561cd51..044fe73906 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -45,10 +45,8 @@
close/1,
- setopt/3,
- getopt/2,
- %% ?????
- formated_timestamp/0
+ setopt/4,
+ getopt/3
]).
-export_type([
@@ -105,6 +103,14 @@
-type port_number() :: 0..65535.
+%% otp - The option is internal to our (OTP) imeplementation.
+%% socket - The socket layer (SOL_SOCKET).
+%% ip - The ip layer (SOL_IP).
+%% tcp - The TCP (Transport Control Protocol) layer (IPPROTO_TCP).
+%% udp - The UDP (User Datagram Protocol) layer (IPPROTO_UDP).
+%% Int - Raw level, sent down and used "as is".
+-type option_level() :: otp | socket | ip | tcp | udp | non_neg_integer().
+
-type socket_info() :: map().
-record(socket, {info :: socket_info(),
ref :: reference()}).
@@ -201,6 +207,14 @@
-define(SOCKET_RECV_FLAGS_DEFAULT, []).
-define(SOCKET_RECV_TIMEOUT_DEFAULT, infinity).
+-define(SOCKET_SETOPT_LEVEL_ENCODED, 0).
+-define(SOCKET_SETOPT_LEVEL_RAW, 1).
+-define(SOCKET_SETOPT_LEVEL_OTP, 0).
+-define(SOCKET_SETOPT_LEVEL_SOCKET, 1).
+-define(SOCKET_SETOPT_LEVEL_IP, 2).
+-define(SOCKET_SETOPT_LEVEL_TCP, 3).
+-define(SOCKET_SETOPT_LEVEL_UDP, 4).
+
-define(SOCKET_SETOPT_KEY_DEBUG, 0).
-define(SOCKET_GETOPT_KEY_DEBUG, ?SOCKET_SETOPT_KEY_DEBUG).
@@ -935,16 +949,37 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
%% ===========================================================================
%%
%% close - close a file descriptor
+%%
-spec close(Socket) -> ok | {error, Reason} when
Socket :: socket(),
Reason :: term().
-close({socket, _, SockRef}) ->
- nif_close(SockRef).
+close(#socket{ref = SockRef}) ->
+ case nif_close(SockRef) of
+ ok ->
+ nif_finalize_close(SockRef);
+ {ok, CloseRef} ->
+ %% We must wait
+ receive
+ {close, CloseRef} ->
+ %% <KOLLA>
+ %%
+ %% WHAT HAPPENS IF THIS PROCESS IS KILLED
+ %% BEFORE WE CAN EXECUTE THE FINAL CLOSE???
+ %%
+ %% </KOLLA>
+ nif_finalize_close(SockRef)
+ end;
+ {error, _} = ERROR ->
+ ERROR
+ end.
+
+%% ===========================================================================
+%%
%% setopt - manipulate individual properties of a socket
%%
%% What properties are valid depend on what kind of socket it is
@@ -952,22 +987,30 @@ close({socket, _, SockRef}) ->
%% If its an "invalid" option (or value), we should not crash but return some
%% useful error...
%%
+%% <KOLLA>
+%%
+%% WE NEED TOP MAKE SURE THAT THE USER DOES NOT MAKE US BLOCKING
+%% AS MUCH OF THE CODE EXPECTS TO BE NON-BLOCKING!!
+%%
+%% </KOLLA>
--spec setopt(Socket, Key, Value) -> ok | {error, Reason} when
+-spec setopt(Socket, Level, Key, Value) -> ok | {error, Reason} when
Socket :: socket(),
+ Level :: option_level(),
Key :: setopt_key(),
Value :: term(),
Reason :: term().
-setopt({socket, Info, SockRef}, Key, Value) ->
+setopt(#socket{info = Info, ref = SockRef}, Level, Key, Value) ->
try
begin
Domain = maps:get(domain, Info),
Type = maps:get(type, Info),
Protocol = maps:get(protocol, Info),
- EKey = enc_setopt_key(Key, Domain, Type, Protocol),
- EVal = enc_setopt_value(Key, Value, Domain, Type, Protocol),
- nif_setopt(SockRef, EKey, EVal)
+ ELevel = enc_setopt_level(Level),
+ EKey = enc_setopt_key(Level, Key, Domain, Type, Protocol),
+ EVal = enc_setopt_value(Level, Key, Value, Domain, Type, Protocol),
+ nif_setopt(SockRef, ELevel, EKey, EVal)
end
catch
throw:T ->
@@ -985,24 +1028,26 @@ setopt({socket, Info, SockRef}, Key, Value) ->
%% useful error...
%%
--spec getopt(Socket, Key) -> {ok, Value} | {error, Reason} when
+-spec getopt(Socket, Level, Key) -> {ok, Value} | {error, Reason} when
Socket :: socket(),
+ Level :: option_level(),
Key :: getopt_key(),
Value :: term(),
Reason :: term().
-getopt({socket, Info, SockRef}, Key) ->
+getopt(#socket{info = Info, ref = SockRef}, Level, Key) ->
try
begin
Domain = maps:get(domain, Info),
Type = maps:get(type, Info),
Protocol = maps:get(protocol, Info),
- EKey = enc_getopt_key(Key, Domain, Type, Protocol),
+ ELevel = enc_getopt_level(Level),
+ EKey = enc_getopt_key(Level, Key, Domain, Type, Protocol),
%% We may need to decode the value (for the same reason
%% we needed to encode the value for setopt).
- case nif_getopt(SockRef, EKey) of
+ case nif_getopt(SockRef, ELevel, EKey) of
{ok, EVal} ->
- Val = dec_getopt_value(Key, EVal, Domain, Type, Protocol),
+ Val = dec_getopt_value(Level, Key, EVal, Domain, Type, Protocol),
{ok, Val};
{error, _} = ERROR ->
ERROR
@@ -1090,21 +1135,52 @@ enc_flags(Flags, EFlags) ->
end,
lists:foldl(F, 0, Flags).
+enc_setopt_level(otp) ->
+ {?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_OTP};
+enc_setopt_level(socket) ->
+ {?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_SOCKET};
+enc_setopt_level(ip) ->
+ {?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_IP};
+enc_setopt_level(tcp) ->
+ {?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_TCP};
+enc_setopt_level(udp) ->
+ {?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_UDP};
+%% Any option that is of an raw level must be provided as a binary
+%% already fully encoded!
+enc_setopt_level(L) when is_integer(L) ->
+ {?SOCKET_SETOPT_LEVEL_RAW, L}.
+
+
%% We should ...really... do something with the domain, type and protocol args...
-enc_setopt_key(debug, _, _, _) ->
+%% Also, any option which has an integer level (raw) must also be provided
+%% in a raw mode, that is, as an integer.
+enc_setopt_key(L, K, _, _, _) when is_integer(L) andalso is_integer(K) ->
+ K;
+enc_setopt_key(otp, debug, _, _, _) ->
?SOCKET_SETOPT_KEY_DEBUG.
%% We should ...really... do something with the domain, type and protocol args...
-enc_setopt_value(debug, V, _, _, _) when is_boolean(V) ->
+enc_setopt_value(otp, debug, V, _, _, _) when is_boolean(V) ->
+ V;
+enc_setopt_value(socket, linger, abort, D, T, P) ->
+ enc_setopt_value(socket, linger, {true, 0}, D, T, P);
+enc_setopt_value(socket, linger, {OnOff, Secs} = V, _D, _T, _P)
+ when is_boolean(OnOff) andalso is_integer(Secs) andalso (Secs >= 0) ->
+ V;
+enc_setopt_value(L, _, V, _, _, _) when is_integer(L) andalso is_binary(V) ->
V.
+
+enc_getopt_level(Level) ->
+ enc_setopt_level(Level).
+
%% We should ...really... do something with the domain, type and protocol args...
-enc_getopt_key(debug, _, _, _) ->
+enc_getopt_key(otp, debug, _, _, _) ->
?SOCKET_GETOPT_KEY_DEBUG.
%% We should ...really... do something with the domain, type and protocol args...
-dec_getopt_value(debug, B, _, _, _) when is_boolean(B) ->
+dec_getopt_value(otp, debug, B, _, _, _) when is_boolean(B) ->
B.
@@ -1226,8 +1302,11 @@ nif_cancel(_SRef, _Op, _Ref) ->
nif_close(_SRef) ->
erlang:error(badarg).
-nif_setopt(_Ref, _Key, _Val) ->
+nif_finalize_close(_SRef) ->
+ erlang:error(badarg).
+
+nif_setopt(_Ref, _Lev, _Key, _Val) ->
erlang:error(badarg).
-nif_getopt(_Ref, _Key) ->
+nif_getopt(_Ref, _Lev, _Key) ->
erlang:error(badarg).