From f78ed9b50effd3007aafef2979ae5921ffedf75b Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Wed, 21 Mar 2018 16:33:35 +0100 Subject: Fix signal order for is_process_alive --- erts/emulator/beam/bif.c | 38 ------- erts/emulator/beam/bif.h | 61 ----------- erts/emulator/beam/bif.tab | 1 + erts/emulator/beam/erl_bif_info.c | 74 ++++++++----- erts/emulator/beam/erl_proc_sig_queue.c | 180 +++++++++++++++++++++++++++----- erts/emulator/beam/erl_proc_sig_queue.h | 25 +++++ erts/emulator/test/bif_SUITE.erl | 27 ++++- 7 files changed, 255 insertions(+), 151 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index adea7d007e..232597c5b6 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -52,7 +52,6 @@ Export *erts_await_result; static Export await_exit_trap; static Export* flush_monitor_messages_trap = NULL; static Export* set_cpu_topology_trap = NULL; -static Export* await_proc_exit_trap = NULL; static Export* await_port_send_result_trap = NULL; Export* erts_format_cpu_topology_trap = NULL; static Export dsend_continue_trap_export; @@ -4654,42 +4653,6 @@ static BIF_RETTYPE bif_return_trap(BIF_ALIST_2) BIF_RET(res); } -void -erts_bif_prep_await_proc_exit_data_trap(Process *c_p, Eterm pid, Eterm ret) -{ - ERTS_BIF_PREP_TRAP3_NO_RET(await_proc_exit_trap, c_p, pid, am_data, ret); -} - -void -erts_bif_prep_await_proc_exit_reason_trap(Process *c_p, Eterm pid) -{ - ERTS_BIF_PREP_TRAP3_NO_RET(await_proc_exit_trap, c_p, - pid, am_reason, am_undefined); -} - -void -erts_bif_prep_await_proc_exit_apply_trap(Process *c_p, - Eterm pid, - Eterm module, - Eterm function, - Eterm args[], - int nargs) -{ - Eterm term; - Eterm *hp; - int i; - ASSERT(is_atom(module) && is_atom(function)); - - hp = HAlloc(c_p, 4+2*nargs); - term = NIL; - for (i = nargs-1; i >= 0; i--) { - term = CONS(hp, args[i], term); - hp += 2; - } - term = TUPLE3(hp, module, function, term); - ERTS_BIF_PREP_TRAP3_NO_RET(await_proc_exit_trap, c_p, pid, am_apply, term); -} - Export bif_return_trap_export; void erts_init_trap_export(Export* ep, Eterm m, Eterm f, Uint a, @@ -4742,7 +4705,6 @@ void erts_init_bif(void) erts_format_cpu_topology_trap = erts_export_put(am_erlang, am_format_cpu_topology, 1); - await_proc_exit_trap = erts_export_put(am_erlang,am_await_proc_exit,3); await_port_send_result_trap = erts_export_put(am_erts_internal, am_await_port_send_result, 3); system_flag_scheduler_wall_time_trap diff --git a/erts/emulator/beam/bif.h b/erts/emulator/beam/bif.h index dca53686f4..a33421d762 100644 --- a/erts/emulator/beam/bif.h +++ b/erts/emulator/beam/bif.h @@ -437,67 +437,6 @@ do { \ ERTS_BIF_EXITED((PROC)); \ } while (0) -/* - * The ERTS_BIF_*_AWAIT_X_*_TRAP makros either exits the caller, or - * sets up a trap to erlang:await_proc_exit/3. - * - * The caller is acquired to hold the 'main' lock on C_P. No other locks - * are allowed to be held. - */ - -#define ERTS_BIF_PREP_AWAIT_X_DATA_TRAP(RET, C_P, PID, DATA) \ -do { \ - erts_bif_prep_await_proc_exit_data_trap((C_P), (PID), (DATA)); \ - (RET) = THE_NON_VALUE; \ -} while (0) - -#define ERTS_BIF_PREP_AWAIT_X_REASON_TRAP(RET, C_P, PID) \ -do { \ - erts_bif_prep_await_proc_exit_reason_trap((C_P), (PID)); \ - (RET) = THE_NON_VALUE; \ -} while (0) - -#define ERTS_BIF_PREP_AWAIT_X_APPLY_TRAP(RET, C_P, PID, M, F, A, AN) \ -do { \ - erts_bif_prep_await_proc_exit_apply_trap((C_P), (PID), \ - (M), (F), (A), (AN)); \ - (RET) = THE_NON_VALUE; \ -} while (0) - -#define ERTS_BIF_AWAIT_X_DATA_TRAP(C_P, PID, DATA) \ -do { \ - erts_bif_prep_await_proc_exit_data_trap((C_P), (PID), (DATA)); \ - return THE_NON_VALUE; \ -} while (0) - -#define ERTS_BIF_AWAIT_X_REASON_TRAP(C_P, PID) \ -do { \ - erts_bif_prep_await_proc_exit_reason_trap((C_P), (PID)); \ - return THE_NON_VALUE; \ -} while (0) - -#define ERTS_BIF_AWAIT_X_APPLY_TRAP(C_P, PID, M, F, A, AN) \ -do { \ - erts_bif_prep_await_proc_exit_apply_trap((C_P), (PID), \ - (M), (F), (A), (AN)); \ - return THE_NON_VALUE; \ -} while (0) - -void -erts_bif_prep_await_proc_exit_data_trap(Process *c_p, - Eterm pid, - Eterm data); -void -erts_bif_prep_await_proc_exit_reason_trap(Process *c_p, - Eterm pid); -void -erts_bif_prep_await_proc_exit_apply_trap(Process *c_p, - Eterm pid, - Eterm module, - Eterm function, - Eterm args[], - int nargs); - int erts_call_dirty_bif(ErtsSchedulerData *esdp, Process *c_p, BeamInstr *I, Eterm *reg); diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 93613ac2eb..687fd39d58 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -266,6 +266,7 @@ bif erlang:demonitor/1 bif erlang:demonitor/2 bif erlang:is_process_alive/1 +bif erts_internal:is_process_alive/2 bif erlang:error/1 error_1 bif erlang:error/2 error_2 diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 9a3c058e82..e94544a678 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -73,6 +73,9 @@ static Export *gather_msacc_res_trap; static Export *gather_gc_info_res_trap; static Export *gather_system_check_res_trap; +static Export *is_process_alive_trap; + + #define DECL_AM(S) Eterm AM_ ## S = am_atom_put(#S, sizeof(#S) - 1) static char otp_version[] = ERLANG_OTP_VERSION; @@ -3502,34 +3505,53 @@ fun_info_mfa_1(BIF_ALIST_1) BIF_ERROR(p, BADARG); } +BIF_RETTYPE erts_internal_is_process_alive_2(BIF_ALIST_2) +{ + if (!is_internal_pid(BIF_ARG_1) || !is_internal_ordinary_ref(BIF_ARG_2)) + BIF_ERROR(BIF_P, BADARG); + erts_proc_sig_send_is_alive_request(BIF_P, BIF_ARG_1, BIF_ARG_2); + BIF_RET(am_ok); +} + BIF_RETTYPE is_process_alive_1(BIF_ALIST_1) { - if(is_internal_pid(BIF_ARG_1)) { - Process *rp; - - if (BIF_ARG_1 == BIF_P->common.id) - BIF_RET(am_true); - - rp = erts_proc_lookup_raw(BIF_ARG_1); - if (!rp) { - BIF_RET(am_false); - } - else { - if (erts_atomic32_read_acqb(&rp->state) & ERTS_PSFLG_EXITING) - ERTS_BIF_AWAIT_X_DATA_TRAP(BIF_P, BIF_ARG_1, am_false); - else - BIF_RET(am_true); - } - } - else if(is_external_pid(BIF_ARG_1)) { - if(external_pid_dist_entry(BIF_ARG_1) == erts_this_dist_entry) + if (is_internal_pid(BIF_ARG_1)) { + erts_aint32_t state; + Process *rp; + + if (BIF_ARG_1 == BIF_P->common.id) + BIF_RET(am_true); + + rp = erts_proc_lookup_raw(BIF_ARG_1); + if (!rp) + BIF_RET(am_false); + + state = erts_atomic32_read_acqb(&rp->state); + if (state & (ERTS_PSFLG_EXITING + | ERTS_PSFLG_SIG_Q + | ERTS_PSFLG_SIG_IN_Q)) { + /* + * If in exiting state, trap out and send 'is alive' + * request and wait for it to complete termination. + * + * If process has signals enqueued, we need to + * send it an 'is alive' request via its signal + * queue in order to ensure that signal order is + * preserved (we may earlier have sent it an + * exit signal that has not been processed yet). + */ + BIF_TRAP1(is_process_alive_trap, BIF_P, BIF_ARG_1); + } + + BIF_RET(am_true); + } + + if (is_external_pid(BIF_ARG_1)) { + if (external_pid_dist_entry(BIF_ARG_1) == erts_this_dist_entry) BIF_RET(am_false); /* A pid from an old incarnation of this node */ - else - BIF_ERROR(BIF_P, BADARG); - } - else { - BIF_ERROR(BIF_P, BADARG); } + + BIF_ERROR(BIF_P, BADARG); } BIF_RETTYPE process_display_2(BIF_ALIST_2) @@ -5008,6 +5030,10 @@ erts_bif_info_init(void) = erts_export_put(am_erts_internal, am_gather_microstate_accounting_result, 2); gather_system_check_res_trap = erts_export_put(am_erts_internal, am_gather_system_check_result, 1); + + is_process_alive_trap = erts_export_put(am_erts_internal, am_is_process_alive, 1); + + process_info_init(); os_info_init(); } diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 1b5cbb1919..489df08991 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -48,7 +48,7 @@ * Note that not all signal are handled using this functionality! */ -#define ERTS_SIG_Q_OP_MAX 9 +#define ERTS_SIG_Q_OP_MAX 10 #define ERTS_SIG_Q_OP_EXIT 0 #define ERTS_SIG_Q_OP_EXIT_LINKED 1 @@ -59,7 +59,8 @@ #define ERTS_SIG_Q_OP_UNLINK 6 #define ERTS_SIG_Q_OP_GROUP_LEADER 7 #define ERTS_SIG_Q_OP_TRACE_CHANGE_STATE 8 -#define ERTS_SIG_Q_OP_PERSISTENT_MON_MSG ERTS_SIG_Q_OP_MAX +#define ERTS_SIG_Q_OP_PERSISTENT_MON_MSG 9 +#define ERTS_SIG_Q_OP_IS_ALIVE ERTS_SIG_Q_OP_MAX #define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 5) @@ -184,6 +185,11 @@ typedef struct { Eterm heap[1]; } ErtsSigGroupLeader; +typedef struct { + Eterm message; + Eterm requester; +} ErtsIsAliveRequest; + static int handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing, ErtsMessage ***next_nm_sig); @@ -548,6 +554,43 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op) return res; } +static int +maybe_elevate_sig_handling_prio(Process *c_p, Eterm other) +{ + /* + * returns: + * > 0 -> elevated prio; process alive or exiting + * < 0 -> no elevation needed; process alive or exiting + * 0 -> process terminated (free) + */ + int res; + Process *rp; + erts_aint32_t state, my_prio, other_prio; + + rp = erts_proc_lookup_raw(other); + if (!rp) + res = 0; + else { + res = -1; + state = erts_atomic32_read_nob(&c_p->state); + my_prio = ERTS_PSFLGS_GET_USR_PRIO(state); + + state = erts_atomic32_read_nob(&rp->state); + other_prio = ERTS_PSFLGS_GET_USR_PRIO(state); + + if (other_prio > my_prio) { + /* Others prio is lower than mine; elevate it... */ + res = !!erts_sig_prio(other, my_prio); + if (res) { + /* ensure handled if dirty executing... */ + state = erts_atomic32_read_nob(&rp->state); + ensure_dirty_proc_handled(other, state, my_prio); + } + } + } + return res; +} + void erts_proc_sig_fetch(Process *proc) { @@ -1215,33 +1258,10 @@ erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl, Eterm ref) if (!res) destroy_sig_group_leader(sgl); else if (c_p) { - int prio_res = !0; erts_aint_t flags, rm_flags = ERTS_SIG_GL_FLG_SENDER; - Process *rp; - erts_aint32_t state, my_prio, other_prio; - - state = erts_atomic32_read_nob(&c_p->state); - my_prio = ERTS_PSFLGS_GET_USR_PRIO(state); - - rp = erts_proc_lookup_raw(to); - if (!rp) - prio_res = 0; - else { - state = erts_atomic32_read_nob(&rp->state); - other_prio = ERTS_PSFLGS_GET_USR_PRIO(state); - - if (other_prio > my_prio) { - /* Others prio is lower than mine; elevate it... */ - prio_res = erts_sig_prio(to, my_prio); - if (prio_res) { - state = erts_atomic32_read_nob(&rp->state); - ensure_dirty_proc_handled(to, state, my_prio); - } - } - } + int prio_res = maybe_elevate_sig_handling_prio(c_p, to); if (!prio_res) rm_flags |= ERTS_SIG_GL_FLG_ACTIVE; - flags = erts_atomic_read_band_nob(&sgl->flags, ~rm_flags); if (!prio_res && (flags & ERTS_SIG_GL_FLG_ACTIVE)) res = 0; /* We deactivated signal... */ @@ -1253,6 +1273,99 @@ erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl, Eterm ref) group_leader_reply(c_p, c_p->common.id, ref, 0); } +void +erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref) +{ + ErlHeapFragment *hfrag; + Uint hsz; + Eterm *hp, *start_hp, ref_cpy, msg; + ErlOffHeap *ohp; + ErtsMessage *mp; + ErtsIsAliveRequest *alive_req; + + ASSERT(is_internal_ordinary_ref(ref)); + + hsz = ERTS_REF_THING_SIZE + 3 + sizeof(ErtsIsAliveRequest)/sizeof(Eterm); + + mp = erts_alloc_message(hsz, &hp); + hfrag = &mp->hfrag; + mp->next = NULL; + ohp = &hfrag->off_heap; + start_hp = hp; + + ref_cpy = STORE_NC(&hp, ohp, ref); + msg = TUPLE2(hp, ref_cpy, am_false); /* default res 'false' */ + hp += 3; + + hfrag->used_size = hp - start_hp; + + alive_req = (ErtsIsAliveRequest *) (char *) hp; + alive_req->message = msg; + alive_req->requester = c_p->common.id; + + ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_IS_ALIVE, + ERTS_SIG_Q_TYPE_UNDEFINED, + 0); + ERL_MESSAGE_TOKEN(mp) = NIL; + ERL_MESSAGE_FROM(mp) = am_system; +#ifdef USE_VM_PROBES + ERL_MESSAGE_DT_UTAG(mp) = NIL; +#endif + + if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_IS_ALIVE)) + (void) maybe_elevate_sig_handling_prio(c_p, to); + else { + /* It wasn't alive; reply to ourselves... */ + mp->next = NULL; + mp->data.attached = ERTS_MSG_COMBINED_HFRAG; + erts_queue_message(c_p, ERTS_PROC_LOCK_MAIN, + mp, msg, am_system); + } +} + +static void +is_alive_response(Process *c_p, ErtsMessage *mp, int is_alive) +{ + /* + * Sender prepared the message for us. Just patch + * the result if necessary. The default prepared + * result is 'false'. + */ + Process *rp; + ErtsIsAliveRequest *alive_req; + + alive_req = (ErtsIsAliveRequest *) (char *) (&mp->hfrag.mem[0] + + mp->hfrag.used_size); + + + ASSERT(ERTS_SIG_IS_NON_MSG(mp)); + ASSERT(ERTS_PROC_SIG_OP(((ErtsSignal *) mp)->common.tag) + == ERTS_SIG_Q_OP_IS_ALIVE); + ASSERT(mp->hfrag.alloc_size > mp->hfrag.used_size); + ASSERT((mp->hfrag.alloc_size - mp->hfrag.used_size)*sizeof(UWord) + >= sizeof(ErtsIsAliveRequest)); + ASSERT(is_internal_pid(alive_req->requester)); + ASSERT(alive_req->requester != c_p->common.id); + ASSERT(is_tuple_arity(alive_req->message, 2)); + ASSERT(is_internal_ordinary_ref(tuple_val(alive_req->message)[1])); + ASSERT(tuple_val(alive_req->message)[2] == am_false); + + ERL_MESSAGE_TERM(mp) = alive_req->message; + mp->data.attached = ERTS_MSG_COMBINED_HFRAG; + mp->next = NULL; + + rp = erts_proc_lookup(alive_req->requester); + if (!rp) + erts_cleanup_messages(mp); + else { + if (is_alive) { /* patch result... */ + Eterm *tp = tuple_val(alive_req->message); + tp[2] = am_true; + } + erts_queue_message(rp, 0, mp, alive_req->message, am_system); + } +} + static ERTS_INLINE void adjust_tracing_state(Process *c_p, ErtsSigRecvTracing *tracing, int setup) { @@ -2355,6 +2468,13 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, break; } + case ERTS_SIG_Q_OP_IS_ALIVE: + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + remove_nm_sig(c_p, sig, next_nm_sig); + is_alive_response(c_p, sig, !0); + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + break; + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: { Uint16 type = ERTS_PROC_SIG_TYPE(tag); @@ -2667,6 +2787,12 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp) break; } + case ERTS_SIG_Q_OP_IS_ALIVE: + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + is_alive_response(c_p, sig, 0); + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + break; + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: destroy_trace_info((ErtsSigTraceInfo *) sig); break; @@ -2803,6 +2929,7 @@ erts_proc_sig_signal_size(ErtsSignal *sig) break; case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG: + case ERTS_SIG_Q_OP_IS_ALIVE: size = ((ErtsMessage *) sig)->hfrag.alloc_size; size *= sizeof(Eterm); size += sizeof(ErtsMessage) - sizeof(Eterm); @@ -3514,6 +3641,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, break; } + case ERTS_SIG_Q_OP_IS_ALIVE: case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: break; diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index 433e30ce4a..56fe3e683e 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -31,6 +31,7 @@ * - Link * - Unlink * - Group leader + * - Is process alive * - Trace change * * The signal queue consists of three parts: @@ -426,6 +427,30 @@ void erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl, Eterm ref); +/** + * + * @brief Send an 'is process alive' signal to a process. + * + * A response message '{Ref, Result}' is sent to the + * sender when performed where Ref is the reference passed + * as 'ref' argument, and Result is either 'true' or 'false'. + * + * @param[in] c_p Pointer to process struct of + * currently executing process. + * NULL if signal arrived via + * distribution. + * + * @param[in] to Identifier of receiver. + * + * @param[in] ref Reference to use in response + * message to the sending + * process (i.e., c_p). + * + */ +void +erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, + Eterm ref); + /* * End of send operations of currently supported process signals. */ diff --git a/erts/emulator/test/bif_SUITE.erl b/erts/emulator/test/bif_SUITE.erl index d16c6a320d..22706ae8b1 100644 --- a/erts/emulator/test/bif_SUITE.erl +++ b/erts/emulator/test/bif_SUITE.erl @@ -34,7 +34,8 @@ erl_crash_dump_bytes/1, is_builtin/1, error_stacktrace/1, error_stacktrace_during_call_trace/1, - group_leader_prio/1, group_leader_prio_dirty/1]). + group_leader_prio/1, group_leader_prio_dirty/1, + is_process_alive/1]). suite() -> [{ct_hooks,[ts_install_cth]}, @@ -48,7 +49,8 @@ all() -> atom_to_binary, binary_to_atom, binary_to_existing_atom, erl_crash_dump_bytes, min_max, erlang_halt, is_builtin, error_stacktrace, error_stacktrace_during_call_trace, - group_leader_prio, group_leader_prio_dirty]. + group_leader_prio, group_leader_prio_dirty, + is_process_alive]. %% Uses erlang:display to test that erts_printf does not do deep recursion display(Config) when is_list(Config) -> @@ -1076,6 +1078,27 @@ group_leader_prio_test(Dirty) -> TLs), ok. +is_process_alive(Config) when is_list(Config) -> + process_flag(priority, max), + Ps = lists:map(fun (_) -> + spawn_opt(fun () -> tok_loop() end, + [{priority, high}, link]) + end, + lists:seq(1, 2*erlang:system_info(schedulers))), + receive after 1000 -> ok end, %% Wait for load to spread + lists:foreach(fun (P) -> + %% Ensure that signal order is preserved + %% and that we are not starved due to + %% priority inversion + true = erlang:is_process_alive(P), + unlink(P), + true = erlang:is_process_alive(P), + exit(P, kill), + false = erlang:is_process_alive(P) + end, + Ps), + ok. + %% helpers id(I) -> I. -- cgit v1.2.3