From 82bfbdd7919e1aee360b76bdbaca17d2cf00ee73 Mon Sep 17 00:00:00 2001 From: Micael Karlberg Date: Wed, 25 Apr 2018 14:54:03 +0200 Subject: [socket-nif] More close-related work There are still some questions regarding what hapopens when writing / reading from an (remote) closed socket (I talking about "properly" closed sockets). --- erts/emulator/nifs/common/socket_nif.c | 122 ++++++++++++++++++++++++++------- 1 file changed, 96 insertions(+), 26 deletions(-) (limited to 'erts/emulator/nifs/common/socket_nif.c') diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index d55de9ff4e..dadea3b6fb 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -554,6 +554,7 @@ typedef struct { ErlNifPid closerPid; ErlNifMonitor closerMon; ERL_NIF_TERM closeRef; + BOOLEAN_T closeLocal; } SocketDescriptor; @@ -762,7 +763,7 @@ static void inform_waiting_procs(ErlNifEnv* env, SocketDescriptor* descP, SocketRequestQueue* q, BOOLEAN_T free, - ERL_NIF_TERM msg); + ERL_NIF_TERM reason); static BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err); @@ -812,6 +813,10 @@ static char* send_msg_error_closed(ErlNifEnv* env, static char* send_msg_error(ErlNifEnv* env, ERL_NIF_TERM reason, ErlNifPid* pid); +static char* send_msg_nif_abort(ErlNifEnv* env, + ERL_NIF_TERM ref, + ERL_NIF_TERM reason, + ErlNifPid* pid); static char* send_msg(ErlNifEnv* env, ERL_NIF_TERM msg, ErlNifPid* pid); @@ -862,6 +867,7 @@ static char str_closed[] = "closed"; static char str_closing[] = "closing"; static char str_error[] = "error"; static char str_false[] = "false"; +static char str_nif_abort[] = "nif_abort"; static char str_ok[] = "ok"; static char str_select[] = "select"; static char str_timeout[] = "timeout"; @@ -889,6 +895,7 @@ static ERL_NIF_TERM atom_closed; static ERL_NIF_TERM atom_closing; static ERL_NIF_TERM atom_error; static ERL_NIF_TERM atom_false; +static ERL_NIF_TERM atom_nif_abort; static ERL_NIF_TERM atom_ok; static ERL_NIF_TERM atom_select; static ERL_NIF_TERM atom_timeout; @@ -2605,8 +2612,9 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, return make_error(env, atom_exmon); } - descP->state = SOCKET_STATE_CLOSING; - doClose = TRUE; + descP->closeLocal = TRUE; + descP->state = SOCKET_STATE_CLOSING; + doClose = TRUE; } MUNLOCK(descP->closeMtx); @@ -2854,13 +2862,15 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * ALL WAITING PROCESSES MUST ALSO GET THE ERROR!! * HANDLED BY THE STOP (CALLBACK) FUNCTION? * - * WE DON'T NEED TO WAIT FOR OUTPUT TO BE WRITTEN, - * JUST ABORT THE SOCKET!!! + * SINCE THIS IS A REMOTE CLOSE, WE DON'T NEED TO WAIT + * FOR OUTPUT TO BE WRITTEN (NO ONE WILL READ), JUST + * ABORT THE SOCKET REGARDLESS OF LINGER??? * * */ - descP->state = SOCKET_STATE_CLOSING; + descP->closeLocal = FALSE; + descP->state = SOCKET_STATE_CLOSING; SELECT(env, descP->sock, @@ -2944,6 +2954,9 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, * */ + descP->closeLocal = FALSE; + descP->state = SOCKET_STATE_CLOSING; + SELECT(env, descP->sock, (ERL_NIF_SELECT_STOP), @@ -3768,9 +3781,29 @@ char* send_msg_error(ErlNifEnv* env, ERL_NIF_TERM reason, ErlNifPid* pid) { - ERL_NIF_TERM msg = enif_make_tuple2(env, atom_error, reason); + ERL_NIF_TERM msg = enif_make_tuple2(env, atom_error, reason); + + return send_msg(env, msg, pid); +} + + +/* Send an (nif-) abort message to the specified process: + * A message in the form: + * + * {nif_abort, Ref, Reason} + * + * This message is for processes that are waiting in the + * erlang API functions for a select message. + */ +static +char* send_msg_nif_abort(ErlNifEnv* env, + ERL_NIF_TERM ref, + ERL_NIF_TERM reason, + ErlNifPid* pid) +{ + ERL_NIF_TERM msg = MKT3(env, atom_nif_abort, ref, reason); - return send_msg(env, msg, pid); + return send_msg(env, msg, pid); } @@ -3871,8 +3904,7 @@ void socket_dtor(ErlNifEnv* env, void* obj) static void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) { - SocketDescriptor* descP = (SocketDescriptor*) obj; - ERL_NIF_TERM errClosed = MKT2(env, atom_error, atom_closed); + SocketDescriptor* descP = (SocketDescriptor*) obj; MLOCK(descP->writeMtx); MLOCK(descP->readMtx); @@ -3880,7 +3912,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) MLOCK(descP->closeMtx); - descP->state = SOCKET_STATE_CLOSING; + descP->state = SOCKET_STATE_CLOSING; // Just in case...??? descP->isReadable = FALSE; descP->isWritable = FALSE; @@ -3896,10 +3928,13 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * writers waiting. */ - SASSERT( (NULL == send_msg_error_closed(env, &descP->currentWriter.pid)) ); + SASSERT( (NULL == send_msg_nif_abort(env, + descP->currentWriter.ref, + atom_closed, + &descP->currentWriter.pid)) ); /* And also deal with the waiting writers (in the same way) */ - inform_waiting_procs(env, descP, &descP->writersQ, TRUE, errClosed); + inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed); } if (descP->currentReaderP != NULL) { @@ -3908,10 +3943,13 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * readers waiting. */ - SASSERT( (NULL == send_msg_error_closed(env, &descP->currentReader.pid)) ); + SASSERT( (NULL == send_msg_nif_abort(env, + descP->currentReader.ref, + atom_closed, + &descP->currentReader.pid)) ); /* And also deal with the waiting readers (in the same way) */ - inform_waiting_procs(env, descP, &descP->readersQ, TRUE, errClosed); + inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed); } if (descP->currentAcceptorP != NULL) { @@ -3919,24 +3957,50 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * acceptors waiting. */ - SASSERT( (NULL == send_msg_error_closed(env, &descP->currentAcceptor.pid)) ); + SASSERT( (NULL == send_msg_nif_abort(env, + descP->currentAcceptor.ref, + atom_closed, + &descP->currentAcceptor.pid)) ); /* And also deal with the waiting acceptors (in the same way) */ - inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, errClosed); + inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, atom_closed); } if (descP->sock != INVALID_SOCKET) { - /* +++ send close message to the waiting process +++ + /* + * * - * {close, CloseRef} + * WE NEED TO CHECK IF THIS OPERATION IS TRIGGERED + * LOCALLY (VIA A CALL TO CLOSE) OR REMOTELLY + * (VIA I.E. ECONSRESET). * + * */ - send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid); + if (descP->closeLocal) { + + /* +++ send close message to the waiting process +++ + * + * {close, CloseRef} + * + */ + + send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid); + + DEMONP(env, descP, &descP->closerMon); + + } else { - DEMONP(env, descP, &descP->closerMon); + /* + * + * + * ABORT? + * + * + */ + } } @@ -3949,29 +4013,34 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) /* This function traverse the queue and sends the specified - * message to each member, and if the 'free' argument is TRUE, - * the queue will be emptied. + * nif_abort message with the specified reason to each member, + * and if the 'free' argument is TRUE, the queue will be emptied. */ static void inform_waiting_procs(ErlNifEnv* env, SocketDescriptor* descP, SocketRequestQueue* q, BOOLEAN_T free, - ERL_NIF_TERM msg) + ERL_NIF_TERM reason) { SocketRequestQueueElement* currentP = q->first; SocketRequestQueueElement* nextP; while (currentP != NULL) { - /* + /* + * * Should we inform anyone if we fail to demonitor? * NOT SURE WHAT THAT WOULD REPRESENT AND IT IS NOT * IMPORTANT IN *THIS* CASE, BUT ITS A FUNDAMENTAL OP... + * * */ - SASSERT( (NULL == send_msg(env, msg, ¤tP->data.pid)) ); + SASSERT( (NULL == send_msg_nif_abort(env, + currentP->data.ref, + reason, + ¤tP->data.pid)) ); DEMONP(env, descP, ¤tP->data.mon); nextP = currentP->nextP; if (free) FREE(currentP); @@ -4108,6 +4177,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_false = MKA(env, str_false); // atom_list = MKA(env, str_list); // atom_mode = MKA(env, str_mode); + atom_nif_abort = MKA(env, str_nif_abort); atom_ok = MKA(env, str_ok); // atom_once = MKA(env, str_once); // atom_passive = MKA(env, str_passive); -- cgit v1.2.3