diff options
author | Micael Karlberg <[email protected]> | 2018-04-24 17:38:52 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-09-18 13:01:37 +0200 |
commit | 599a320f630991823fc28b6a8a9f09851e261fed (patch) | |
tree | 4052c4bacabd1b2258f0a2ddf8427155589d4208 /erts/emulator/nifs | |
parent | 04335ca6aedfc5ad9f0d6a8d193dfd76a222291c (diff) | |
download | otp-599a320f630991823fc28b6a8a9f09851e261fed.tar.gz otp-599a320f630991823fc28b6a8a9f09851e261fed.tar.bz2 otp-599a320f630991823fc28b6a8a9f09851e261fed.zip |
[socket-nif] "Completed" the close function
There is probably a lot of things left to be
here. For instance the handling of ECONNRESET
when reading (recv and recvfrom).
Also some stuff about setopt and getopt.
Diffstat (limited to 'erts/emulator/nifs')
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 542 |
1 files changed, 496 insertions, 46 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index 6e6851a608..d55de9ff4e 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -185,6 +185,10 @@ typedef unsigned int BOOLEAN_T; #define get_int16(s) ((((unsigned char*) (s))[0] << 8) | \ (((unsigned char*) (s))[1])) +#define SASSERT(e) \ + ((void) ((e) ? 1 : (xabort(#e, __func__, __FILE__, __LINE__), 0))) + + /* Debug stuff... */ #define SOCKET_NIF_DEBUG_DEFAULT TRUE #define SOCKET_DEBUG_DEFAULT TRUE @@ -235,6 +239,7 @@ typedef unsigned long long llu_t; #define SOCKET_FLAG_CON 0x0010 #define SOCKET_FLAG_ACC 0x0020 #define SOCKET_FLAG_BUSY 0x0040 +#define SOCKET_FLAG_CLOSE 0x0080 #define SOCKET_STATE_CLOSED (0) #define SOCKET_STATE_OPEN (SOCKET_FLAG_OPEN) @@ -242,6 +247,7 @@ typedef unsigned long long llu_t; #define SOCKET_STATE_LISTENING (SOCKET_STATE_OPEN | SOCKET_FLAG_LISTEN) #define SOCKET_STATE_CONNECTING (SOCKET_STATE_OPEN | SOCKET_FLAG_CON) #define SOCKET_STATE_ACCEPTING (SOCKET_STATE_LISTENING | SOCKET_FLAG_ACC) +#define SOCKET_STATE_CLOSING (SOCKET_FLAG_CLOSE) #define IS_OPEN(d) \ (((d)->state & SOCKET_FLAG_OPEN) == SOCKET_FLAG_OPEN) @@ -331,6 +337,7 @@ typedef union { #define MALLOC(SZ) enif_alloc((SZ)) #define FREE(P) enif_free((P)) + #define MKA(E,S) enif_make_atom((E), (S)) #define MKBIN(E,B) enif_make_binary((E), (B)) #define MKI(E,I) enif_make_int((E), (I)) @@ -343,9 +350,15 @@ typedef union { #define MKT4(E,E1,E2,E3,E4) enif_make_tuple4((E), (E1), (E2), (E3), (E4)) #define MKT8(E,E1,E2,E3,E4,E5,E6,E7,E8) \ enif_make_tuple8((E), (E1), (E2), (E3), (E4), (E5), (E6), (E7), (E8)) + #define MCREATE(N) enif_mutex_create((N)) +#define MDESTROY(M) enif_mutex_destroy((M)) #define MLOCK(M) enif_mutex_lock((M)) #define MUNLOCK(M) enif_mutex_unlock((M)) + +#define MONP(E,D,P,M) enif_monitor_process((E), (D), (P), (M)) +#define DEMONP(E,D,M) enif_demonitor_process((E), (D), (M)) + #define SELECT(E,FD,M,O,P,R) \ if (enif_select((E), (FD), (M), (O), (P), (R)) < 0) \ return enif_make_badarg((E)); @@ -375,13 +388,16 @@ typedef union { #ifdef __WIN32__ -/* *** Windown macros *** */ +/* *** Windows macros *** */ #define sock_accept(s, addr, len) \ make_noninheritable_handle(accept((s), (addr), (len))) #define sock_bind(s, addr, len) bind((s), (addr), (len)) #define sock_close(s) closesocket((s)) +#define sock_close_event(e) WSACloseEvent(e) #define sock_connect(s, addr, len) connect((s), (addr), (len)) +#define sock_create_event(s) WSACreateEvent() +#define sock_errno() WSAGetLastError() #define sock_getopt(s,t,n,v,l) getsockopt((s),(t),(n),(v),(l)) #define sock_htons(x) htons((x)) #define sock_htonl(x) htonl((x)) @@ -397,16 +413,16 @@ typedef union { #define sock_sendto(s,buf,blen,flag,addr,alen) \ sendto((s),(buf),(blen),(flag),(addr),(alen)) -#define sock_errno() WSAGetLastError() -#define sock_create_event(s) WSACreateEvent() #define SET_BLOCKING(s) ioctlsocket(s, FIONBIO, &zero_value) #define SET_NONBLOCKING(s) ioctlsocket(s, FIONBIO, &one_value) static unsigned long zero_value = 0; static unsigned long one_value = 1; + #else /* !__WIN32__ */ + #ifdef HAS_ACCEPT4 // We have to figure out what the flags are... #define sock_accept(s, addr, len) accept4((s), (addr), (len), (SOCK_CLOEXEC)) @@ -415,7 +431,10 @@ static unsigned long one_value = 1; #endif #define sock_bind(s, addr, len) bind((s), (addr), (len)) #define sock_close(s) close((s)) +#define sock_close_event(e) /* do nothing */ #define sock_connect(s, addr, len) connect((s), (addr), (len)) +#define sock_create_event(s) (s) /* return file descriptor */ +#define sock_errno() errno #define sock_getopt(s,t,n,v,l) getsockopt((s),(t),(n),(v),(l)) #define sock_htons(x) htons((x)) #define sock_htonl(x) htonl((x)) @@ -430,10 +449,6 @@ static unsigned long one_value = 1; #define sock_sendto(s,buf,blen,flag,addr,alen) \ sendto((s),(buf),(blen),(flag),(addr),(alen)) -#define sock_errno() errno -#define sock_create_event(s) (s) /* return file descriptor */ - - #endif /* !__WIN32__ */ #ifdef HAVE_SOCKLEN_T @@ -470,7 +485,7 @@ typedef struct { } SocketRequestor; typedef struct socket_request_queue_element { - struct socket_request_queue_element* next; + struct socket_request_queue_element* nextP; SocketRequestor data; } SocketRequestQueueElement; @@ -534,6 +549,12 @@ typedef struct { BOOLEAN_T iow; // Inform On Wrap BOOLEAN_T dbg; + /* +++ Close stuff +++ */ + ErlNifMutex* closeMtx; + ErlNifPid closerPid; + ErlNifMonitor closerMon; + ERL_NIF_TERM closeRef; + } SocketDescriptor; @@ -609,6 +630,9 @@ static ERL_NIF_TERM nif_getsockopt(ErlNifEnv* env, static ERL_NIF_TERM nif_finalize_connection(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); +static ERL_NIF_TERM nif_finalize_close(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]); static ERL_NIF_TERM nif_cancel(ErlNifEnv* env, int argc, const ERL_NIF_TERM argv[]); @@ -660,6 +684,8 @@ static ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, ERL_NIF_TERM recvRef, uint16_t bufSz, int flags); +static ERL_NIF_TERM nclose(ErlNifEnv* env, + SocketDescriptor* descP); static ERL_NIF_TERM send_check_result(ErlNifEnv* env, SocketDescriptor* descP, @@ -682,6 +708,8 @@ static ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, static ERL_NIF_TERM nfinalize_connection(ErlNifEnv* env, SocketDescriptor* descP); +static ERL_NIF_TERM nfinalize_close(ErlNifEnv* env, + SocketDescriptor* descP); static char* decode_laddress(ErlNifEnv* env, @@ -730,6 +758,12 @@ static void encode_address(ErlNifEnv* env, ERL_NIF_TERM* fromDomainT, ERL_NIF_TERM* fromSourceT); +static void inform_waiting_procs(ErlNifEnv* env, + SocketDescriptor* descP, + SocketRequestQueue* q, + BOOLEAN_T free, + ERL_NIF_TERM msg); + static BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err); static SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event); @@ -773,6 +807,19 @@ static ERL_NIF_TERM make_error(ErlNifEnv* env, ERL_NIF_TERM reason); static ERL_NIF_TERM make_error1(ErlNifEnv* env, char* reason); static ERL_NIF_TERM make_error2(ErlNifEnv* env, int err); +static char* send_msg_error_closed(ErlNifEnv* env, + ErlNifPid* pid); +static char* send_msg_error(ErlNifEnv* env, + ERL_NIF_TERM reason, + ErlNifPid* pid); +static char* send_msg(ErlNifEnv* env, + ERL_NIF_TERM msg, + ErlNifPid* pid); + +static void xabort(const char* expr, + const char* func, + const char* file, + int line); static BOOLEAN_T extract_item_on_load(ErlNifEnv* env, ERL_NIF_TERM map, @@ -810,10 +857,14 @@ static const struct in6_addr in6addr_loopback = /* *** String constants *** */ +static char str_close[] = "close"; static char str_closed[] = "closed"; +static char str_closing[] = "closing"; static char str_error[] = "error"; static char str_false[] = "false"; static char str_ok[] = "ok"; +static char str_select[] = "select"; +static char str_timeout[] = "timeout"; static char str_true[] = "true"; static char str_undefined[] = "undefined"; @@ -822,19 +873,25 @@ static char str_eagain[] = "eagain"; static char str_eafnosupport[] = "eafnosupport"; static char str_einval[] = "einval"; static char str_eisconn[] = "eisconn"; +static char str_enotclosing[] = "enotclosing"; static char str_enotconn[] = "enotconn"; static char str_exalloc[] = "exalloc"; static char str_exbadstate[] = "exbadstate"; static char str_exbusy[] = "exbusy"; static char str_exmon[] = "exmonitor"; // failed monitor static char str_exself[] = "exself"; // failed self +static char str_exsend[] = "exsend"; // failed send /* *** Atoms *** */ +static ERL_NIF_TERM atom_close; static ERL_NIF_TERM atom_closed; +static ERL_NIF_TERM atom_closing; static ERL_NIF_TERM atom_error; static ERL_NIF_TERM atom_false; static ERL_NIF_TERM atom_ok; +static ERL_NIF_TERM atom_select; +static ERL_NIF_TERM atom_timeout; static ERL_NIF_TERM atom_true; static ERL_NIF_TERM atom_undefined; @@ -842,12 +899,14 @@ static ERL_NIF_TERM atom_eagain; static ERL_NIF_TERM atom_eafnosupport; static ERL_NIF_TERM atom_einval; static ERL_NIF_TERM atom_eisconn; +static ERL_NIF_TERM atom_enotclosing; static ERL_NIF_TERM atom_enotconn; static ERL_NIF_TERM atom_exalloc; static ERL_NIF_TERM atom_exbadstate; static ERL_NIF_TERM atom_exbusy; static ERL_NIF_TERM atom_exmon; static ERL_NIF_TERM atom_exself; +static ERL_NIF_TERM atom_exsend; /* *** Sockets *** */ @@ -1059,9 +1118,9 @@ ERL_NIF_TERM nopen(ErlNifEnv* env, if (enif_self(env, &descP->ctrlPid) == NULL) return make_error(env, atom_exself); - if (enif_monitor_process(env, descP, - &descP->ctrlPid, - &descP->ctrlMon) > 0) + if (MONP(env, descP, + &descP->ctrlPid, + &descP->ctrlMon) > 0) return make_error(env, atom_exmon); @@ -1794,9 +1853,9 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, /* *** Try again later *** */ descP->currentAcceptor.pid = caller; - if (enif_monitor_process(env, descP, - &descP->currentAcceptor.pid, - &descP->currentAcceptor.mon) > 0) + if (MONP(env, descP, + &descP->currentAcceptor.pid, + &descP->currentAcceptor.mon) > 0) return make_error(env, atom_exmon); descP->currentAcceptor.ref = ref; @@ -1865,9 +1924,9 @@ ERL_NIF_TERM naccept_listening(ErlNifEnv* env, enif_release_resource(accDescP); // We should really store a reference ... accDescP->ctrlPid = caller; - if (enif_monitor_process(env, accDescP, - &accDescP->ctrlPid, - &accDescP->ctrlMon) > 0) { + if (MONP(env, accDescP, + &accDescP->ctrlPid, + &accDescP->ctrlMon) > 0) { sock_close(accSock); return make_error(env, atom_exmon); } @@ -1966,9 +2025,9 @@ ERL_NIF_TERM naccept_accepting(ErlNifEnv* env, enif_release_resource(accDescP); // We should really store a reference ... accDescP->ctrlPid = caller; - if (enif_monitor_process(env, accDescP, - &accDescP->ctrlPid, - &accDescP->ctrlMon) > 0) { + if (MONP(env, accDescP, + &accDescP->ctrlPid, + &accDescP->ctrlMon) > 0) { sock_close(accSock); return make_error(env, atom_exmon); } @@ -2481,6 +2540,189 @@ ERL_NIF_TERM nrecvfrom(ErlNifEnv* env, /* ---------------------------------------------------------------------- + * nif_close + * + * Description: + * Close a (socket) file descriptor. + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + */ + +static +ERL_NIF_TERM nif_close(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + SocketDescriptor* descP; + + if ((argc != 1) || + !enif_get_resource(env, argv[0], sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + + return nclose(env, descP); +} + + +static +ERL_NIF_TERM nclose(ErlNifEnv* env, + SocketDescriptor* descP) +{ + ERL_NIF_TERM reply, reason; + BOOLEAN_T doClose; + int selectRes; + + MLOCK(descP->closeMtx); + + if (descP->state == SOCKET_STATE_CLOSED) { + reason = atom_closed; + doClose = FALSE; + } else if (descP->state == SOCKET_STATE_CLOSING) { + reason = atom_closing; + doClose = FALSE; + } else { + + /* Store the PID of the caller, + * since we need to inform it when we + * (that is, the stop callback function) + * completes. + */ + + if (enif_self(env, &descP->closerPid) == NULL) { + MUNLOCK(descP->closeMtx); + return make_error(env, atom_exself); + } + + /* Monitor the caller, since we should complete this operation even if + * the caller dies (for whatever reason). + */ + + if (MONP(env, descP, + &descP->closerPid, + &descP->closerMon) > 0) { + MUNLOCK(descP->closeMtx); + return make_error(env, atom_exmon); + } + + descP->state = SOCKET_STATE_CLOSING; + doClose = TRUE; + } + + MUNLOCK(descP->closeMtx); + + if (doClose) { + descP->closeRef = MKREF(env); + selectRes = enif_select(env, descP->sock, (ERL_NIF_SELECT_STOP), + descP, NULL, descP->closeRef); + if (selectRes & ERL_NIF_SELECT_STOP_CALLED) { + /* Prep done - inform the caller it can finalize (close) directly */ + reply = atom_ok; + } else if (selectRes & ERL_NIF_SELECT_STOP_SCHEDULED) { + /* The stop callback function has been *scheduled* which means that we + * have to wait for it to complete. */ + reply = make_ok2(env, descP->closeRef); + } else { + /* <KOLLA> + * + * WE SHOULD REALLY HAVE A WAY TO CLOBBER THE SOCKET, + * SO WE DON'T LET STUFF LEAK. + * NOW, BECAUSE WE FAILED TO SELECT, WE CANNOT FINISH + * THE CLOSE, WHAT TO DO? ABORT? + * + * </KOLLA> + */ + reason = MKT2(env, atom_select, MKI(env, selectRes)); + reply = make_error(env, reason); + } + } else { + reply = make_error(env, reason); + } + + return reply; +} + + + +/* ---------------------------------------------------------------------- + * nif_finalize_close + * + * Description: + * Perform the actual socket close! + * Note that this function is executed in a dirfty scheduler. + * + * Arguments: + * Socket (ref) - Points to the socket descriptor. + */ +static +ERL_NIF_TERM nif_finalize_close(ErlNifEnv* env, + int argc, + const ERL_NIF_TERM argv[]) +{ + SocketDescriptor* descP; + + /* Extract arguments and perform preliminary validation */ + + if ((argc != 1) || + !enif_get_resource(env, argv[0], sockets, (void**) &descP)) { + return enif_make_badarg(env); + } + + return nfinalize_close(env, descP); + +} + + +/* *** nfinalize_close *** + * Perform the final step in the socket close. + */ +static +ERL_NIF_TERM nfinalize_close(ErlNifEnv* env, + SocketDescriptor* descP) +{ + ERL_NIF_TERM reply; + + if (descP->state == SOCKET_STATE_CLOSED) + return atom_ok; + + if (descP->state != SOCKET_STATE_CLOSING) + return make_error(env, atom_enotclosing); + + /* This nif is executed in a dirty scheduler just so that + * it can "hang" (whith minumum effect on the VM) while the + * kernel writes our buffers. IF we have set the linger option + * for this ({true, integer() > 0}). For this to work we must + * be blocking... + */ + SET_BLOCKING(descP->sock); + + if (sock_close(descP->sock) != 0) { + int save_errno = sock_errno(); + + if (save_errno != ERRNO_BLOCK) { + /* Not all data in the buffers where sent, + * make sure the caller gets this. + */ + reply = make_error(env, atom_timeout); + } else { + reply = make_error2(env, save_errno); + } + } else { + reply = atom_ok; + } + sock_close_event(descP->event); + + descP->sock = INVALID_SOCKET; + descP->event = INVALID_EVENT; + + descP->state = SOCKET_STATE_CLOSED; + + return reply; +} + + + +/* ---------------------------------------------------------------------- * U t i l i t y F u n c t i o n s * ---------------------------------------------------------------------- */ @@ -2610,10 +2852,16 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * PROCESS, WE NEED TO INFORM IT!!! * * ALL WAITING PROCESSES MUST ALSO GET THE ERROR!! + * HANDLED BY THE STOP (CALLBACK) FUNCTION? + * + * WE DON'T NEED TO WAIT FOR OUTPUT TO BE WRITTEN, + * JUST ABORT THE SOCKET!!! * * </KOLLA> */ + descP->state = SOCKET_STATE_CLOSING; + SELECT(env, descP->sock, (ERL_NIF_SELECT_STOP), @@ -3150,6 +3398,9 @@ SocketDescriptor* alloc_descriptor(SOCKET sock, HANDLE event) descP->acceptorsQ.first = NULL; descP->acceptorsQ.last = NULL; + sprintf(buf, "socket[close,%d]", sock); + descP->closeMtx = MCREATE(buf); + descP->rBufSz = SOCKET_RECV_BUFFER_SIZE_DEFAULT; descP->iow = FALSE; descP->dbg = SOCKET_DEBUG_DEFAULT; @@ -3491,6 +3742,65 @@ ERL_NIF_TERM make_error2(ErlNifEnv* env, int err) } +/* Send an error closed message to the specified process: + * + * This message is for processes that are waiting in the + * erlang API functions for a select message. + */ +static +char* send_msg_error_closed(ErlNifEnv* env, + ErlNifPid* pid) +{ + return send_msg_error(env, atom_closed, pid); +} + + +/* Send an error message to the specified process: + * A message in the form: + * + * {error, Reason} + * + * This message is for processes that are waiting in the + * erlang API functions for a select message. + */ +static +char* send_msg_error(ErlNifEnv* env, + ERL_NIF_TERM reason, + ErlNifPid* pid) +{ + ERL_NIF_TERM msg = enif_make_tuple2(env, atom_error, reason); + + return send_msg(env, msg, pid); +} + + +/* Send a message to the specified process. + */ +static +char* send_msg(ErlNifEnv* env, + ERL_NIF_TERM msg, + ErlNifPid* pid) +{ + if (!enif_send(env, pid, NULL, msg)) + return str_exsend; + else + return NULL; +} + + +static +void xabort(const char* expr, + const char* func, + const char* file, + int line) +{ + fflush(stdout); + fprintf(stderr, "%s:%d:%s() Assertion failed: %s\n", + file, line, func, expr); + fflush(stderr); + abort(); +} + /* ---------------------------------------------------------------------- * C o u n t e r F u n c t i o n s @@ -3529,19 +3839,153 @@ BOOLEAN_T cnt_inc(uint32_t* cnt, uint32_t inc) static void socket_dtor(ErlNifEnv* env, void* obj) { + SocketDescriptor* descP = (SocketDescriptor*) obj; + + MDESTROY(descP->writeMtx); + MDESTROY(descP->readMtx); + MDESTROY(descP->accMtx); + MDESTROY(descP->closeMtx); } /* ========================================================================= * socket_stop - Callback function for resource stop * + * When the socket is stopped, we need to inform: + * + * * the controlling process + * * the current writer and any waiting writers + * * the current reader and any waiting readers + * * the current acceptor and any waiting acceptor + * + * Also, make sure no process gets the message twice + * (in case it is, for instance, both controlling process + * and a writer). + * + * <KOLLA> + * We do not handle linger-issues yet! So anything in the out + * buffers will be left for the OS to solve... + * Do we need a special "close"-thread? Dirty scheduler? + * </KOLLA> */ static void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) { + SocketDescriptor* descP = (SocketDescriptor*) obj; + ERL_NIF_TERM errClosed = MKT2(env, atom_error, atom_closed); + + MLOCK(descP->writeMtx); + MLOCK(descP->readMtx); + MLOCK(descP->accMtx); + MLOCK(descP->closeMtx); + + + descP->state = SOCKET_STATE_CLOSING; + descP->isReadable = FALSE; + descP->isWritable = FALSE; + + + /* We should check that we actually have a monitor. + * This *should* be done with a "NULL" monitor value, + * which there currently is none... + */ + DEMONP(env, descP, &descP->ctrlMon); + + if (descP->currentWriterP != NULL) { + /* We have a (current) writer and *may* therefor also have + * writers waiting. + */ + + SASSERT( (NULL == send_msg_error_closed(env, &descP->currentWriter.pid)) ); + + /* And also deal with the waiting writers (in the same way) */ + inform_waiting_procs(env, descP, &descP->writersQ, TRUE, errClosed); + } + + if (descP->currentReaderP != NULL) { + + /* We have a (current) reader and *may* therefor also have + * readers waiting. + */ + + SASSERT( (NULL == send_msg_error_closed(env, &descP->currentReader.pid)) ); + + /* And also deal with the waiting readers (in the same way) */ + inform_waiting_procs(env, descP, &descP->readersQ, TRUE, errClosed); + } + + if (descP->currentAcceptorP != NULL) { + /* We have a (current) acceptor and *may* therefor also have + * acceptors waiting. + */ + + SASSERT( (NULL == send_msg_error_closed(env, &descP->currentAcceptor.pid)) ); + + /* And also deal with the waiting acceptors (in the same way) */ + inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, errClosed); + } + + + if (descP->sock != INVALID_SOCKET) { + + /* +++ send close message to the waiting process +++ + * + * {close, CloseRef} + * + */ + + send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid); + + DEMONP(env, descP, &descP->closerMon); + } + + + MUNLOCK(descP->closeMtx); + MUNLOCK(descP->accMtx); + MUNLOCK(descP->readMtx); + MUNLOCK(descP->writeMtx); + } +/* This function traverse the queue and sends the specified + * message to each member, and if the 'free' argument is TRUE, + * the queue will be emptied. + */ +static +void inform_waiting_procs(ErlNifEnv* env, + SocketDescriptor* descP, + SocketRequestQueue* q, + BOOLEAN_T free, + ERL_NIF_TERM msg) +{ + SocketRequestQueueElement* currentP = q->first; + SocketRequestQueueElement* nextP; + + while (currentP != NULL) { + + /* <KOLL> + * Should we inform anyone if we fail to demonitor? + * NOT SURE WHAT THAT WOULD REPRESENT AND IT IS NOT + * IMPORTANT IN *THIS* CASE, BUT ITS A FUNDAMENTAL OP... + * </KOLLA> + */ + + SASSERT( (NULL == send_msg(env, msg, ¤tP->data.pid)) ); + DEMONP(env, descP, ¤tP->data.mon); + nextP = currentP->nextP; + if (free) FREE(currentP); + currentP = nextP; + } + + if (free) { + q->first = NULL; + q->last = NULL; + } +} + + + /* ========================================================================= * socket_down - Callback function for resource down (monitored processes) * @@ -3564,31 +4008,32 @@ void socket_down(ErlNifEnv* env, static ErlNifFunc socket_funcs[] = { - // Some utility functions - {"nif_is_loaded", 0, nif_is_loaded}, - {"nif_info", 0, nif_info}, - // {"nif_debug", 1, nif_debug_}, - - // The proper "socket" interface - {"nif_open", 4, nif_open}, - {"nif_bind", 3, nif_bind}, - {"nif_connect", 3, nif_connect}, - {"nif_listen", 2, nif_listen}, - {"nif_accept", 2, nif_accept}, - {"nif_send", 4, nif_send}, - {"nif_sendto", 6, nif_sendto}, - {"nif_recv", 4, nif_recv}, - {"nif_recvfrom", 2, nif_recvfrom}, - {"nif_close", 1, nif_close}, - {"nif_setsockopt", 3, nif_setsockopt}, - {"nif_getsockopt", 2, nif_getsockopt}, - - /* "Extra" functions to "complete" the socket interface. - * For instance, the function nif_finalize_connection - * is called after the connect *select* has "completed". - */ - {"nif_finalize_connection", 1, nif_finalize_connection}, - {"nif_cancel", 2, nif_cancel}, + // Some utility functions + {"nif_is_loaded", 0, nif_is_loaded, 0}, + {"nif_info", 0, nif_info, 0}, + // {"nif_debug", 1, nif_debug_, 0}, + + // The proper "socket" interface + {"nif_open", 4, nif_open, 0}, + {"nif_bind", 3, nif_bind, 0}, + {"nif_connect", 3, nif_connect, 0}, + {"nif_listen", 2, nif_listen, 0}, + {"nif_accept", 2, nif_accept, 0}, + {"nif_send", 4, nif_send, 0}, + {"nif_sendto", 6, nif_sendto, 0}, + {"nif_recv", 4, nif_recv, 0}, + {"nif_recvfrom", 2, nif_recvfrom, 0}, + {"nif_close", 1, nif_close, 0}, + {"nif_setsockopt", 3, nif_setsockopt, 0}, + {"nif_getsockopt", 2, nif_getsockopt, 0}, + + /* "Extra" functions to "complete" the socket interface. + * For instance, the function nif_finalize_connection + * is called after the connect *select* has "completed". + */ + {"nif_finalize_connection", 1, nif_finalize_connection, 0}, + {"nif_cancel", 2, nif_cancel, 0}, + {"nif_finalize_close", 1, nif_finalize_close, ERL_NIF_DIRTY_JOB_IO_BOUND} }; @@ -3656,7 +4101,9 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) // atom_active_once = MKA(env, str_active_once); // atom_binary = MKA(env, str_binary); // atom_buildDate = MKA(env, str_buildDate); + atom_close = MKA(env, str_close); atom_closed = MKA(env, str_closed); + atom_closing = MKA(env, str_closing); atom_error = MKA(env, str_error); atom_false = MKA(env, str_false); // atom_list = MKA(env, str_list); @@ -3665,7 +4112,9 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) // atom_once = MKA(env, str_once); // atom_passive = MKA(env, str_passive); // atom_receiver = MKA(env, str_receiver); + atom_select = MKA(env, str_select); // atom_tcp_closed = MKA(env, str_tcp_closed); + atom_timeout = MKA(env, str_timeout); atom_true = MKA(env, str_true); atom_undefined = MKA(env, str_undefined); // atom_version = MKA(env, str_version); @@ -3675,6 +4124,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_eafnosupport = MKA(env, str_eafnosupport); atom_einval = MKA(env, str_einval); atom_eisconn = MKA(env, str_eisconn); + atom_enotclosing = MKA(env, str_enotclosing); atom_enotconn = MKA(env, str_enotconn); atom_exalloc = MKA(env, str_exalloc); atom_exbadstate = MKA(env, str_exbadstate); @@ -3682,7 +4132,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) // atom_exnotopen = MKA(env, str_exnotopen); atom_exmon = MKA(env, str_exmon); atom_exself = MKA(env, str_exself); - // atom_exsend = MKA(env, str_exsend); + atom_exsend = MKA(env, str_exsend); // For storing "global" things... // socketData.env = enif_alloc_env(); // We should really check |