diff options
author | Micael Karlberg <[email protected]> | 2018-04-25 14:54:03 +0200 |
---|---|---|
committer | Micael Karlberg <[email protected]> | 2018-09-18 13:01:37 +0200 |
commit | 82bfbdd7919e1aee360b76bdbaca17d2cf00ee73 (patch) | |
tree | c2f4cab7e110cb2fbfb1bbe9a1f1275024a39f79 | |
parent | 599a320f630991823fc28b6a8a9f09851e261fed (diff) | |
download | otp-82bfbdd7919e1aee360b76bdbaca17d2cf00ee73.tar.gz otp-82bfbdd7919e1aee360b76bdbaca17d2cf00ee73.tar.bz2 otp-82bfbdd7919e1aee360b76bdbaca17d2cf00ee73.zip |
[socket-nif] More close-related work
There are still some questions regarding what hapopens
when writing / reading from an (remote) closed socket
(I talking about "properly" closed sockets).
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 122 | ||||
-rw-r--r-- | erts/preloaded/src/socket.erl | 62 |
2 files changed, 146 insertions, 38 deletions
diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index d55de9ff4e..dadea3b6fb 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -554,6 +554,7 @@ typedef struct { ErlNifPid closerPid; ErlNifMonitor closerMon; ERL_NIF_TERM closeRef; + BOOLEAN_T closeLocal; } SocketDescriptor; @@ -762,7 +763,7 @@ static void inform_waiting_procs(ErlNifEnv* env, SocketDescriptor* descP, SocketRequestQueue* q, BOOLEAN_T free, - ERL_NIF_TERM msg); + ERL_NIF_TERM reason); static BOOLEAN_T verify_is_connected(SocketDescriptor* descP, int* err); @@ -812,6 +813,10 @@ static char* send_msg_error_closed(ErlNifEnv* env, static char* send_msg_error(ErlNifEnv* env, ERL_NIF_TERM reason, ErlNifPid* pid); +static char* send_msg_nif_abort(ErlNifEnv* env, + ERL_NIF_TERM ref, + ERL_NIF_TERM reason, + ErlNifPid* pid); static char* send_msg(ErlNifEnv* env, ERL_NIF_TERM msg, ErlNifPid* pid); @@ -862,6 +867,7 @@ static char str_closed[] = "closed"; static char str_closing[] = "closing"; static char str_error[] = "error"; static char str_false[] = "false"; +static char str_nif_abort[] = "nif_abort"; static char str_ok[] = "ok"; static char str_select[] = "select"; static char str_timeout[] = "timeout"; @@ -889,6 +895,7 @@ static ERL_NIF_TERM atom_closed; static ERL_NIF_TERM atom_closing; static ERL_NIF_TERM atom_error; static ERL_NIF_TERM atom_false; +static ERL_NIF_TERM atom_nif_abort; static ERL_NIF_TERM atom_ok; static ERL_NIF_TERM atom_select; static ERL_NIF_TERM atom_timeout; @@ -2605,8 +2612,9 @@ ERL_NIF_TERM nclose(ErlNifEnv* env, return make_error(env, atom_exmon); } - descP->state = SOCKET_STATE_CLOSING; - doClose = TRUE; + descP->closeLocal = TRUE; + descP->state = SOCKET_STATE_CLOSING; + doClose = TRUE; } MUNLOCK(descP->closeMtx); @@ -2854,13 +2862,15 @@ ERL_NIF_TERM recv_check_result(ErlNifEnv* env, * ALL WAITING PROCESSES MUST ALSO GET THE ERROR!! * HANDLED BY THE STOP (CALLBACK) FUNCTION? * - * WE DON'T NEED TO WAIT FOR OUTPUT TO BE WRITTEN, - * JUST ABORT THE SOCKET!!! + * SINCE THIS IS A REMOTE CLOSE, WE DON'T NEED TO WAIT + * FOR OUTPUT TO BE WRITTEN (NO ONE WILL READ), JUST + * ABORT THE SOCKET REGARDLESS OF LINGER??? * * </KOLLA> */ - descP->state = SOCKET_STATE_CLOSING; + descP->closeLocal = FALSE; + descP->state = SOCKET_STATE_CLOSING; SELECT(env, descP->sock, @@ -2944,6 +2954,9 @@ ERL_NIF_TERM recvfrom_check_result(ErlNifEnv* env, * </KOLLA> */ + descP->closeLocal = FALSE; + descP->state = SOCKET_STATE_CLOSING; + SELECT(env, descP->sock, (ERL_NIF_SELECT_STOP), @@ -3768,9 +3781,29 @@ char* send_msg_error(ErlNifEnv* env, ERL_NIF_TERM reason, ErlNifPid* pid) { - ERL_NIF_TERM msg = enif_make_tuple2(env, atom_error, reason); + ERL_NIF_TERM msg = enif_make_tuple2(env, atom_error, reason); + + return send_msg(env, msg, pid); +} + + +/* Send an (nif-) abort message to the specified process: + * A message in the form: + * + * {nif_abort, Ref, Reason} + * + * This message is for processes that are waiting in the + * erlang API functions for a select message. + */ +static +char* send_msg_nif_abort(ErlNifEnv* env, + ERL_NIF_TERM ref, + ERL_NIF_TERM reason, + ErlNifPid* pid) +{ + ERL_NIF_TERM msg = MKT3(env, atom_nif_abort, ref, reason); - return send_msg(env, msg, pid); + return send_msg(env, msg, pid); } @@ -3871,8 +3904,7 @@ void socket_dtor(ErlNifEnv* env, void* obj) static void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) { - SocketDescriptor* descP = (SocketDescriptor*) obj; - ERL_NIF_TERM errClosed = MKT2(env, atom_error, atom_closed); + SocketDescriptor* descP = (SocketDescriptor*) obj; MLOCK(descP->writeMtx); MLOCK(descP->readMtx); @@ -3880,7 +3912,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) MLOCK(descP->closeMtx); - descP->state = SOCKET_STATE_CLOSING; + descP->state = SOCKET_STATE_CLOSING; // Just in case...??? descP->isReadable = FALSE; descP->isWritable = FALSE; @@ -3896,10 +3928,13 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * writers waiting. */ - SASSERT( (NULL == send_msg_error_closed(env, &descP->currentWriter.pid)) ); + SASSERT( (NULL == send_msg_nif_abort(env, + descP->currentWriter.ref, + atom_closed, + &descP->currentWriter.pid)) ); /* And also deal with the waiting writers (in the same way) */ - inform_waiting_procs(env, descP, &descP->writersQ, TRUE, errClosed); + inform_waiting_procs(env, descP, &descP->writersQ, TRUE, atom_closed); } if (descP->currentReaderP != NULL) { @@ -3908,10 +3943,13 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * readers waiting. */ - SASSERT( (NULL == send_msg_error_closed(env, &descP->currentReader.pid)) ); + SASSERT( (NULL == send_msg_nif_abort(env, + descP->currentReader.ref, + atom_closed, + &descP->currentReader.pid)) ); /* And also deal with the waiting readers (in the same way) */ - inform_waiting_procs(env, descP, &descP->readersQ, TRUE, errClosed); + inform_waiting_procs(env, descP, &descP->readersQ, TRUE, atom_closed); } if (descP->currentAcceptorP != NULL) { @@ -3919,24 +3957,50 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * acceptors waiting. */ - SASSERT( (NULL == send_msg_error_closed(env, &descP->currentAcceptor.pid)) ); + SASSERT( (NULL == send_msg_nif_abort(env, + descP->currentAcceptor.ref, + atom_closed, + &descP->currentAcceptor.pid)) ); /* And also deal with the waiting acceptors (in the same way) */ - inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, errClosed); + inform_waiting_procs(env, descP, &descP->acceptorsQ, TRUE, atom_closed); } if (descP->sock != INVALID_SOCKET) { - /* +++ send close message to the waiting process +++ + /* + * <KOLLA> * - * {close, CloseRef} + * WE NEED TO CHECK IF THIS OPERATION IS TRIGGERED + * LOCALLY (VIA A CALL TO CLOSE) OR REMOTELLY + * (VIA I.E. ECONSRESET). * + * </KOLLA> */ - send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid); + if (descP->closeLocal) { + + /* +++ send close message to the waiting process +++ + * + * {close, CloseRef} + * + */ + + send_msg(env, MKT2(env, atom_close, descP->closeRef), &descP->closerPid); + + DEMONP(env, descP, &descP->closerMon); + + } else { - DEMONP(env, descP, &descP->closerMon); + /* + * <KOLLA> + * + * ABORT? + * + * </KOLLA> + */ + } } @@ -3949,29 +4013,34 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) /* This function traverse the queue and sends the specified - * message to each member, and if the 'free' argument is TRUE, - * the queue will be emptied. + * nif_abort message with the specified reason to each member, + * and if the 'free' argument is TRUE, the queue will be emptied. */ static void inform_waiting_procs(ErlNifEnv* env, SocketDescriptor* descP, SocketRequestQueue* q, BOOLEAN_T free, - ERL_NIF_TERM msg) + ERL_NIF_TERM reason) { SocketRequestQueueElement* currentP = q->first; SocketRequestQueueElement* nextP; while (currentP != NULL) { - /* <KOLL> + /* <KOLLA> + * * Should we inform anyone if we fail to demonitor? * NOT SURE WHAT THAT WOULD REPRESENT AND IT IS NOT * IMPORTANT IN *THIS* CASE, BUT ITS A FUNDAMENTAL OP... + * * </KOLLA> */ - SASSERT( (NULL == send_msg(env, msg, ¤tP->data.pid)) ); + SASSERT( (NULL == send_msg_nif_abort(env, + currentP->data.ref, + reason, + ¤tP->data.pid)) ); DEMONP(env, descP, ¤tP->data.mon); nextP = currentP->nextP; if (free) FREE(currentP); @@ -4108,6 +4177,7 @@ int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info) atom_false = MKA(env, str_false); // atom_list = MKA(env, str_list); // atom_mode = MKA(env, str_mode); + atom_nif_abort = MKA(env, str_nif_abort); atom_ok = MKA(env, str_ok); // atom_once = MKA(env, str_once); // atom_passive = MKA(env, str_passive); diff --git a/erts/preloaded/src/socket.erl b/erts/preloaded/src/socket.erl index 044fe73906..f1e8a8b099 100644 --- a/erts/preloaded/src/socket.erl +++ b/erts/preloaded/src/socket.erl @@ -105,11 +105,12 @@ %% otp - The option is internal to our (OTP) imeplementation. %% socket - The socket layer (SOL_SOCKET). -%% ip - The ip layer (SOL_IP). +%% ip - The IP layer (SOL_IP). +%% ipv6 - The IPv6 layer (SOL_IPV6). %% tcp - The TCP (Transport Control Protocol) layer (IPPROTO_TCP). %% udp - The UDP (User Datagram Protocol) layer (IPPROTO_UDP). %% Int - Raw level, sent down and used "as is". --type option_level() :: otp | socket | ip | tcp | udp | non_neg_integer(). +-type option_level() :: otp | socket | ip | ipv6 | tcp | udp | non_neg_integer(). -type socket_info() :: map(). -record(socket, {info :: socket_info(), @@ -212,8 +213,9 @@ -define(SOCKET_SETOPT_LEVEL_OTP, 0). -define(SOCKET_SETOPT_LEVEL_SOCKET, 1). -define(SOCKET_SETOPT_LEVEL_IP, 2). --define(SOCKET_SETOPT_LEVEL_TCP, 3). --define(SOCKET_SETOPT_LEVEL_UDP, 4). +-define(SOCKET_SETOPT_LEVEL_IPV6, 3). +-define(SOCKET_SETOPT_LEVEL_TCP, 4). +-define(SOCKET_SETOPT_LEVEL_UDP, 5). -define(SOCKET_SETOPT_KEY_DEBUG, 0). @@ -485,7 +487,11 @@ do_accept(LSockRef, SI, Timeout) -> NewTimeout = next_timeout(TS, Timeout), receive {select, LSockRef, AccRef, ready_input} -> - do_accept(LSockRef, SI, next_timeout(TS, Timeout)) + do_accept(LSockRef, SI, next_timeout(TS, Timeout)); + + {nif_abort, AccRef, Reason} -> + {error, Reason} + after NewTimeout -> nif_cancel(LSockRef, accept, AccRef), flush_select_msgs(LSockRef, AccRef), @@ -539,7 +545,11 @@ do_send(SockRef, Data, EFlags, Timeout) -> next_timeout(TS, Timeout)); {select, SockRef, SendRef, ready_output} -> do_send(SockRef, Data, EFlags, - next_timeout(TS, Timeout)) + next_timeout(TS, Timeout)); + + {nif_abort, SendRef, Reason} -> + {error, Reason} + after NewTimeout -> nif_cancel(SockRef, send, SendRef), flush_select_msgs(SockRef, SendRef), @@ -549,7 +559,11 @@ do_send(SockRef, Data, EFlags, Timeout) -> receive {select, SockRef, SendRef, ready_output} -> do_send(SockRef, Data, EFlags, - next_timeout(TS, Timeout)) + next_timeout(TS, Timeout)); + + {nif_abort, SendRef, Reason} -> + {error, Reason} + after Timeout -> nif_cancel(SockRef, send, SendRef), flush_select_msgs(SockRef, SendRef), @@ -608,7 +622,11 @@ do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, Timeout) -> next_timeout(TS, Timeout)); {select, SockRef, SendRef, ready_output} -> do_sendto(SockRef, Data, EFlags, DestAddr, DestPort, - next_timeout(TS, Timeout)) + next_timeout(TS, Timeout)); + + {nif_abort, SendRef, Reason} -> + {error, Reason} + after Timeout -> nif_cancel(SockRef, sendto, SendRef), flush_select_msgs(SockRef, SendRef), @@ -789,7 +807,12 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) do_recv(SockRef, RecvRef, Length-size(Bin), EFlags, Bin, - next_timeout(TS, Timeout)) + next_timeout(TS, Timeout)); + + {nif_abort, RecvRef, Reason} -> + {error, Reason} + + after NewTimeout -> nif_cancel(SockRef, recv, RecvRef), flush_select_msgs(SockRef, RecvRef), @@ -804,7 +827,12 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) do_recv(SockRef, RecvRef, Length-size(Bin), EFlags, <<Acc/binary, Bin/binary>>, - next_timeout(TS, Timeout)) + next_timeout(TS, Timeout)); + + {nif_abort, RecvRef, Reason} -> + {error, Reason} + + after NewTimeout -> nif_cancel(SockRef, recv, RecvRef), flush_select_msgs(SockRef, RecvRef), @@ -824,7 +852,11 @@ do_recv(SockRef, _OldRef, Length, EFlags, Acc, Timeout) do_recv(SockRef, RecvRef, Length, EFlags, Acc, - next_timeout(TS, Timeout)) + next_timeout(TS, Timeout)); + + {nif_abort, RecvRef, Reason} -> + {error, Reason} + after NewTimeout -> nif_cancel(SockRef, recv, RecvRef), flush_select_msgs(SockRef, RecvRef), @@ -913,7 +945,11 @@ do_recvfrom(SockRef, BufSz, EFlags, Timeout) -> receive {select, SockRef, RecvRef, ready_input} -> do_recvfrom(SockRef, BufSz, EFlags, - next_timeout(TS, Timeout)) + next_timeout(TS, Timeout)); + + {nif_abort, RecvRef, Reason} -> + {error, Reason} + after NewTimeout -> nif_cancel(SockRef, recvfrom, RecvRef), flush_select_msgs(SockRef, RecvRef), @@ -1141,6 +1177,8 @@ enc_setopt_level(socket) -> {?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_SOCKET}; enc_setopt_level(ip) -> {?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_IP}; +enc_setopt_level(ipv6) -> + {?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_IPV6}; enc_setopt_level(tcp) -> {?SOCKET_SETOPT_LEVEL_ENCODED, ?SOCKET_SETOPT_LEVEL_TCP}; enc_setopt_level(udp) -> |