aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMicael Karlberg <[email protected]>2018-04-25 14:54:03 +0200
committerMicael Karlberg <[email protected]>2018-09-18 13:01:37 +0200
commit82bfbdd7919e1aee360b76bdbaca17d2cf00ee73 (patch)
treec2f4cab7e110cb2fbfb1bbe9a1f1275024a39f79
parent599a320f630991823fc28b6a8a9f09851e261fed (diff)
downloadotp-82bfbdd7919e1aee360b76bdbaca17d2cf00ee73.tar.gz
otp-82bfbdd7919e1aee360b76bdbaca17d2cf00ee73.tar.bz2
otp-82bfbdd7919e1aee360b76bdbaca17d2cf00ee73.zip
[socket-nif] More close-related work
There are still some questions regarding what hapopens when writing / reading from an (remote) closed socket (I talking about "properly" closed sockets).
-rw-r--r--erts/emulator/nifs/common/socket_nif.c122
-rw-r--r--erts/preloaded/src/socket.erl62
2 files changed, 146 insertions, 38 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c
index d55de9ff4e..dadea3b6fb 100644
--- a/erts/emulator/nifs/common/socket_nif.c
+++ b/erts/emulator/nifs/common/socket_nif.c
@@ -554,6 +554,7 @@ typedef struct {
ErlNifPid closerPid;
ErlNifMonitor closerMon;
ERL_NIF_TERM closeRef;
+ BOOLEAN_T closeLocal;
} SocketDescriptor;
@@ -762,7 +763,7 @@ static void inform_waiting_procs(ErlNifEnv* env,
SocketDescriptor* descP,
SocketRequestQueue* q,
BOOLEAN_T free,
- ERL_NIF_TERM msg);
+ ERL_NIF_TERM reason);
static BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err);
@@ -812,6 +813,10 @@ static char* send_msg_error_closed(ErlNifEnv* env,
static char* send_msg_error(ErlNifEnv* env,
ERL_NIF_TERM reason,
ErlNifPid* pid);
+static char* send_msg_nif_abort(ErlNifEnv* env,
+ ERL_NIF_TERM ref,
+ ERL_NIF_TERM reason,
+ ErlNifPid* pid);
static char* send_msg(ErlNifEnv* env,
ERL_NIF_TERM msg,
ErlNifPid* pid);
@@ -862,6 +867,7 @@ static char str_closed[] = "closed";
static char str_closing[] = "closing";
static char str_error[] = "error";
static char str_false[] = "false";
+static char str_nif_abort[] = "nif_abort";
static char str_ok[] = "ok";
static char str_select[] = "select";
static char str_timeout[] = "timeout";
@@ -889,6 +895,7 @@ static ERL_NIF_TERM atom_closed;
static ERL_NIF_TERM atom_closing;
static ERL_NIF_TERM atom_error;
static ERL_NIF_TERM atom_false;
+static ERL_NIF_TERM atom_nif_abort;
static ERL_NIF_TERM atom_ok;
static ERL_NIF_TERM atom_select;
static ERL_NIF_TERM atom_timeout;
@@ -2605,8 +2612,9 @@ ERL_NIF_TERM nclose(ErlNifEnv* env,
return make_error(env, atom_exmon);
}
- descP->state = SOCKET_STATE_CLOSING;
- doClose = TRUE;
+ descP->closeLocal = TRUE;
+ descP->state = SOCKET_STATE_CLOSING;
+ doClose = TRUE;
}
MUNLOCK(descP->closeMtx);
@@ -2854,13 +2862,15 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env,
* ALL WAITING PROCESSES MUST ALSO GET THE ERROR!!
* HANDLED BY THE STOP (CALLBACK) FUNCTION?
*
- * WE DON'T NEED TO WAIT FOR OUTPUT TO BE WRITTEN,
- * JUST ABORT THE SOCKET!!!
+ * SINCE THIS IS A REMOTE CLOSE, WE DON'T NEED TO WAIT
+ * FOR OUTPUT TO BE WRITTEN (NO ONE WILL READ), JUST
+ * ABORT THE SOCKET REGARDLESS OF LINGER???
*
* </KOLLA>
*/
- descP->state = SOCKET_STATE_CLOSING;
+ descP->closeLocal = FALSE;
+ descP->state = SOCKET_STATE_CLOSING;
SELECT(env,
descP->sock,
@@ -2944,6 +2954,9 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env,
* </KOLLA>
*/
+ descP->closeLocal = FALSE;
+ descP->state = SOCKET_STATE_CLOSING;
+
SELECT(env,
descP->sock,
(ERL_NIF_SELECT_STOP),
@@ -3768,9 +3781,29 @@ char* send_msg_error(ErlNifEnv* env,
ERL_NIF_TERM reason,
ErlNifPid* pid)
{
- ERL_NIF_TERM msg = enif_make_tuple2(env, atom_error, reason);
+ ERL_NIF_TERM msg = enif_make_tuple2(env, atom_error, reason);
+
+ return send_msg(env, msg, pid);
+}
+
+
+/* Send an (nif-) abort message to the specified process:
+ * A message in the form:
+ *
+ * {nif_abort, Ref, Reason}
+ *
+ * This message is for processes that are waiting in the
+ * erlang API functions for a select message.
+ */
+static
+char* send_msg_nif_abort(ErlNifEnv* env,
+ ERL_NIF_TERM ref,
+ ERL_NIF_TERM reason,
+ ErlNifPid* pid)
+{
+ ERL_NIF_TERM msg = MKT3(env, atom_nif_abort, ref, reason);
- return send_msg(env, msg, pid);
+ return send_msg(env, msg, pid);
}
@@ -3871,8 +3904,7 @@ void socket_dtor(ErlNifEnv* env, void* obj)
static
void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
{
- SocketDescriptor* descP = (SocketDescriptor*) obj;
- ERL_NIF_TERM errClosed = MKT2(env, atom_error, atom_closed);
+ SocketDescriptor* descP = (SocketDescriptor*) obj;
MLOCK(descP->writeMtx);
MLOCK(descP->readMtx);
@@ -3880,7 +3912,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
MLOCK(descP->closeMtx);
- descP->state = SOCKET_STATE_CLOSING;
+ descP->state = SOCKET_STATE_CLOSING; // Just in case...???
descP->isReadable = FALSE;
descP->isWritable = FALSE;
@@ -3896,10 +3928,13 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* writers waiting.
*/
- SASSERT( (NULL == send_msg_error_closed(env, &descP->currentWriter.pid)) );
+ SASSERT( (NULL == send_msg_nif_abort(env,
+ descP->currentWriter.ref,
+ atom_closed,
+ &descP->currentWriter.pid)) );
/* And also deal with the waiting writers (in the same way) */
- inform_waiting_procs(env, descP, &descP->writersQ, TRUE, errClosed);
+ inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed);
}
if (descP->currentReaderP != NULL) {
@@ -3908,10 +3943,13 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* readers waiting.
*/
- SASSERT( (NULL == send_msg_error_closed(env, &descP->currentReader.pid)) );
+ SASSERT( (NULL == send_msg_nif_abort(env,
+ descP->currentReader.ref,
+ atom_closed,
+ &descP->currentReader.pid)) );
/* And also deal with the waiting readers (in the same way) */
- inform_waiting_procs(env, descP, &descP->readersQ, TRUE, errClosed);
+ inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed);
}
if (descP->currentAcceptorP != NULL) {
@@ -3919,24 +3957,50 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
* acceptors waiting.
*/
- SASSERT( (NULL == send_msg_error_closed(env, &descP->currentAcceptor.pid)) );
+ SASSERT( (NULL == send_msg_nif_abort(env,
+ descP->currentAcceptor.ref,
+ atom_closed,
+ &descP->currentAcceptor.pid)) );
/* And also deal with the waiting acceptors (in the same way) */
- inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, errClosed);
+ inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, atom_closed);
}
if (descP->sock != INVALID_SOCKET) {
- /* +++ send close message to the waiting process +++
+ /*
+ * <KOLLA>
*
- * {close, CloseRef}
+ * WE NEED TO CHECK IF THIS OPERATION IS TRIGGERED
+ * LOCALLY (VIA A CALL TO CLOSE) OR REMOTELLY
+ * (VIA I.E. ECONSRESET).
*
+ * </KOLLA>
*/
- send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
+ if (descP->closeLocal) {
+
+ /* +++ send close message to the waiting process +++
+ *
+ * {close, CloseRef}
+ *
+ */
+
+ send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid);
+
+ DEMONP(env, descP, &descP->closerMon);
+
+ } else {
- DEMONP(env, descP, &descP->closerMon);
+ /*
+ * <KOLLA>
+ *
+ * ABORT?
+ *
+ * </KOLLA>
+ */
+ }
}
@@ -3949,29 +4013,34 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call)
/* This function traverse the queue and sends the specified
- * message to each member, and if the 'free' argument is TRUE,
- * the queue will be emptied.
+ * nif_abort message with the specified reason to each member,
+ * and if the 'free' argument is TRUE, the queue will be emptied.
*/
static
void inform_waiting_procs(ErlNifEnv* env,
SocketDescriptor* descP,
SocketRequestQueue* q,
BOOLEAN_T free,
- ERL_NIF_TERM msg)
+ ERL_NIF_TERM reason)
{
SocketRequestQueueElement* currentP = q->first;
SocketRequestQueueElement* nextP;
while (currentP != NULL) {
- /* <KOLL>
+ /* <KOLLA>
+ *
* Should we inform anyone if we fail to demonitor?
* NOT SURE WHAT THAT WOULD REPRESENT AND IT IS NOT
* IMPORTANT IN *THIS* CASE, BUT ITS A FUNDAMENTAL OP...
+ *
* </KOLLA>
*/
- SASSERT( (NULL == send_msg(env, msg, &currentP->data.pid)) );
+ SASSERT( (NULL == send_msg_nif_abort(env,
+ currentP->data.ref,
+ reason,
+ &currentP->data.pid)) );
DEMONP(env, descP, &currentP->data.mon);
nextP = currentP->nextP;
if (free) FREE(currentP);
@@ -4108,6 +4177,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
atom_false = MKA(env, str_false);
// atom_list = MKA(env, str_list);
// atom_mode = MKA(env, str_mode);
+ atom_nif_abort = MKA(env, str_nif_abort);
atom_ok = MKA(env, str_ok);
// atom_once = MKA(env, str_once);
// atom_passive = MKA(env, str_passive);
diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl
index 044fe73906..f1e8a8b099 100644
--- a/erts/preloaded/src/socket.erl
+++ b/erts/preloaded/src/socket.erl
@@ -105,11 +105,12 @@
%% otp - The option is internal to our (OTP) imeplementation.
%% socket - The socket layer (SOL_SOCKET).
-%% ip - The ip layer (SOL_IP).
+%% ip - The IP layer (SOL_IP).
+%% ipv6 - The IPv6 layer (SOL_IPV6).
%% tcp - The TCP (Transport Control Protocol) layer (IPPROTO_TCP).
%% udp - The UDP (User Datagram Protocol) layer (IPPROTO_UDP).
%% Int - Raw level, sent down and used "as is".
--type option_level() :: otp | socket | ip | tcp | udp | non_neg_integer().
+-type option_level() :: otp | socket | ip | ipv6 | tcp | udp | non_neg_integer().
-type socket_info() :: map().
-record(socket, {info :: socket_info(),
@@ -212,8 +213,9 @@
-define(SOCKET_SETOPT_LEVEL_OTP, 0).
-define(SOCKET_SETOPT_LEVEL_SOCKET, 1).
-define(SOCKET_SETOPT_LEVEL_IP, 2).
--define(SOCKET_SETOPT_LEVEL_TCP, 3).
--define(SOCKET_SETOPT_LEVEL_UDP, 4).
+-define(SOCKET_SETOPT_LEVEL_IPV6, 3).
+-define(SOCKET_SETOPT_LEVEL_TCP, 4).
+-define(SOCKET_SETOPT_LEVEL_UDP, 5).
-define(SOCKET_SETOPT_KEY_DEBUG, 0).
@@ -485,7 +487,11 @@ do_accept(LSockRef, SI, Timeout) ->
NewTimeout = next_timeout(TS, Timeout),
receive
{select, LSockRef, AccRef, ready_input} ->
- do_accept(LSockRef, SI, next_timeout(TS, Timeout))
+ do_accept(LSockRef, SI, next_timeout(TS, Timeout));
+
+ {nif_abort, AccRef, Reason} ->
+ {error, Reason}
+
after NewTimeout ->
nif_cancel(LSockRef, accept, AccRef),
flush_select_msgs(LSockRef, AccRef),
@@ -539,7 +545,11 @@ do_send(SockRef, Data, EFlags, Timeout) ->
next_timeout(TS, Timeout));
{select, SockRef, SendRef, ready_output} ->
do_send(SockRef, Data, EFlags,
- next_timeout(TS, Timeout))
+ next_timeout(TS, Timeout));
+
+ {nif_abort, SendRef, Reason} ->
+ {error, Reason}
+
after NewTimeout ->
nif_cancel(SockRef, send, SendRef),
flush_select_msgs(SockRef, SendRef),
@@ -549,7 +559,11 @@ do_send(SockRef, Data, EFlags, Timeout) ->
receive
{select, SockRef, SendRef, ready_output} ->
do_send(SockRef, Data, EFlags,
- next_timeout(TS, Timeout))
+ next_timeout(TS, Timeout));
+
+ {nif_abort, SendRef, Reason} ->
+ {error, Reason}
+
after Timeout ->
nif_cancel(SockRef, send, SendRef),
flush_select_msgs(SockRef, SendRef),
@@ -608,7 +622,11 @@ do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, Timeout) ->
next_timeout(TS, Timeout));
{select, SockRef, SendRef, ready_output} ->
do_sendto(SockRef, Data, EFlags, DestAddr, DestPort,
- next_timeout(TS, Timeout))
+ next_timeout(TS, Timeout));
+
+ {nif_abort, SendRef, Reason} ->
+ {error, Reason}
+
after Timeout ->
nif_cancel(SockRef, sendto, SendRef),
flush_select_msgs(SockRef, SendRef),
@@ -789,7 +807,12 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
do_recv(SockRef, RecvRef,
Length-size(Bin), EFlags,
Bin,
- next_timeout(TS, Timeout))
+ next_timeout(TS, Timeout));
+
+ {nif_abort, RecvRef, Reason} ->
+ {error, Reason}
+
+
after NewTimeout ->
nif_cancel(SockRef, recv, RecvRef),
flush_select_msgs(SockRef, RecvRef),
@@ -804,7 +827,12 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
do_recv(SockRef, RecvRef,
Length-size(Bin), EFlags,
<<Acc/binary, Bin/binary>>,
- next_timeout(TS, Timeout))
+ next_timeout(TS, Timeout));
+
+ {nif_abort, RecvRef, Reason} ->
+ {error, Reason}
+
+
after NewTimeout ->
nif_cancel(SockRef, recv, RecvRef),
flush_select_msgs(SockRef, RecvRef),
@@ -824,7 +852,11 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout)
do_recv(SockRef, RecvRef,
Length, EFlags,
Acc,
- next_timeout(TS, Timeout))
+ next_timeout(TS, Timeout));
+
+ {nif_abort, RecvRef, Reason} ->
+ {error, Reason}
+
after NewTimeout ->
nif_cancel(SockRef, recv, RecvRef),
flush_select_msgs(SockRef, RecvRef),
@@ -913,7 +945,11 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) ->
receive
{select, SockRef, RecvRef, ready_input} ->
do_recvfrom(SockRef, BufSz, EFlags,
- next_timeout(TS, Timeout))
+ next_timeout(TS, Timeout));
+
+ {nif_abort, RecvRef, Reason} ->
+ {error, Reason}
+
after NewTimeout ->
nif_cancel(SockRef, recvfrom, RecvRef),
flush_select_msgs(SockRef, RecvRef),
@@ -1141,6 +1177,8 @@ enc_setopt_level(socket) ->
{?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_SOCKET};
enc_setopt_level(ip) ->
{?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_IP};
+enc_setopt_level(ipv6) ->
+ {?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_IPV6};
enc_setopt_level(tcp) ->
{?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_TCP};
enc_setopt_level(udp) ->