diff options
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/beam/erl_db_hash.c | 1 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 3 | ||||
-rw-r--r-- | erts/emulator/nifs/common/net_nif.c | 3 | ||||
-rw-r--r-- | erts/emulator/nifs/common/socket_nif.c | 175 | ||||
-rw-r--r-- | erts/emulator/test/nif_SUITE.erl | 37 | ||||
-rw-r--r-- | erts/emulator/test/socket_SUITE.erl | 17 | ||||
-rw-r--r-- | erts/emulator/test/socket_test_evaluator.erl | 101 |
7 files changed, 192 insertions, 145 deletions
diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c index fac4703620..f225730029 100644 --- a/erts/emulator/beam/erl_db_hash.c +++ b/erts/emulator/beam/erl_db_hash.c @@ -2214,6 +2214,7 @@ static int db_select_replace_hash(Process *p, DbTable *tbl, Eterm tid, ctx.tb = &tbl->hash; ctx.tid = tid; ctx.prev_continuation_tptr = NULL; + ctx.safety = safety; return match_traverse( &ctx, diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 0a099e69bb..3fa06d1407 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -9460,6 +9460,7 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) if (!is_normal_sched & !!(flags & ERTS_RUNQ_FLG_HALTING)) { /* Wait for emulator to terminate... */ + erts_runq_unlock(rq); while (1) erts_milli_sleep(1000*1000); } @@ -13403,10 +13404,10 @@ void erts_halt(int code) if (-1 == erts_atomic32_cmpxchg_acqb(&erts_halt_progress, erts_no_schedulers, -1)) { + notify_reap_ports_relb(); ERTS_RUNQ_FLGS_SET(ERTS_DIRTY_CPU_RUNQ, ERTS_RUNQ_FLG_HALTING); ERTS_RUNQ_FLGS_SET(ERTS_DIRTY_IO_RUNQ, ERTS_RUNQ_FLG_HALTING); erts_halt_code = code; - notify_reap_ports_relb(); } } diff --git a/erts/emulator/nifs/common/net_nif.c b/erts/emulator/nifs/common/net_nif.c index 6c91bd74bd..252aa3c835 100644 --- a/erts/emulator/nifs/common/net_nif.c +++ b/erts/emulator/nifs/common/net_nif.c @@ -1363,7 +1363,7 @@ ERL_NIF_TERM encode_address_infos(ErlNifEnv* env, NDBG( ("NET", "encode_address_infos -> len: %d\r\n", len) ); if (len > 0) { - ERL_NIF_TERM* array = MALLOC(len * sizeof(ERL_NIF_TERM)); // LEAK? + ERL_NIF_TERM* array = MALLOC(len * sizeof(ERL_NIF_TERM)); unsigned int i = 0; struct addrinfo* p = addrInfo; @@ -1374,6 +1374,7 @@ ERL_NIF_TERM encode_address_infos(ErlNifEnv* env, } result = MKLA(env, array, len); + FREE(array); } else { result = MKEL(env); } diff --git a/erts/emulator/nifs/common/socket_nif.c b/erts/emulator/nifs/common/socket_nif.c index bb3df85ea4..870ab63bdf 100644 --- a/erts/emulator/nifs/common/socket_nif.c +++ b/erts/emulator/nifs/common/socket_nif.c @@ -2309,12 +2309,14 @@ static void dec_socket(int domain, int type, int protocol); ERL_NIF_TERM sockRef); ACTIVATE_NEXT_FUNCS_DEFS #undef ACTIVATE_NEXT_FUNC_DEF - + +/* static BOOLEAN_T activate_next(ErlNifEnv* env, SocketDescriptor* descP, SocketRequestor* reqP, SocketRequestQueue* q, ERL_NIF_TERM sockRef); +*/ /* *** acceptor_search4pid | writer_search4pid | reader_search4pid *** * *** acceptor_push | writer_push | reader_push *** @@ -2408,7 +2410,8 @@ static void socket_down_reader(ErlNifEnv* env, const ErlNifPid* pid); static char* esock_send_close_msg(ErlNifEnv* env, - SocketDescriptor* descP); + SocketDescriptor* descP, + ERL_NIF_TERM sockRef); static char* esock_send_abort_msg(ErlNifEnv* env, ERL_NIF_TERM sockRef, ERL_NIF_TERM recvRef, @@ -16894,15 +16897,18 @@ char* send_msg_error(ErlNifEnv* env, */ static char* esock_send_close_msg(ErlNifEnv* env, - SocketDescriptor* descP) -{ - ERL_NIF_TERM sockRef = enif_make_resource(descP->closeEnv, descP); - char* res = esock_send_socket_msg(env, - sockRef, - esock_atom_close, - descP->closeRef, - &descP->closerPid, - descP->closeEnv); + SocketDescriptor* descP, + ERL_NIF_TERM sockRef) +{ + ERL_NIF_TERM sr = ((descP->closeEnv != NULL) ? + enif_make_copy(descP->closeEnv, sockRef) : + sockRef); + char* res = esock_send_socket_msg(env, + sr, + esock_atom_close, + descP->closeRef, + &descP->closerPid, + descP->closeEnv); descP->closeEnv = NULL; @@ -17047,93 +17053,81 @@ int esock_select_cancel(ErlNifEnv* env, * *** activate_next_writer *** * *** activate_next_reader *** * - * This functions pops the writer queue and then selects until it - * manages to successfully activate a writer or the queue is empty. + * This functions pops the requestors queue and then selects until it + * manages to successfully activate a requestor or the queue is empty. + * Return value indicates if a new requestor was activated or not. */ -#define ACTIVATE_NEXT_FUNCS \ - ACTIVATE_NEXT_FUNC_DECL(acceptor, currentAcceptor, acceptorsQ) \ - ACTIVATE_NEXT_FUNC_DECL(writer, currentWriter, writersQ) \ - ACTIVATE_NEXT_FUNC_DECL(reader, currentReader, readersQ) +#define ACTIVATE_NEXT_FUNCS \ + ACTIVATE_NEXT_FUNC_DECL(acceptor, read, currentAcceptor, acceptorsQ) \ + ACTIVATE_NEXT_FUNC_DECL(writer, write, currentWriter, writersQ) \ + ACTIVATE_NEXT_FUNC_DECL(reader, read, currentReader, readersQ) -#define ACTIVATE_NEXT_FUNC_DECL(F, R, Q) \ +#define ACTIVATE_NEXT_FUNC_DECL(F, S, R, Q) \ static \ BOOLEAN_T activate_next_##F(ErlNifEnv* env, \ SocketDescriptor* descP, \ ERL_NIF_TERM sockRef) \ { \ - return activate_next(env, descP, \ - &descP->R, &descP->Q, \ - sockRef); \ + BOOLEAN_T popped, activated; \ + int sres; \ + SocketRequestor* reqP = &descP->R; \ + SocketRequestQueue* q = &descP->Q; \ + \ + popped = FALSE; \ + do { \ + \ + if (requestor_pop(q, reqP)) { \ + \ + /* There was another one */ \ + \ + SSDBG( descP, \ + ("SOCKET", \ + "activate_next_" #F " -> new (active) requestor: " \ + "\r\n pid: %T" \ + "\r\n ref: %T" \ + "\r\n", reqP->pid, reqP->ref) ); \ + \ + if ((sres = esock_select_##S(env, descP->sock, descP, \ + &reqP->pid, reqP->ref)) < 0) { \ + /* We need to inform this process, reqP->pid, */ \ + /* that we failed to select, so we don't leave */ \ + /* it hanging. */ \ + /* => send abort */ \ + \ + esock_send_abort_msg(env, sockRef, reqP->ref, \ + sres, &reqP->pid); \ + \ + } else { \ + \ + /* Success: New requestor selected */ \ + popped = TRUE; \ + activated = FALSE; \ + \ + } \ + \ + } else { \ + \ + SSDBG( descP, \ + ("SOCKET", \ + "activate_next_" #F " -> no more requestors\r\n") ); \ + \ + popped = TRUE; \ + activated = FALSE; \ + } \ + \ + } while (!popped); \ + \ + SSDBG( descP, \ + ("SOCKET", "activate_next_" #F " -> " \ + "done with %s\r\n", B2S(activated)) ); \ + \ + return activated; \ } ACTIVATE_NEXT_FUNCS #undef ACTIVATE_NEXT_FUNC_DECL -/* *** activate_next *** - * - * This functions pops the requestor queue and then selects until it - * manages to successfully activate a new requestor or the queue is empty. - * Return value indicates if a new requestor was activated or not. - */ - -static -BOOLEAN_T activate_next(ErlNifEnv* env, - SocketDescriptor* descP, - SocketRequestor* reqP, - SocketRequestQueue* q, - ERL_NIF_TERM sockRef) -{ - BOOLEAN_T popped, activated; - int sres; - - popped = FALSE; - do { - - if (requestor_pop(q, reqP)) { - - /* There was another one */ - - SSDBG( descP, - ("SOCKET", "activate_next -> new (active) requestor: " - "\r\n pid: %T" - "\r\n ref: %T" - "\r\n", reqP->pid, reqP->ref) ); - - if ((sres = esock_select_read(env, descP->sock, descP, - &reqP->pid, reqP->ref)) < 0) { - /* We need to inform this process, reqP->pid, that we - * failed to select, so we don't leave it hanging. - * => send abort - */ - - esock_send_abort_msg(env, sockRef, reqP->ref, sres, &reqP->pid); - - } else { - - /* Success: New requestor selected */ - popped = TRUE; - activated = FALSE; - - } - - } else { - - SSDBG( descP, - ("SOCKET", "send_activate_next -> no more requestors\r\n") ); - - popped = TRUE; - activated = FALSE; - } - - } while (!popped); - - SSDBG( descP, - ("SOCKET", "activate_next -> " - "done with %s\r\n", B2S(activated)) ); - - return activated; -} /* ---------------------------------------------------------------------- @@ -17730,7 +17724,7 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) if (COMPARE_PIDS(&descP->closerPid, &descP->currentAcceptor.pid) != 0) { SSDBG( descP, ("SOCKET", "socket_stop -> " "send abort message to current acceptor %T\r\n", - descP->currentWriter.pid) ); + descP->currentAcceptor.pid) ); if (esock_send_abort_msg(env, sockRef, descP->currentAcceptor.ref, @@ -17764,11 +17758,12 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) if (descP->sock != INVALID_SOCKET) { if (descP->closeLocal) { + if (!is_direct_call) { /* +++ send close message to the waiting process +++ */ - esock_send_close_msg(env, descP); + esock_send_close_msg(env, descP, sockRef); DEMONP("socket_stop -> closer", env, descP, &descP->closerMon); @@ -17778,7 +17773,11 @@ void socket_stop(ErlNifEnv* env, void* obj, int fd, int is_direct_call) * since the message send takes care of it if scheduled. */ - if (descP->closeEnv != NULL) enif_free_env(descP->closeEnv); + if (descP->closeEnv != NULL) { + enif_clear_env(descP->closeEnv); + enif_free_env(descP->closeEnv); + descP->closeEnv = NULL; + } } } diff --git a/erts/emulator/test/nif_SUITE.erl b/erts/emulator/test/nif_SUITE.erl index 2309f844b9..62b7f77a52 100644 --- a/erts/emulator/test/nif_SUITE.erl +++ b/erts/emulator/test/nif_SUITE.erl @@ -1223,7 +1223,7 @@ maps(Config) when is_list(Config) -> repeat_while(fun({35,_}) -> false; ({K,Map}) -> Map = maps_from_list_nif(maps:to_list(Map)), - Map = maps:filter(fun(K,V) -> V =:= K*100 end, Map), + Map = maps:filter(fun(K2,V) -> V =:= K2*100 end, Map), {K+1, maps:put(K,K*100,Map)} end, {1,#{}}), @@ -1294,24 +1294,29 @@ resource_hugo_do(Type) -> release_resource(HugoPtr), erlang:garbage_collect(), {HugoPtr,HugoBin} = get_resource(Type,Hugo), - Pid = spawn_link(fun() -> - receive {Pid, Type, Resource, Ptr, Bin} -> - Pid ! {self(), got_it}, - receive {Pid, check_it} -> - {Ptr,Bin} = get_resource(Type,Resource), - Pid ! {self(), ok} - end - end - end), + {Pid,_} = + spawn_monitor(fun() -> + receive {Pid, Type, Resource, Ptr, Bin} -> + Pid ! {self(), got_it}, + receive {Pid, check_it} -> + {Ptr,Bin} = get_resource(Type,Resource) + end + end, + gc_and_exit(ok) + end), Pid ! {self(), Type, Hugo, HugoPtr, HugoBin}, {Pid, got_it} = receive_any(), erlang:garbage_collect(), % just to make our ProcBin move in memory Pid ! {self(), check_it}, - {Pid, ok} = receive_any(), + {'DOWN', _, process, Pid, ok} = receive_any(), [] = last_resource_dtor_call(), {HugoPtr,HugoBin} = get_resource(Type,Hugo), {HugoPtr, HugoBin, 1}. +gc_and_exit(Reason) -> + erlang:garbage_collect(), + exit(Reason). + resource_otto(Type) -> {OttoPtr, OttoBin} = resource_otto_do(Type), erlang:garbage_collect(), @@ -1388,14 +1393,14 @@ resource_binary_do() -> ResInfo = {Ptr,_} = get_resource(binary_resource_type,ResBin1), Papa = self(), - Forwarder = spawn_link(fun() -> forwarder(Papa) end), + {Forwarder,_} = spawn_monitor(fun() -> forwarder(Papa) end), io:format("sending to forwarder pid=~p\n",[Forwarder]), Forwarder ! ResBin1, ResBin2 = receive_any(), ResBin2 = ResBin1, ResInfo = get_resource(binary_resource_type,ResBin2), Forwarder ! terminate, - {Forwarder, 1} = receive_any(), + {'DOWN', _, process, Forwarder, 1} = receive_any(), erlang:garbage_collect(), ResInfo = get_resource(binary_resource_type,ResBin1), ResInfo = get_resource(binary_resource_type,ResBin2), @@ -1915,11 +1920,11 @@ send2_do1(SendBlobF) -> send2_do2(SendBlobF, self()), Papa = self(), - Forwarder = spawn_link(fun() -> forwarder(Papa) end), + {Forwarder,_} = spawn_monitor(fun() -> forwarder(Papa) end), io:format("sending to forwarder pid=~p\n",[Forwarder]), send2_do2(SendBlobF, Forwarder), Forwarder ! terminate, - {Forwarder, 4} = receive_any(), + {'DOWN', _, process, Forwarder, 4} = receive_any(), ok. send2_do2(SendBlobF, To) -> @@ -1975,7 +1980,7 @@ forwarder(To) -> forwarder(To, N) -> case receive_any() of terminate -> - To ! {self(), N}; + gc_and_exit(N); Msg -> To ! Msg, forwarder(To, N+1) diff --git a/erts/emulator/test/socket_SUITE.erl b/erts/emulator/test/socket_SUITE.erl index aec280485c..8a32efcd85 100644 --- a/erts/emulator/test/socket_SUITE.erl +++ b/erts/emulator/test/socket_SUITE.erl @@ -4541,7 +4541,7 @@ api_to_recv_tcp4(_Config) when is_list(_Config) -> Recv = fun(Sock, To) -> socket:recv(Sock, 0, To) end, InitState = #{domain => inet, recv => Recv, - timeout => 5000}, + timeout => 2000}, ok = api_to_receive_tcp(InitState) end). @@ -4566,7 +4566,7 @@ api_to_recv_tcp6(_Config) when is_list(_Config) -> end, InitState = #{domain => inet6, recv => Recv, - timeout => 5000}, + timeout => 2000}, ok = api_to_receive_tcp(InitState); false -> skip("ipv6 not supported") @@ -4900,7 +4900,7 @@ api_to_recvfrom_udp4(_Config) when is_list(_Config) -> Recv = fun(Sock, To) -> socket:recvfrom(Sock, 0, To) end, InitState = #{domain => inet, recv => Recv, - timeout => 5000}, + timeout => 2000}, ok = api_to_receive_udp(InitState) end). @@ -4921,7 +4921,7 @@ api_to_recvfrom_udp6(_Config) when is_list(_Config) -> Recv = fun(Sock, To) -> socket:recvfrom(Sock, 0, To) end, InitState = #{domain => inet6, recv => Recv, - timeout => 5000}, + timeout => 2000}, ok = api_to_receive_udp(InitState) end). @@ -4984,6 +4984,7 @@ api_to_receive_udp(InitState) -> %% *** Termination *** #{desc => "close socket", cmd => fun(#{sock := Sock} = _State) -> + socket:setopt(Sock, otp, debug, true), sock_close(Sock), ok end}, @@ -5015,7 +5016,7 @@ api_to_recvmsg_udp4(_Config) when is_list(_Config) -> Recv = fun(Sock, To) -> socket:recvmsg(Sock, To) end, InitState = #{domain => inet, recv => Recv, - timeout => 5000}, + timeout => 2000}, ok = api_to_receive_udp(InitState) end). @@ -5036,7 +5037,7 @@ api_to_recvmsg_udp6(_Config) when is_list(_Config) -> Recv = fun(Sock, To) -> socket:recvmsg(Sock, To) end, InitState = #{domain => inet6, recv => Recv, - timeout => 5000}, + timeout => 2000}, ok = api_to_receive_udp(InitState) end). @@ -5056,7 +5057,7 @@ api_to_recvmsg_tcp4(_Config) when is_list(_Config) -> Recv = fun(Sock, To) -> socket:recvmsg(Sock, To) end, InitState = #{domain => inet, recv => Recv, - timeout => 5000}, + timeout => 2000}, ok = api_to_receive_tcp(InitState) end). @@ -5077,7 +5078,7 @@ api_to_recvmsg_tcp6(_Config) when is_list(_Config) -> Recv = fun(Sock, To) -> socket:recvmsg(Sock, To) end, InitState = #{domain => inet6, recv => Recv, - timeout => 5000}, + timeout => 2000}, ok = api_to_receive_tcp(InitState) end). diff --git a/erts/emulator/test/socket_test_evaluator.erl b/erts/emulator/test/socket_test_evaluator.erl index bd86b3b92e..c5748ac21b 100644 --- a/erts/emulator/test/socket_test_evaluator.erl +++ b/erts/emulator/test/socket_test_evaluator.erl @@ -104,8 +104,9 @@ start(Name, Seq, InitState) erlang:error({already_used, parent}); error -> InitState2 = InitState#{parent => self()}, - {Pid, MRef} = erlang:spawn_monitor( - fun() -> init(Name, Seq, InitState2) end), + Pid = erlang:spawn_link( + fun() -> init(Name, Seq, InitState2) end), + MRef = erlang:monitor(process, Pid), #ev{name = Name, pid = Pid, mref = MRef} end. @@ -149,55 +150,93 @@ loop(ID, [#{desc := Desc, Evs :: [ev()]. await_finish(Evs) -> - await_finish(Evs, []). + await_finish(Evs, [], []). -await_finish([], []) -> +await_finish([], _, []) -> ok; -await_finish([], Fails) -> +await_finish([], _OK, Fails) -> ?SEV_EPRINT("Fails: " "~n ~p", [Fails]), Fails; -await_finish(Evs, Fails) -> +await_finish(Evs, OK, Fails) -> receive %% Successfull termination of evaluator {'DOWN', _MRef, process, Pid, normal} -> - case lists:keysearch(Pid, #ev.pid, Evs) of - {value, #ev{name = Name}} -> - iprint("evaluator '~s' (~p) success", [Name, Pid]), - NewEvs = lists:keydelete(Pid, #ev.pid, Evs), - await_finish(NewEvs, Fails); - false -> - iprint("unknown process ~p died (normal)", [Pid]), - await_finish(Evs, Fails) - end; + {Evs2, OK2, Fails2} = await_finish_normal(Pid, Evs, OK, Fails), + await_finish(Evs2, OK2, Fails2); + {'EXIT', Pid, normal} -> + {Evs2, OK2, Fails2} = await_finish_normal(Pid, Evs, OK, Fails), + await_finish(Evs2, OK2, Fails2); %% The evaluator can skip the teat case: {'DOWN', _MRef, process, Pid, {skip, Reason}} -> - case lists:keysearch(Pid, #ev.pid, Evs) of - {value, #ev{name = Name}} -> - iprint("evaluator '~s' (~p) issued SKIP: " - "~n ~p", [Name, Pid, Reason]); + await_finish_skip(Pid, Reason, Evs, OK); + {'EXIT', Pid, {skip, Reason}} -> + await_finish_skip(Pid, Reason, Evs, OK); + + %% Evaluator failed + {'DOWN', _MRef, process, Pid, Reason} -> + {Evs2, OK2, Fails2} = await_finish_fail(Pid, Reason, Evs, OK, Fails), + await_finish(Evs2, OK2, Fails2); + {'EXIT', Pid, Reason} -> + {Evs2, OK2, Fails2} = await_finish_fail(Pid, Reason, Evs, OK, Fails), + await_finish(Evs2, OK2, Fails2) + end. + + +await_finish_normal(Pid, Evs, OK, Fails) -> + case lists:keysearch(Pid, #ev.pid, Evs) of + {value, #ev{name = Name}} -> + iprint("evaluator '~s' (~p) success", [Name, Pid]), + NewEvs = lists:keydelete(Pid, #ev.pid, Evs), + {NewEvs, [Pid|OK], Fails}; + false -> + case lists:member(Pid, OK) of + true -> + ok; + false -> + iprint("unknown process ~p died (normal)", [Pid]), + ok + end, + {Evs, OK, Fails} + end. + +await_finish_skip(Pid, Reason, Evs, OK) -> + case lists:keysearch(Pid, #ev.pid, Evs) of + {value, #ev{name = Name}} -> + iprint("evaluator '~s' (~p) issued SKIP: " + "~n ~p", [Name, Pid, Reason]); + false -> + case lists:member(Pid, OK) of + true -> + ok; false -> iprint("unknown process ~p issued SKIP: " "~n ~p", [Pid, Reason]) - end, - ?LIB:skip(Reason); + end + end, + ?LIB:skip(Reason). - %% Evaluator failed - {'DOWN', _MRef, process, Pid, Reason} -> - case lists:keysearch(Pid, #ev.pid, Evs) of - {value, #ev{name = Name}} -> - iprint("evaluator '~s' (~p) failed", [Name, Pid]), - NewEvs = lists:keydelete(Pid, #ev.pid, Evs), - await_finish(NewEvs, [{Pid, Reason}|Fails]); + +await_finish_fail(Pid, Reason, Evs, OK, Fails) -> + case lists:keysearch(Pid, #ev.pid, Evs) of + {value, #ev{name = Name}} -> + iprint("evaluator '~s' (~p) failed", [Name, Pid]), + NewEvs = lists:keydelete(Pid, #ev.pid, Evs), + {NewEvs, OK, [{Pid, Reason}|Fails]}; + false -> + case lists:member(Pid, OK) of + true -> + ok; false -> iprint("unknown process ~p died: " - "~n ~p", [Pid, Reason]), - await_finish(Evs, Fails) - end + "~n ~p", [Pid, Reason]) + end, + {Evs, OK, Fails} end. + %% ============================================================================ -spec announce_start(To) -> ok when |