aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/nifs/common/socket_nif.c
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-04-25 14:54:03 +0200
committerMicael Karlberg <[email protected]>2018-09-18 13:01:37 +0200
commit82bfbdd7919e1aee360b76bdbaca17d2cf00ee73 (patch)
treec2f4cab7e110cb2fbfb1bbe9a1f1275024a39f79 /erts/emulator/nifs/common/socket_nif.c
parent599a320f630991823fc28b6a8a9f09851e261fed (diff)
downloadotp-82bfbdd7919e1aee360b76bdbaca17d2cf00ee73.tar.gz
otp-82bfbdd7919e1aee360b76bdbaca17d2cf00ee73.tar.bz2
otp-82bfbdd7919e1aee360b76bdbaca17d2cf00ee73.zip
[socket-nif] More close-related work
There are still some questions regarding what hapopens when writing / reading from an (remote) closed socket (I talking about "properly" closed sockets).
Diffstat (limited to 'erts/emulator/nifs/common/socket_nif.c')
-rw-r--r--erts/emulator/nifs/common/socket_nif.c122
1 files changed, 96 insertions, 26 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index d55de9ff4e..dadea3b6fb 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -554,6 +554,7 @@ typedef struct {
ErlNifPid closerPid;
ErlNifMonitor closerMon;
ERL_NIF_TERM closeRef;
+ BOOLEAN_T closeLocal;
} SocketDescriptor;
@@ -762,7 +763,7 @@ static void inform_waiting_procs(ErlNifEnv* env,
SocketDescriptor* descP,
SocketRequestQueue* q,
BOOLEAN_T free,
- ERL_NIF_TERM msg);
+ ERL_NIF_TERM reason);
static BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err);
@@ -812,6 +813,10 @@ static char* send_msg_error_closed(ErlNifEnv* env,
static char* send_msg_error(ErlNifEnv* env,
ERL_NIF_TERM reason,
ErlNifPid* pid);
+static char* send_msg_nif_abort(ErlNifEnv* env,
+ ERL_NIF_TERM ref,
+ ERL_NIF_TERM reason,
+ ErlNifPid* pid);
static char* send_msg(ErlNifEnv* env,
ERL_NIF_TERM msg,
ErlNifPid* pid);
@@ -862,6 +867,7 @@ static char str_closed[] = "closed";
static char str_closing[] = "closing";
static char str_error[] = "error";
static char str_false[] = "false";
+static char str_nif_abort[] = "nif_abort";
static char str_ok[] = "ok";
static char str_select[] = "select";
static char str_timeout[] = "timeout";
@@ -889,6 +895,7 @@ static ERL_NIF_TERM atom_closed;
static ERL_NIF_TERM atom_closing;
static ERL_NIF_TERM atom_error;
static ERL_NIF_TERM atom_false;
+static ERL_NIF_TERM atom_nif_abort;
static ERL_NIF_TERM atom_ok;
static ERL_NIF_TERM atom_select;
static ERL_NIF_TERM atom_timeout;
@@ -2605,8 +2612,9 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
return make_error(env, atom_exmon);
}
- descP->state = SOCKET_STATE_CLOSING;
- doClose = TRUE;
+ descP->closeLocal = TRUE;
+ descP->state = SOCKET_STATE_CLOSING;
+ doClose = TRUE;
}
MUNLOCK(descP->closeMtx);
@@ -2854,13 +2862,15 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
* ALL WAITING PROCESSES MUST ALSO GET THE ERROR!!
* HANDLED BY THE STOP (CALLBACK) FUNCTION?
*
- * WE DON'T NEED TO WAIT FOR OUTPUT TO BE WRITTEN,
- * JUST ABORT THE SOCKET!!!
+ * SINCE THIS IS A REMOTE CLOSE, WE DON'T NEED TO WAIT
+ * FOR OUTPUT TO BE WRITTEN (NO ONE WILL READ), JUST
+ * ABORT THE SOCKET REGARDLESS OF LINGER???
*
* </KOLLA>
*/
- descP->state = SOCKET_STATE_CLOSING;
+ descP->closeLocal = FALSE;
+ descP->state = SOCKET_STATE_CLOSING;
SELECT(env,
descP->sock,
@@ -2944,6 +2954,9 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
* </KOLLA>
*/
+ descP->closeLocal = FALSE;
+ descP->state = SOCKET_STATE_CLOSING;
+
SELECT(env,
descP->sock,
(ERL_NIF_SELECT_STOP),
@@ -3768,9 +3781,29 @@ char* send_msg_error(ErlNifEnv* env,
ERL_NIF_TERM reason,
ErlNifPid* pid)
{
- ERL_NIF_TERM msg = enif_make_tuple2(env, atom_error, reason);
+ ERL_NIF_TERM msg = enif_make_tuple2(env, atom_error, reason);
+
+ return send_msg(env, msg, pid);
+}
+
+
+/* Send an (nif-) abort message to the specified process:
+ * A message in the form:
+ *
+ * {nif_abort, Ref, Reason}
+ *
+ * This message is for processes that are waiting in the
+ * erlang API functions for a select message.
+ */
+static
+char* send_msg_nif_abort(ErlNifEnv* env,
+ ERL_NIF_TERM ref,
+ ERL_NIF_TERM reason,
+ ErlNifPid* pid)
+{
+ ERL_NIF_TERM msg = MKT3(env, atom_nif_abort, ref, reason);
- return send_msg(env, msg, pid);
+ return send_msg(env, msg, pid);
}
@@ -3871,8 +3904,7 @@ void socket_dtor(ErlNifEnv* env, void* obj)
static
void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
{
- SocketDescriptor* descP = (SocketDescriptor*) obj;
- ERL_NIF_TERM errClosed = MKT2(env, atom_error, atom_closed);
+ SocketDescriptor* descP = (SocketDescriptor*) obj;
MLOCK(descP->writeMtx);
MLOCK(descP->readMtx);
@@ -3880,7 +3912,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
MLOCK(descP->closeMtx);
- descP->state = SOCKET_STATE_CLOSING;
+ descP->state = SOCKET_STATE_CLOSING; // Just in case...???
descP->isReadable = FALSE;
descP->isWritable = FALSE;
@@ -3896,10 +3928,13 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* writers waiting.
*/
- SASSERT( (NULL == send_msg_error_closed(env, &descP->currentWriter.pid)) );
+ SASSERT( (NULL == send_msg_nif_abort(env,
+ descP->currentWriter.ref,
+ atom_closed,
+ &descP->currentWriter.pid)) );
/* And also deal with the waiting writers (in the same way) */
- inform_waiting_procs(env, descP, &descP->writersQ, TRUE, errClosed);
+ inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed);
}
if (descP->currentReaderP != NULL) {
@@ -3908,10 +3943,13 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* readers waiting.
*/
- SASSERT( (NULL == send_msg_error_closed(env, &descP->currentReader.pid)) );
+ SASSERT( (NULL == send_msg_nif_abort(env,
+ descP->currentReader.ref,
+ atom_closed,
+ &descP->currentReader.pid)) );
/* And also deal with the waiting readers (in the same way) */
- inform_waiting_procs(env, descP, &descP->readersQ, TRUE, errClosed);
+ inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed);
}
if (descP->currentAcceptorP != NULL) {
@@ -3919,24 +3957,50 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* acceptors waiting.
*/
- SASSERT( (NULL == send_msg_error_closed(env, &descP->currentAcceptor.pid)) );
+ SASSERT( (NULL == send_msg_nif_abort(env,
+ descP->currentAcceptor.ref,
+ atom_closed,
+ &descP->currentAcceptor.pid)) );
/* And also deal with the waiting acceptors (in the same way) */
- inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, errClosed);
+ inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, atom_closed);
}
if (descP->sock != INVALID_SOCKET) {
- /* +++ send close message to the waiting process +++
+ /*
+ * <KOLLA>
*
- * {close, CloseRef}
+ * WE NEED TO CHECK IF THIS OPERATION IS TRIGGERED
+ * LOCALLY (VIA A CALL TO CLOSE) OR REMOTELLY
+ * (VIA I.E. ECONSRESET).
*
+ * </KOLLA>
*/
- send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
+ if (descP->closeLocal) {
+
+ /* +++ send close message to the waiting process +++
+ *
+ * {close, CloseRef}
+ *
+ */
+
+ send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
+
+ DEMONP(env, descP, &descP->closerMon);
+
+ } else {
- DEMONP(env, descP, &descP->closerMon);
+ /*
+ * <KOLLA>
+ *
+ * ABORT?
+ *
+ * </KOLLA>
+ */
+ }
}
@@ -3949,29 +4013,34 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
/* This function traverse the queue and sends the specified
- * message to each member, and if the 'free' argument is TRUE,
- * the queue will be emptied.
+ * nif_abort message with the specified reason to each member,
+ * and if the 'free' argument is TRUE, the queue will be emptied.
*/
static
void inform_waiting_procs(ErlNifEnv* env,
SocketDescriptor* descP,
SocketRequestQueue* q,
BOOLEAN_T free,
- ERL_NIF_TERM msg)
+ ERL_NIF_TERM reason)
{
SocketRequestQueueElement* currentP = q->first;
SocketRequestQueueElement* nextP;
while (currentP != NULL) {
- /* <KOLL>
+ /* <KOLLA>
+ *
* Should we inform anyone if we fail to demonitor?
* NOT SURE WHAT THAT WOULD REPRESENT AND IT IS NOT
* IMPORTANT IN *THIS* CASE, BUT ITS A FUNDAMENTAL OP...
+ *
* </KOLLA>
*/
- SASSERT( (NULL == send_msg(env, msg, &currentP->data.pid)) );
+ SASSERT( (NULL == send_msg_nif_abort(env,
+ currentP->data.ref,
+ reason,
+ &currentP->data.pid)) );
DEMONP(env, descP, &currentP->data.mon);
nextP = currentP->nextP;
if (free) FREE(currentP);
@@ -4108,6 +4177,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_false = MKA(env, str_false);
// atom_list = MKA(env, str_list);
// atom_mode = MKA(env, str_mode);
+ atom_nif_abort = MKA(env, str_nif_abort);
atom_ok = MKA(env, str_ok);
// atom_once = MKA(env, str_once);
// atom_passive = MKA(env, str_passive);