diff options
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r-- | erts/emulator/beam/atom.names | 3 | ||||
-rw-r--r-- | erts/emulator/beam/beam_bif_load.c | 32 | ||||
-rw-r--r-- | erts/emulator/beam/beam_emu.c | 3 | ||||
-rw-r--r-- | erts/emulator/beam/bif.c | 181 | ||||
-rw-r--r-- | erts/emulator/beam/bif.h | 37 | ||||
-rw-r--r-- | erts/emulator/beam/bif.tab | 9 | ||||
-rw-r--r-- | erts/emulator/beam/dist.c | 406 | ||||
-rw-r--r-- | erts/emulator/beam/erl_alloc.types | 3 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_info.c | 202 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_trace.c | 190 | ||||
-rw-r--r-- | erts/emulator/beam/erl_gc.c | 15 | ||||
-rw-r--r-- | erts/emulator/beam/erl_monitor_link.c | 97 | ||||
-rw-r--r-- | erts/emulator/beam/erl_monitor_link.h | 57 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 469 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.h | 116 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 897 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 57 | ||||
-rw-r--r-- | erts/emulator/beam/erl_trace.c | 32 | ||||
-rw-r--r-- | erts/emulator/beam/erl_trace.h | 2 |
19 files changed, 1584 insertions, 1224 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index fba0611042..45b7540aeb 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -142,6 +142,7 @@ atom bsr atom bsr_anycrlf atom bsr_unicode atom build_type +atom busy atom busy_dist_port atom busy_port atom call @@ -252,6 +253,7 @@ atom exception_from atom exception_trace atom exclusive atom exit_status +atom exited atom existing atom existing_processes atom existing_ports @@ -445,6 +447,7 @@ atom no_float atom no_integer atom no_network atom no_start_optimize +atom not_suspended atom not atom not_a_list atom not_loaded diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index d9312f4df8..a0dbd9ec7b 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -603,8 +603,9 @@ badarg: BIF_RETTYPE erts_internal_check_dirty_process_code_2(BIF_ALIST_2) { + erts_aint32_t state; Process *rp; - int reds = 0; + int dirty, busy, reds = 0; Eterm res; if (BIF_P != erts_dirty_process_signal_handler @@ -618,20 +619,29 @@ BIF_RETTYPE erts_internal_check_dirty_process_code_2(BIF_ALIST_2) if (is_not_atom(BIF_ARG_2)) BIF_ERROR(BIF_P, BADARG); - rp = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN, - BIF_ARG_1, ERTS_PROC_LOCK_MAIN); - if (rp == ERTS_PROC_LOCK_BUSY) - ERTS_BIF_YIELD2(bif_export[BIF_erts_internal_check_dirty_process_code_2], - BIF_P, BIF_ARG_1, BIF_ARG_2); + if (BIF_ARG_1 == BIF_P->common.id) + BIF_RET(am_normal); + + rp = erts_proc_lookup_raw(BIF_ARG_1); if (!rp) - BIF_RET(am_false); - + BIF_RET(am_false); + + state = erts_atomic32_read_nob(&rp->state); + dirty = (state & (ERTS_PSFLG_DIRTY_RUNNING + | ERTS_PSFLG_DIRTY_RUNNING_SYS)); + if (!dirty) + BIF_RET(am_normal); + + busy = erts_proc_trylock(rp, ERTS_PROC_LOCK_MAIN) == EBUSY; + + if (busy) + BIF_RET(am_busy); + res = erts_check_process_code(rp, BIF_ARG_2, &reds, BIF_P->fcalls); - if (BIF_P != rp) - erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); + erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); - ASSERT(is_value(res)); + ASSERT(res == am_true || res == am_false); BIF_RET2(res, reds); } diff --git a/erts/emulator/beam/beam_emu.c b/erts/emulator/beam/beam_emu.c index ee287243a4..ab5920a67e 100644 --- a/erts/emulator/beam/beam_emu.c +++ b/erts/emulator/beam/beam_emu.c @@ -1166,6 +1166,9 @@ void erts_dirty_process_main(ErtsSchedulerData *esdp) reds_used = treds > INT_MAX ? INT_MAX : (int) treds; } + if (c_p && ERTS_PROC_GET_PENDING_SUSPEND(c_p)) + erts_proc_sig_handle_pending_suspend(c_p); + PROCESS_MAIN_CHK_LOCKS(c_p); ERTS_UNREQ_PROC_MAIN_LOCK(c_p); ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 79244b8544..97e1ee1286 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -1364,13 +1364,14 @@ BIF_RETTYPE exit_signal_2(BIF_ALIST_2) /* Handle flags common to both process_flag_2 and process_flag_3. */ -static BIF_RETTYPE process_flag_aux(Process *BIF_P, - Process *rp, - Eterm flag, - Eterm val) +static Eterm process_flag_aux(Process *c_p, int *redsp, Eterm flag, Eterm val) { Eterm old_value = NIL; /* shut up warning about use before set */ Sint i; + + if (redsp) + *redsp = 1; + if (flag == am_save_calls) { struct saved_calls *scb; if (!is_small(val)) @@ -1390,30 +1391,89 @@ static BIF_RETTYPE process_flag_aux(Process *BIF_P, } #ifdef HIPE - if (rp->flags & F_HIPE_MODE) { - ASSERT(!ERTS_PROC_GET_SAVED_CALLS_BUF(rp)); - scb = ERTS_PROC_SET_SUSPENDED_SAVED_CALLS_BUF(rp, scb); + if (c_p->flags & F_HIPE_MODE) { + ASSERT(!ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)); + scb = ERTS_PROC_SET_SUSPENDED_SAVED_CALLS_BUF(c_p, scb); } else #endif { #ifdef HIPE - ASSERT(!ERTS_PROC_GET_SUSPENDED_SAVED_CALLS_BUF(rp)); + ASSERT(!ERTS_PROC_GET_SUSPENDED_SAVED_CALLS_BUF(c_p)); #endif - scb = ERTS_PROC_SET_SAVED_CALLS_BUF(rp, scb); - if (rp == BIF_P && ((scb && i == 0) || (!scb && i != 0))) { - /* Adjust fcalls to match save calls setting... */ - if (i == 0) - BIF_P->fcalls += CONTEXT_REDS; /* disabled it */ - else - BIF_P->fcalls -= CONTEXT_REDS; /* enabled it */ - - /* - * Make sure we reschedule immediately so the - * change take effect at once. - */ - ERTS_VBUMP_ALL_REDS(BIF_P); - } + scb = ERTS_PROC_SET_SAVED_CALLS_BUF(c_p, scb); + + if (((scb && i == 0) || (!scb && i != 0))) { + + /* + * Make sure we reschedule immediately so the + * change take effect at once. + */ + if (!redsp) { + /* Executed via BIF call.. */ + via_bif: + + /* Adjust fcalls to match save calls setting... */ + if (i == 0) + c_p->fcalls += CONTEXT_REDS; /* disabled it */ + else + c_p->fcalls -= CONTEXT_REDS; /* enabled it */ + + ERTS_VBUMP_ALL_REDS(c_p); + } + else { + erts_aint32_t state; + /* + * Executed via signal handler. Try to figure + * out in what context we are executing... + */ + + state = erts_atomic32_read_nob(&c_p->state); + if (state & (ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_DIRTY_RUNNING_SYS + | ERTS_PSFLG_DIRTY_RUNNING)) { + /* + * We are either processing signals before + * being executed or executing dirty. That + * is, no need to adjust anything... + */ + *redsp = 1; + } + else { + ErtsSchedulerData *esdp; + ASSERT(state & ERTS_PSFLG_RUNNING); + + /* + * F_DELAY_GC is currently only set when + * we handle signals in state running via + * receive helper... + */ + + if (!(c_p->flags & F_DELAY_GC)) { + *redsp = 1; + goto via_bif; + } + + /* + * Executing via receive helper... + * + * We utilize the virtual reds counter + * in order to get correct calculation + * of reductions consumed when scheduling + * out the process... + */ + + esdp = erts_get_scheduler_data(); + + if (i == 0) + esdp->virtual_reds += CONTEXT_REDS; /* disabled it */ + else + esdp->virtual_reds -= CONTEXT_REDS; /* enabled it */ + + *redsp = -1; + } + } + } } if (!scb) @@ -1423,11 +1483,12 @@ static BIF_RETTYPE process_flag_aux(Process *BIF_P, erts_free(ERTS_ALC_T_CALLS_BUF, (void *) scb); } - BIF_RET(old_value); + ASSERT(is_immed(old_value)); + return old_value; } error: - BIF_ERROR(BIF_P, BADARG); + return am_badarg; } BIF_RETTYPE process_flag_2(BIF_ALIST_2) @@ -1596,29 +1657,73 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2) /* Fall through and try process_flag_aux() ... */ } - BIF_RET(process_flag_aux(BIF_P, BIF_P, BIF_ARG_1, BIF_ARG_2)); + old_value = process_flag_aux(BIF_P, NULL, BIF_ARG_1, BIF_ARG_2); + if (old_value != am_badarg) + BIF_RET(old_value); error: BIF_ERROR(BIF_P, BADARG); } -BIF_RETTYPE process_flag_3(BIF_ALIST_3) +typedef struct { + Eterm flag; + Eterm value; + ErlOffHeap oh; + Eterm heap[1]; +} ErtsProcessFlag3Args; + +static Eterm +exec_process_flag_3(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp) { - Process *rp; - Eterm res; + ErtsProcessFlag3Args *pf3a = arg; + Eterm res; + + if (ERTS_PROC_IS_EXITING(c_p)) + res = am_badarg; + else + res = process_flag_aux(c_p, redsp, pf3a->flag, pf3a->value); + erts_cleanup_offheap(&pf3a->oh); + erts_free(ERTS_ALC_T_PF3_ARGS, arg); + return res; +} + + +BIF_RETTYPE erts_internal_process_flag_3(BIF_ALIST_3) +{ + Eterm res, *hp; + ErlOffHeap *ohp; + ErtsProcessFlag3Args *pf3a; + Uint flag_sz, value_sz; + + if (BIF_P->common.id == BIF_ARG_1) { + res = process_flag_aux(BIF_P, NULL, BIF_ARG_2, BIF_ARG_3); + BIF_RET(res); + } + + if (is_not_internal_pid(BIF_ARG_1)) + BIF_RET(am_badarg); + + flag_sz = is_immed(BIF_ARG_2) ? 0 : size_object(BIF_ARG_2); + value_sz = is_immed(BIF_ARG_3) ? 0 : size_object(BIF_ARG_3); + + pf3a = erts_alloc(ERTS_ALC_T_PF3_ARGS, + sizeof(ErtsProcessFlag3Args) + + sizeof(Eterm)*(flag_sz+value_sz-1)); + + ohp = &pf3a->oh; + ERTS_INIT_OFF_HEAP(&pf3a->oh); - rp = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN, - BIF_ARG_1, ERTS_PROC_LOCK_MAIN); - if (rp == ERTS_PROC_LOCK_BUSY) - ERTS_BIF_YIELD3(bif_export[BIF_process_flag_3], BIF_P, - BIF_ARG_1, BIF_ARG_2, BIF_ARG_3); + hp = &pf3a->heap[0]; - if (!rp) - BIF_ERROR(BIF_P, BADARG); + pf3a->flag = copy_struct(BIF_ARG_2, flag_sz, &hp, ohp); + pf3a->value = copy_struct(BIF_ARG_3, value_sz, &hp, ohp); - res = process_flag_aux(BIF_P, rp, BIF_ARG_2, BIF_ARG_3); + res = erts_proc_sig_send_rpc_request(BIF_P, BIF_ARG_1, + !0, + exec_process_flag_3, + (void *) pf3a); - if (rp != BIF_P) - erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN); + if (is_non_value(res)) + BIF_RET(am_badarg); return res; } diff --git a/erts/emulator/beam/bif.h b/erts/emulator/beam/bif.h index a47339253e..cf9f61c0b8 100644 --- a/erts/emulator/beam/bif.h +++ b/erts/emulator/beam/bif.h @@ -295,6 +295,19 @@ do { \ (Ret) = THE_NON_VALUE; \ } while (0) +#define ERTS_BIF_PREP_TRAP4(Ret, Trap, Proc, A0, A1, A2, A3) \ +do { \ + Eterm* reg = erts_proc_sched_data((Proc))->x_reg_array; \ + (Proc)->arity = 4; \ + reg[0] = (Eterm) (A0); \ + reg[1] = (Eterm) (A1); \ + reg[2] = (Eterm) (A2); \ + reg[3] = (Eterm) (A3); \ + (Proc)->i = (BeamInstr*) ((Trap)->addressv[erts_active_code_ix()]); \ + (Proc)->freason = TRAP; \ + (Ret) = THE_NON_VALUE; \ +} while (0) + #define ERTS_BIF_PREP_TRAP3_NO_RET(Trap, Proc, A0, A1, A2)\ do { \ Eterm* reg = erts_proc_sched_data((Proc))->x_reg_array; \ @@ -343,6 +356,18 @@ do { \ return THE_NON_VALUE; \ } while(0) +#define BIF_TRAP4(Trap_, p, A0, A1, A2, A3) do { \ + Eterm* reg = erts_proc_sched_data((p))->x_reg_array; \ + (p)->arity = 4; \ + reg[0] = (A0); \ + reg[1] = (A1); \ + reg[2] = (A2); \ + reg[3] = (A3); \ + (p)->i = (BeamInstr*) ((Trap_)->addressv[erts_active_code_ix()]); \ + (p)->freason = TRAP; \ + return THE_NON_VALUE; \ + } while(0) + #define BIF_TRAP_CODE_PTR_0(p, Code_) do { \ (p)->arity = 0; \ (p)->i = (BeamInstr*) (Code_); \ @@ -401,6 +426,12 @@ do { \ ERTS_BIF_PREP_TRAP3(RET, (TRP), (P), (A0), (A1), (A2)); \ } while (0) +#define ERTS_BIF_PREP_YIELD4(RET, TRP, P, A0, A1, A2, A3) \ +do { \ + ERTS_VBUMP_ALL_REDS((P)); \ + ERTS_BIF_PREP_TRAP4(RET, (TRP), (P), (A0), (A1), (A2), (A3)); \ +} while (0) + #define ERTS_BIF_YIELD0(TRP, P) \ do { \ ERTS_VBUMP_ALL_REDS((P)); \ @@ -425,6 +456,12 @@ do { \ BIF_TRAP3((TRP), (P), (A0), (A1), (A2)); \ } while (0) +#define ERTS_BIF_YIELD4(TRP, P, A0, A1, A2, A3) \ +do { \ + ERTS_VBUMP_ALL_REDS((P)); \ + BIF_TRAP4((TRP), (P), (A0), (A1), (A2), (A3)); \ +} while (0) + #define ERTS_BIF_PREP_EXITED(RET, PROC) \ do { \ KILL_CATCHES((PROC)); \ diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 33738dc20b..21dda9ff03 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -125,7 +125,7 @@ bif erlang:pid_to_list/1 bif erlang:ports/0 bif erlang:pre_loaded/0 bif erlang:process_flag/2 -bif erlang:process_flag/3 +bif erts_internal:process_flag/3 bif erlang:process_info/1 bif erlang:process_info/2 bif erlang:processes/0 @@ -154,7 +154,6 @@ bif erlang:unregister/1 bif erlang:whereis/1 bif erlang:spawn_opt/1 bif erlang:setnode/2 -bif erlang:setnode/3 bif erlang:dist_get_stat/1 bif erlang:dist_ctrl_input_handler/2 bif erlang:dist_ctrl_put_data/2 @@ -191,6 +190,8 @@ bif erts_internal:scheduler_wall_time/1 bif erts_internal:dirty_process_handle_signals/1 +bif erts_internal:create_dist_channel/4 + # inet_db support bif erlang:port_set_data/2 bif erlang:port_get_data/1 @@ -204,9 +205,9 @@ bif erlang:seq_trace/2 bif erlang:seq_trace_info/1 bif erlang:seq_trace_print/1 bif erlang:seq_trace_print/2 -bif erlang:suspend_process/2 +bif erts_internal:suspend_process/2 bif erlang:resume_process/1 -bif erlang:process_display/2 +bif erts_internal:process_display/2 bif erlang:bump_reductions/1 diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 026f0a62d4..70474898b2 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3138,60 +3138,60 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) BIF_ERROR(BIF_P, BADARG); } -/********************************************************************** - ** Allocate a dist entry, set node name install the connection handler - ** setnode_3({name@host, Creation}, Cid, {Type, Version, Initial, IC, OC}) - ** Type = flag field, where the flags are specified in dist.h - ** Version = distribution version, >= 1 - ** IC = in_cookie (ignored) - ** OC = out_cookie (ignored) - ** - ** Note that in distribution protocols above 1, the Initial parameter - ** is always NIL and the cookies are always the atom '', cookies are not - ** sent in the distribution messages but are only used in - ** the handshake. - ** - ***********************************************************************/ +/* + * erts_internal:create_dist_channel/4 is used by + * erlang:setnode/3. + */ + +typedef struct { + DistEntry *dep; + Uint flags; + Uint version; +} ErtsSetupConnDistCtrl; + +static void +setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, + Eterm ctrlr, Uint flags, + Uint version); -BIF_RETTYPE setnode_3(BIF_ALIST_3) +static Eterm +setup_connection_distctrl(Process *c_p, void *arg, + int *redsp, ErlHeapFragment **bpp); + +BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4) { BIF_RETTYPE ret; Uint flags; - unsigned long version; - Eterm ic, oc; - Eterm *tp; + Uint version; + Eterm *hp, res_tag = THE_NON_VALUE, res = THE_NON_VALUE; DistEntry *dep = NULL; - ErtsProcLocks proc_unlock = 0; - Process *proc; + int de_locked = 0; Port *pp = NULL; - Eterm notify_proc; - erts_aint32_t qflgs; /* * Check and pick out arguments */ - if (!is_node_name_atom(BIF_ARG_1) || - !(is_internal_port(BIF_ARG_2) - || is_internal_pid(BIF_ARG_2)) - || (erts_this_node->sysname == am_Noname)) { - goto badarg; - } + /* Node name... */ + if (!is_node_name_atom(BIF_ARG_1)) + goto badarg; - if (!is_tuple(BIF_ARG_3)) - goto badarg; - tp = tuple_val(BIF_ARG_3); - if (*tp++ != make_arityval(4)) - goto badarg; - if (!is_small(*tp)) - goto badarg; - flags = unsigned_val(*tp++); - if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0) - goto badarg; - ic = *(++tp); - oc = *(++tp); - if (!is_atom(ic) || !is_atom(oc)) - goto badarg; + /* Distribution controller... */ + if (!is_internal_port(BIF_ARG_2) && !is_internal_pid(BIF_ARG_2)) + goto badarg; + + /* Dist flags... */ + if (!is_small(BIF_ARG_3)) + goto badarg; + flags = unsigned_val(BIF_ARG_3); + + /* Version... */ + if (!is_small(BIF_ARG_4)) + goto badarg; + version = unsigned_val(BIF_ARG_4); + + if (version == 0) + goto badarg; if (~flags & DFLAG_DIST_MANDATORY) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); @@ -3222,74 +3222,79 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) else if (!dep) goto system_limit; /* Should never happen!!! */ + erts_de_rlock(dep); + de_locked = -1; + + if (dep->state == ERTS_DE_STATE_EXITING) { + /* Suspend on dist entry waiting for the exit to finish */ + ErtsProcList *plp = erts_proclist_create(BIF_P); + plp->next = NULL; + erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); + erts_mtx_lock(&dep->qlock); + erts_proclist_store_last(&dep->suspended, plp); + erts_mtx_unlock(&dep->qlock); + goto yield; + } + + erts_de_runlock(dep); + de_locked = 0; + if (is_internal_pid(BIF_ARG_2)) { if (BIF_P->common.id == BIF_ARG_2) { - proc_unlock = 0; - proc = BIF_P; - } - else { - proc_unlock = ERTS_PROC_LOCK_MAIN; - proc = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN, - BIF_ARG_2, proc_unlock); - } - erts_de_rwlock(dep); - - if (!proc) - goto badarg; - else if (proc == ERTS_PROC_LOCK_BUSY) { - proc_unlock = 0; - goto yield; - } + ErtsSetupConnDistCtrl scdc; - erts_proc_lock(proc, ERTS_PROC_LOCK_STATUS); - proc_unlock |= ERTS_PROC_LOCK_STATUS; + scdc.dep = dep; + scdc.flags = flags; + scdc.version = version; - if (ERTS_PROC_GET_DIST_ENTRY(proc)) { - if (dep == ERTS_PROC_GET_DIST_ENTRY(proc) - && (proc->flags & F_DISTRIBUTION) - && dep->cid == BIF_ARG_2) { - ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); - goto done; - } - goto badarg; - } + res = setup_connection_distctrl(BIF_P, &scdc, NULL, NULL); + BUMP_REDS(BIF_P, 5); + dep = NULL; - if (dep->state == ERTS_DE_STATE_EXITING) { - /* Suspend on dist entry waiting for the exit to finish */ - ErtsProcList *plp = erts_proclist_create(BIF_P); - plp->next = NULL; - erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); - erts_mtx_lock(&dep->qlock); - erts_proclist_store_last(&dep->suspended, plp); - erts_mtx_unlock(&dep->qlock); - goto yield; - } - if (dep->state != ERTS_DE_STATE_PENDING) { - if (dep->state == ERTS_DE_STATE_IDLE) - erts_set_dist_entry_pending(dep); - else + if (res == am_badarg) goto badarg; + + ASSERT(is_internal_magic_ref(res)); + res_tag = am_ok; /* Connection up */ } + else { + ErtsSetupConnDistCtrl *scdcp; - if (is_not_nil(dep->cid)) - goto badarg; + scdcp = erts_alloc(ERTS_ALC_T_SETUP_CONN_ARG, + sizeof(ErtsSetupConnDistCtrl)); - proc->flags |= F_DISTRIBUTION; - ERTS_PROC_SET_DIST_ENTRY(proc, dep); + scdcp->dep = dep; + scdcp->flags = flags; + scdcp->version = version; - proc_unlock &= ~ERTS_PROC_LOCK_STATUS; - erts_proc_unlock(proc, ERTS_PROC_LOCK_STATUS); + res = erts_proc_sig_send_rpc_request(BIF_P, + BIF_ARG_2, + !0, + setup_connection_distctrl, + (void *) scdcp); + if (is_non_value(res)) + goto badarg; - dep->send = NULL; /* Only for distr ports... */ + dep = NULL; + ASSERT(is_internal_ordinary_ref(res)); + + res_tag = am_message; /* Caller need to wait for dhandle in message */ + } + hp = HAlloc(BIF_P, 3); } else { + int new; pp = erts_id2port_sflgs(BIF_ARG_2, BIF_P, ERTS_PROC_LOCK_MAIN, ERTS_PORT_SFLGS_INVALID_LOOKUP); erts_de_rwlock(dep); + de_locked = 1; + + if (dep->state == ERTS_DE_STATE_EXITING) + goto badarg; if (!pp || (erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLG_EXITING)) @@ -3298,65 +3303,108 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0) goto badarg; - if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) { - ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); - goto done; /* Already set */ - } + if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) + new = 0; + else { + if (dep->state != ERTS_DE_STATE_PENDING) { + if (dep->state == ERTS_DE_STATE_IDLE) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } - if (dep->state == ERTS_DE_STATE_EXITING) { - /* Suspend on dist entry waiting for the exit to finish */ - ErtsProcList *plp = erts_proclist_create(BIF_P); - plp->next = NULL; - erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); - erts_mtx_lock(&dep->qlock); - erts_proclist_store_last(&dep->suspended, plp); - erts_mtx_unlock(&dep->qlock); - goto yield; - } - if (dep->state != ERTS_DE_STATE_PENDING) { - if (dep->state == ERTS_DE_STATE_IDLE) - erts_set_dist_entry_pending(dep); - else + if (pp->dist_entry || is_not_nil(dep->cid)) goto badarg; - } - if (pp->dist_entry || is_not_nil(dep->cid)) - goto badarg; + erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); - erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION); + pp->dist_entry = dep; - pp->dist_entry = dep; + ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); - ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output); + dep->send = (pp->drv_ptr->outputv + ? dist_port_commandv + : dist_port_command); + ASSERT(dep->send); - dep->send = (pp->drv_ptr->outputv - ? dist_port_commandv - : dist_port_command); - ASSERT(dep->send); + /* + * Dist-ports do not use the "busy port message queue" functionality, but + * instead use "busy dist entry" functionality. + */ + { + ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; + erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); + } - /* - * Dist-ports do not use the "busy port message queue" functionality, but - * instead use "busy dist entry" functionality. - */ - { - ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED; - erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL); + setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version); + de_locked = 0; + new = !0; } + hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); + res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep); + res_tag = am_ok; /* Connection up */ + if (new) + dep = NULL; /* inc of refc transferred to port (dist_entry field) */ + } + + ASSERT(is_value(res) && is_value(res_tag)); + + res = TUPLE2(hp, res_tag, res); + + ERTS_BIF_PREP_RET(ret, res); + + done: + + if (dep && dep != erts_this_dist_entry) { + if (de_locked) { + if (de_locked > 0) + erts_de_rwunlock(dep); + else + erts_de_runlock(dep); + } + erts_deref_dist_entry(dep); } + if (pp) + erts_port_release(pp); + + return ret; + + yield: + ERTS_BIF_PREP_YIELD4(ret, + bif_export[BIF_erts_internal_create_dist_channel_4], + BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4); + goto done; + + badarg: + ERTS_BIF_PREP_RET(ret, am_badarg); + goto done; + + system_limit: + ERTS_BIF_PREP_RET(ret, am_system_limit); + goto done; +} + +static void +setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep, + Eterm ctrlr, Uint flags, + Uint version) +{ + Eterm notify_proc = NIL; + erts_aint32_t qflgs; + dep->version = version; dep->creation = 0; -#ifdef DEBUG + ASSERT(is_internal_port(ctrlr) || is_internal_pid(ctrlr)); ASSERT(erts_atomic_read_nob(&dep->qsize) == 0 || (dep->state == ERTS_DE_STATE_PENDING)); -#endif if (flags & DFLAG_DIST_HDR_ATOM_CACHE) create_cache(dep); - erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + erts_set_dist_entry_connected(dep, ctrlr, flags); notify_proc = NIL; if (erts_atomic_read_nob(&dep->qsize)) { @@ -3375,50 +3423,100 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) } } } - erts_de_rwunlock(dep); - if (is_internal_pid(notify_proc)) - notify_dist_data(BIF_P, notify_proc); - ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); + erts_de_rwunlock(dep); - dep = NULL; /* inc of refc transferred to port (dist_entry field) */ + if (is_internal_pid(notify_proc)) + notify_dist_data(c_p, notify_proc); inc_no_nodes(); - send_nodes_mon_msgs(BIF_P, + send_nodes_mon_msgs(c_p, am_nodeup, - BIF_ARG_1, + dep->sysname, flags & DFLAG_PUBLISHED ? am_visible : am_hidden, NIL); - done: +} - if (dep && dep != erts_this_dist_entry) { - erts_de_rwunlock(dep); - erts_deref_dist_entry(dep); +static Eterm +setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp) +{ + ErtsSetupConnDistCtrl *scdcp = (ErtsSetupConnDistCtrl *) arg; + DistEntry *dep = scdcp->dep; + int dep_locked = 0; + Eterm *hp; + erts_aint32_t state; + + if (redsp) + *redsp = 1; + + state = erts_atomic32_read_nob(&c_p->state); + + if (state & ERTS_PSFLG_EXITING) + goto badarg; + + erts_de_rwlock(dep); + dep_locked = !0; + + if (dep->state == ERTS_DE_STATE_EXITING) + goto badarg; + + if (ERTS_PROC_GET_DIST_ENTRY(c_p)) { + if (dep == ERTS_PROC_GET_DIST_ENTRY(c_p) + && (c_p->flags & F_DISTRIBUTION) + && dep->cid == c_p->common.id) { + goto connected; + } + goto badarg; } - if (pp) - erts_port_release(pp); + if (dep->state != ERTS_DE_STATE_PENDING) { + if (dep->state == ERTS_DE_STATE_IDLE) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } - if (proc_unlock) - erts_proc_unlock(proc, proc_unlock); + if (is_not_nil(dep->cid)) + goto badarg; - return ret; + c_p->flags |= F_DISTRIBUTION; + ERTS_PROC_SET_DIST_ENTRY(c_p, dep); - yield: - ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P, - BIF_ARG_1, BIF_ARG_2, BIF_ARG_3); - goto done; + dep->send = NULL; /* Only for distr ports... */ - badarg: - ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG); - goto done; + if (redsp) + *redsp = 5; - system_limit: - ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT); - goto done; + setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id, + scdcp->flags, scdcp->version); +connected: + + /* we take over previous inc in refc of dep */ + + if (!bpp) /* called directly... */ + return erts_make_dhandle(c_p, dep); + + erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg); + + *bpp = new_message_buffer(ERTS_MAGIC_REF_THING_SIZE); + hp = (*bpp)->mem; + return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep); + +badarg: + + if (bpp) /* not called directly */ + erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg); + + if (dep_locked) + erts_de_rwunlock(dep); + + erts_deref_dist_entry(dep); + + return am_badarg; } + BIF_RETTYPE erts_internal_get_dflags_0(BIF_ALIST_0) { return erts_dflags_record; diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 4a6a19b210..9db600dce0 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -287,6 +287,8 @@ type DIST_DEMONITOR SHORT_LIVED PROCESSES dist_demonitor type CML_CLEANUP SHORT_LIVED SYSTEM connection_ml_cleanup type ML_YIELD_STATE SHORT_LIVED SYSTEM monitor_link_yield_state type ML_DIST STANDARD SYSTEM monitor_link_dist +type PF3_ARGS SHORT_LIVED PROCESSES process_flag_3_arguments +type SETUP_CONN_ARG SHORT_LIVED PROCESSES setup_connection_argument type ENVIRONMENT SYSTEM SYSTEM environment @@ -346,6 +348,7 @@ type NIF_TRAP_EXPORT STANDARD PROCESSES nif_trap_export_entry type NIF_EXP_TRACE FIXED_SIZE PROCESSES nif_export_trace type EXPORT LONG_LIVED CODE export_entry type MONITOR FIXED_SIZE PROCESSES monitor +type MONITOR_SUSPEND STANDARD PROCESSES monitor_suspend type LINK FIXED_SIZE PROCESSES link type AINFO_REQ SHORT_LIVED SYSTEM alloc_info_request type SCHED_WTIME_REQ SHORT_LIVED SYSTEM sched_wall_time_request diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 6f9e507228..55450e397f 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -617,17 +617,13 @@ static void collect_one_target_monitor(ErtsMonitor *mon, void *vmicp) } typedef struct { - Process *c_p; - ErtsProcLocks c_p_locks; ErtsMonitorSuspend **smi; Uint smi_i; Uint smi_max; - int sz; + Uint sz; } ErtsSuspendMonitorInfoCollection; -#define ERTS_INIT_SUSPEND_MONITOR_INFOS(SMIC, CP, CPL) do { \ - (SMIC).c_p = (CP); \ - (SMIC).c_p_locks = (CPL); \ +#define ERTS_INIT_SUSPEND_MONITOR_INFOS(SMIC) do { \ (SMIC).smi = NULL; \ (SMIC).smi_i = (SMIC).smi_max = 0; \ (SMIC).sz = 0; \ @@ -660,34 +656,26 @@ do { \ static void collect_one_suspend_monitor(ErtsMonitor *mon, void *vsmicp) { - ErtsMonitorSuspend *smon = erts_monitor_suspend(mon); - ErtsSuspendMonitorInfoCollection *smicp = vsmicp; - Process *suspendee = erts_pid2proc(smicp->c_p, - smicp->c_p_locks, - mon->other.item, - 0); - if (suspendee) { /* suspendee is alive */ - Sint a, p; - if (smon->active) { - smon->active += smon->pending; - smon->pending = 0; - } + if (mon->type == ERTS_MON_TYPE_SUSPEND) { + Sint count; + erts_aint_t mstate; + ErtsMonitorSuspend *msp; + ErtsSuspendMonitorInfoCollection *smicp; - ASSERT((smon->active && !smon->pending) - || (smon->pending && !smon->active)); + msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon); + smicp = vsmicp; ERTS_EXTEND_SUSPEND_MONITOR_INFOS(smicp); - smicp->smi[smicp->smi_i] = smon; + smicp->smi[smicp->smi_i] = msp; smicp->sz += 2 /* cons */ + 4 /* 3-tuple */; - a = (Sint) smon->active; /* quiet compiler warnings */ - p = (Sint) smon->pending; /* on 64-bit machines */ + mstate = erts_atomic_read_nob(&msp->state); - if (!IS_SSMALL(a)) - smicp->sz += BIG_UINT_HEAP_SIZE; - if (!IS_SSMALL(p)) + count = (Sint) (mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK); + if (!IS_SSMALL(count)) smicp->sz += BIG_UINT_HEAP_SIZE; + smicp->smi_i++; } } @@ -1075,8 +1063,10 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) if (c_p->common.id == pid) { int local_only = c_p->flags & F_LOCAL_SIGS_ONLY; - int sreds = ERTS_BIF_REDS_LEFT(c_p); - int sres; + int sres, sreds, reds_left; + + reds_left = ERTS_BIF_REDS_LEFT(c_p); + sreds = reds_left; if (!local_only) { erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); @@ -1085,15 +1075,19 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) } sres = erts_proc_sig_handle_incoming(c_p, &state, &sreds, sreds, !0); + + BUMP_REDS(c_p, (int) sreds); + reds_left -= sreds; + if (state & ERTS_PSFLG_EXITING) { c_p->flags &= ~F_LOCAL_SIGS_ONLY; goto exited; } - if (!sres) { + if (!sres | (reds_left <= 0)) { /* - * More signals to handle; need to yield and continue. - * Prevent fetching of more signals by setting - * local-sigs-only flag. + * More signals to handle or out of reds; need + * to yield and continue. Prevent fetching of + * more signals by setting local-sigs-only flag. */ c_p->flags |= F_LOCAL_SIGS_ONLY; goto yield; @@ -1166,6 +1160,7 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2) else { if (flags & ERTS_PI_FLAG_FORCE_SIG_SEND) goto send_signal; + state = ERTS_PSFLG_RUNNING; /* fail state... */ rp = erts_try_lock_sig_free_proc(pid, locks, &state); if (!rp) goto undefined; @@ -1627,56 +1622,56 @@ process_info_aux(Process *c_p, case ERTS_PI_IX_SUSPENDING: { ErtsSuspendMonitorInfoCollection smic; int i; - Eterm item; - erts_proc_lock(rp, ERTS_PROC_LOCK_STATUS); + ERTS_INIT_SUSPEND_MONITOR_INFOS(smic); - ERTS_INIT_SUSPEND_MONITOR_INFOS(smic, - c_p, - (c_p == rp - ? ERTS_PROC_LOCK_MAIN - : 0) | ERTS_PROC_LOCK_STATUS); - - erts_monitor_tree_foreach(rp->suspend_monitors, - &collect_one_suspend_monitor, - &smic); + erts_monitor_tree_foreach(ERTS_P_MONITORS(rp), + collect_one_suspend_monitor, + (void *) &smic); reserve_size += smic.sz; res = NIL; for (i = 0; i < smic.smi_i; i++) { - Sint a = (Sint) smic.smi[i]->active; /* quiet compiler warnings */ - Sint p = (Sint) smic.smi[i]->pending; /* on 64-bit machines... */ - Eterm active; - Eterm pending; + ErtsMonitorSuspend *msp; + erts_aint_t mstate; + Sint ci; + Eterm ct, active, pending, item; Uint sz = 4 + 2; - if (!IS_SSMALL(a)) - sz += BIG_UINT_HEAP_SIZE; - if (!IS_SSMALL(p)) - sz += BIG_UINT_HEAP_SIZE; + + msp = smic.smi[i]; + mstate = erts_atomic_read_nob(&msp->state); + + ci = (Sint) (mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK); + if (!IS_SSMALL(ci)) + sz += BIG_UINT_HEAP_SIZE; ERTS_PI_UNRESERVE(reserve_size, sz); hp = erts_produce_heap(hfact, sz, reserve_size); - if (IS_SSMALL(a)) - active = make_small(a); - else { - active = small_to_big(a, hp); - hp += BIG_UINT_HEAP_SIZE; - } - if (IS_SSMALL(p)) - pending = make_small(p); - else { - pending = small_to_big(p, hp); - hp += BIG_UINT_HEAP_SIZE; - } - item = TUPLE3(hp, smic.smi[i]->mon.other.item, active, pending); + if (IS_SSMALL(ci)) + ct = make_small(ci); + else { + ct = small_to_big(ci, hp); + hp += BIG_UINT_HEAP_SIZE; + } + + if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE) { + active = ct; + pending = make_small(0); + } + else { + active = make_small(0); + pending = ct; + } + + ASSERT(is_internal_pid(msp->md.origin.other.item)); + + item = TUPLE3(hp, msp->md.origin.other.item, active, pending); hp += 4; res = CONS(hp, item, res); } - erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); - *reds += (Uint) smic.smi_i / 4; ERTS_DESTROY_SUSPEND_MONITOR_INFOS(smic); @@ -3637,26 +3632,46 @@ BIF_RETTYPE is_process_alive_1(BIF_ALIST_1) BIF_ERROR(BIF_P, BADARG); } -BIF_RETTYPE process_display_2(BIF_ALIST_2) +static Eterm +process_display(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp) { - Process *rp; + if (redsp) + *redsp = 1; - if (BIF_ARG_2 != am_backtrace) - BIF_ERROR(BIF_P, BADARG); + if (ERTS_PROC_IS_EXITING(c_p)) + return am_badarg; - rp = erts_pid2proc_nropt(BIF_P, ERTS_PROC_LOCK_MAIN, - BIF_ARG_1, ERTS_PROC_LOCKS_ALL); - if(!rp) { - BIF_ERROR(BIF_P, BADARG); - } - if (rp == ERTS_PROC_LOCK_BUSY) - ERTS_BIF_YIELD2(bif_export[BIF_process_display_2], BIF_P, - BIF_ARG_1, BIF_ARG_2); - erts_stack_dump(ERTS_PRINT_STDERR, NULL, rp); - erts_proc_unlock(rp, (BIF_P == rp - ? ERTS_PROC_LOCKS_ALL_MINOR - : ERTS_PROC_LOCKS_ALL)); - BIF_RET(am_true); + erts_proc_lock(c_p, ERTS_PROC_LOCKS_ALL_MINOR); + erts_stack_dump(ERTS_PRINT_STDERR, NULL, c_p); + erts_proc_unlock(c_p, ERTS_PROC_LOCKS_ALL_MINOR); + + return am_true; +} + + +BIF_RETTYPE erts_internal_process_display_2(BIF_ALIST_2) +{ + Eterm res; + + if (BIF_ARG_2 != am_backtrace) + BIF_RET(am_badarg); + + if (BIF_P->common.id == BIF_ARG_1) { + res = process_display(BIF_P, NULL, NULL, NULL); + BIF_RET(res); + } + + if (is_not_internal_pid(BIF_ARG_1)) + BIF_RET(am_badarg); + + res = erts_proc_sig_send_rpc_request(BIF_P, BIF_ARG_1, + !0, + process_display, + NULL); + if (is_non_value(res)) + BIF_RET(am_badarg); + + BIF_RET(res); } /* this is a general call which return some possibly useful information */ @@ -4597,27 +4612,6 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2) BIF_RET(am_true); } } - else if (ERTS_IS_ATOM_STR("not_running_optimization", BIF_ARG_1)) { - int old_use_opt, use_opt; - switch (BIF_ARG_2) { - case am_true: - use_opt = 1; - break; - case am_false: - use_opt = 0; - break; - default: - BIF_ERROR(BIF_P, BADARG); - } - - erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); - erts_thr_progress_block(); - old_use_opt = !erts_disable_proc_not_running_opt; - erts_disable_proc_not_running_opt = !use_opt; - erts_thr_progress_unblock(); - erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); - BIF_RET(old_use_opt ? am_true : am_false); - } else if (ERTS_IS_ATOM_STR("wait", BIF_ARG_1)) { if (ERTS_IS_ATOM_STR("deallocations", BIF_ARG_2)) { int flag = ERTS_DEBUG_WAIT_COMPLETED_DEALLOCATIONS; diff --git a/erts/emulator/beam/erl_bif_trace.c b/erts/emulator/beam/erl_bif_trace.c index 1953f79d79..f9d351e69e 100644 --- a/erts/emulator/beam/erl_bif_trace.c +++ b/erts/emulator/beam/erl_bif_trace.c @@ -809,10 +809,129 @@ Eterm trace_info_2(BIF_ALIST_2) BIF_ERROR(p, BADARG); } erts_release_code_write_permission(); + + if (is_internal_ref(res)) + BIF_TRAP1(erts_await_result, BIF_P, res); + BIF_RET(res); } static Eterm +build_trace_flags_term(Eterm **hpp, Uint *szp, Uint trace_flags) +{ + +#define ERTS_TFLAG__(F, FN) \ + if (trace_flags & F) { \ + if (szp) \ + sz += 2; \ + if (hp) { \ + res = CONS(hp, FN, res); \ + hp += 2; \ + } \ + } + + Eterm res; + Uint sz = 0; + Eterm *hp; + + if (hpp) { + hp = *hpp; + res = NIL; + } + else { + hp = NULL; + res = THE_NON_VALUE; + } + + ERTS_TFLAG__(F_NOW_TS, am_timestamp); + ERTS_TFLAG__(F_STRICT_MON_TS, am_strict_monotonic_timestamp); + ERTS_TFLAG__(F_MON_TS, am_monotonic_timestamp); + ERTS_TFLAG__(F_TRACE_SEND, am_send); + ERTS_TFLAG__(F_TRACE_RECEIVE, am_receive); + ERTS_TFLAG__(F_TRACE_SOS, am_set_on_spawn); + ERTS_TFLAG__(F_TRACE_CALLS, am_call); + ERTS_TFLAG__(F_TRACE_PROCS, am_procs); + ERTS_TFLAG__(F_TRACE_SOS1, am_set_on_first_spawn); + ERTS_TFLAG__(F_TRACE_SOL, am_set_on_link); + ERTS_TFLAG__(F_TRACE_SOL1, am_set_on_first_link); + ERTS_TFLAG__(F_TRACE_SCHED, am_running); + ERTS_TFLAG__(F_TRACE_SCHED_EXIT, am_exiting); + ERTS_TFLAG__(F_TRACE_GC, am_garbage_collection); + ERTS_TFLAG__(F_TRACE_ARITY_ONLY, am_arity); + ERTS_TFLAG__(F_TRACE_RETURN_TO, am_return_to); + ERTS_TFLAG__(F_TRACE_SILENT, am_silent); + ERTS_TFLAG__(F_TRACE_SCHED_NO, am_scheduler_id); + ERTS_TFLAG__(F_TRACE_PORTS, am_ports); + ERTS_TFLAG__(F_TRACE_SCHED_PORTS, am_running_ports); + ERTS_TFLAG__(F_TRACE_SCHED_PROCS, am_running_procs); + + if (szp) + *szp += sz; + + if (hpp) + *hpp = hp; + + return res; + +#undef ERTS_TFLAG__ +} + +static Eterm +trace_info_tracee(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp) +{ + ErlHeapFragment *bp; + Eterm *hp, res, key; + Uint sz; + + *redsp = 1; + + if (ERTS_PROC_IS_EXITING(c_p)) + return am_undefined; + + key = (Eterm) arg; + sz = 3; + + if (!ERTS_TRACER_IS_NIL(ERTS_TRACER(c_p))) + erts_is_tracer_proc_enabled(c_p, ERTS_PROC_LOCK_MAIN, + &c_p->common); + + switch (key) { + case am_tracer: + + erts_build_tracer_to_term(NULL, NULL, &sz, ERTS_TRACER(c_p)); + bp = new_message_buffer(sz); + hp = bp->mem; + res = erts_build_tracer_to_term(&hp, &bp->off_heap, + NULL, ERTS_TRACER(c_p)); + if (res == am_false) + res = NIL; + break; + + case am_flags: + + build_trace_flags_term(NULL, &sz, ERTS_TRACE_FLAGS(c_p)); + bp = new_message_buffer(sz); + hp = bp->mem; + res = build_trace_flags_term(&hp, NULL, ERTS_TRACE_FLAGS(c_p)); + break; + + default: + + ERTS_INTERNAL_ERROR("Key not supported"); + res = NIL; + bp = NULL; + hp = NULL; + break; + } + + *redsp += 2; + + res = TUPLE2(hp, key, res); + *bpp = bp; + return res; +} + +static Eterm trace_info_pid(Process* p, Eterm pid_spec, Eterm key) { Eterm tracer; @@ -846,24 +965,19 @@ trace_info_pid(Process* p, Eterm pid_spec, Eterm key) erts_port_release(tracee); } else if (is_internal_pid(pid_spec)) { - Process *tracee = erts_pid2proc_not_running(p, ERTS_PROC_LOCK_MAIN, - pid_spec, ERTS_PROC_LOCK_MAIN); - - if (tracee == ERTS_PROC_LOCK_BUSY) - ERTS_BIF_YIELD2(bif_export[BIF_trace_info_2], p, pid_spec, key); + Eterm ref; - if (!tracee) - return am_undefined; + if (key != am_flags && key != am_tracer) + goto error; - if (!ERTS_TRACER_IS_NIL(ERTS_TRACER(tracee))) - erts_is_tracer_proc_enabled(tracee, ERTS_PROC_LOCK_MAIN, - &tracee->common); + ref = erts_proc_sig_send_rpc_request(p, pid_spec, !0, + trace_info_tracee, + (void *) key); - tracer = erts_tracer_to_term(p, ERTS_TRACER(tracee)); - trace_flags = ERTS_TRACE_FLAGS(tracee); + if (is_non_value(ref)) + return am_undefined; - if (tracee != p) - erts_proc_unlock(tracee, ERTS_PROC_LOCK_MAIN); + return ref; } else if (is_external_pid(pid_spec) && external_pid_dist_entry(pid_spec) == erts_this_dist_entry) { return am_undefined; @@ -873,48 +987,16 @@ trace_info_pid(Process* p, Eterm pid_spec, Eterm key) } if (key == am_flags) { - int num_flags = 21; /* MAXIMUM number of flags. */ - Uint needed = 3+2*num_flags; - Eterm flag_list = NIL; - Eterm* limit; + Eterm flag_list; + Uint sz = 3; + Eterm *hp; -#define FLAG0(flag_mask,flag) \ - if (trace_flags & (flag_mask)) { flag_list = CONS(hp, flag, flag_list); hp += 2; } else {} + build_trace_flags_term(NULL, &sz, trace_flags); + + hp = HAlloc(p, sz); + + flag_list = build_trace_flags_term(&hp, NULL, trace_flags); -#if defined(DEBUG) - /* - * Check num_flags if this assertion fires. - */ -# define FLAG ASSERT(num_flags-- > 0); FLAG0 -#else -# define FLAG FLAG0 -#endif - hp = HAlloc(p, needed); - limit = hp+needed; - FLAG(F_NOW_TS, am_timestamp); - FLAG(F_STRICT_MON_TS, am_strict_monotonic_timestamp); - FLAG(F_MON_TS, am_monotonic_timestamp); - FLAG(F_TRACE_SEND, am_send); - FLAG(F_TRACE_RECEIVE, am_receive); - FLAG(F_TRACE_SOS, am_set_on_spawn); - FLAG(F_TRACE_CALLS, am_call); - FLAG(F_TRACE_PROCS, am_procs); - FLAG(F_TRACE_SOS1, am_set_on_first_spawn); - FLAG(F_TRACE_SOL, am_set_on_link); - FLAG(F_TRACE_SOL1, am_set_on_first_link); - FLAG(F_TRACE_SCHED, am_running); - FLAG(F_TRACE_SCHED_EXIT, am_exiting); - FLAG(F_TRACE_GC, am_garbage_collection); - FLAG(F_TRACE_ARITY_ONLY, am_arity); - FLAG(F_TRACE_RETURN_TO, am_return_to); - FLAG(F_TRACE_SILENT, am_silent); - FLAG(F_TRACE_SCHED_NO, am_scheduler_id); - FLAG(F_TRACE_PORTS, am_ports); - FLAG(F_TRACE_SCHED_PORTS, am_running_ports); - FLAG(F_TRACE_SCHED_PROCS, am_running_procs); -#undef FLAG0 -#undef FLAG - HRelease(p,limit,hp+3); return TUPLE2(hp, key, flag_list); } else if (key == am_tracer) { if (tracer == am_false) diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c index 0692cea0ee..a65dbbf42b 100644 --- a/erts/emulator/beam/erl_gc.c +++ b/erts/emulator/beam/erl_gc.c @@ -413,21 +413,20 @@ erts_gc_after_bif_call_lhf(Process* p, ErlHeapFragment *live_hf_end, { int cost; - if (p->flags & F_HIBERNATE_SCHED) { + if (p->flags & (F_HIBERNATE_SCHED|F_HIPE_RECV_LOCKED)) { /* * We just hibernated. We do *not* want to mess * up the hibernation by an ordinary GC... + * + * OR + * + * We left a receive in HiPE with message + * queue lock locked, and we do not want to + * do a GC with message queue locked... */ return result; } -#ifdef HIPE - if (p->hipe_smp.have_receive_locks) { - /* Do not want to GC with message queue locked... */ - return result; - } -#endif - if (!p->mbuf) { /* Must have GC:d in BIF call... invalidate live_hf_end */ live_hf_end = ERTS_INVALID_HFRAG_PTR; diff --git a/erts/emulator/beam/erl_monitor_link.c b/erts/emulator/beam/erl_monitor_link.c index 70f36fb6b7..48d9bd4ca5 100644 --- a/erts/emulator/beam/erl_monitor_link.c +++ b/erts/emulator/beam/erl_monitor_link.c @@ -630,7 +630,9 @@ erts_monitor_tree_lookup_create(ErtsMonitor **root, int *created, Uint16 type, ErtsMonitor *res; ErtsMonitorCreateCtxt cctxt = {type, origin}; - ERTS_ML_ASSERT(type == ERTS_MON_TYPE_NODE || type == ERTS_MON_TYPE_NODES); + ERTS_ML_ASSERT(type == ERTS_MON_TYPE_NODE + || type == ERTS_MON_TYPE_NODES + || type == ERTS_MON_TYPE_SUSPEND); res = (ErtsMonitor *) ml_rbt_lookup_create((ErtsMonLnkNode **) root, target, create_monitor, @@ -760,11 +762,13 @@ erts_monitor_create(Uint16 type, Eterm ref, Eterm orgn, Eterm trgt, Eterm name) switch (type) { case ERTS_MON_TYPE_PROC: case ERTS_MON_TYPE_PORT: - case ERTS_MON_TYPE_TIME_OFFSET: if (is_nil(name)) { ErtsMonitorDataHeap *mdhp; ErtsORefThing *ortp; + case ERTS_MON_TYPE_TIME_OFFSET: + + ERTS_ML_ASSERT(is_nil(name)); ERTS_ML_ASSERT(is_immed(orgn) && is_immed(trgt)); ERTS_ML_ASSERT(is_internal_ordinary_ref(ref)); @@ -860,10 +864,38 @@ erts_monitor_create(Uint16 type, Eterm ref, Eterm orgn, Eterm trgt, Eterm name) mdep->dist = NULL; break; } - case ERTS_MON_TYPE_SUSPEND: - ERTS_INTERNAL_ERROR("Use erts_monitor_suspend_create() instead..."); - mdp = NULL; + case ERTS_MON_TYPE_SUSPEND: { + ErtsMonitorSuspend *msp; + + ERTS_ML_ASSERT(is_nil(name)); + ERTS_ML_ASSERT(is_nil(ref)); + ERTS_ML_ASSERT(is_internal_pid(orgn) && is_internal_pid(trgt)); + + msp = erts_alloc(ERTS_ALC_T_MONITOR_SUSPEND, + sizeof(ErtsMonitorSuspend)); + mdp = &msp->md; + ERTS_ML_ASSERT(((void *) mdp) == ((void *) msp)); + + mdp->ref = NIL; + + mdp->origin.other.item = trgt; + mdp->origin.offset = (Uint16) offsetof(ErtsMonitorData, origin); + mdp->origin.key_offset = (Uint16) offsetof(ErtsMonitor, other.item); + ERTS_ML_ASSERT(mdp->origin.key_offset >= mdp->origin.offset); + mdp->origin.flags = (Uint16) ERTS_ML_FLG_EXTENDED; + mdp->origin.type = type; + + mdp->target.other.item = orgn; + mdp->target.offset = (Uint16) offsetof(ErtsMonitorData, target); + mdp->target.key_offset = (Uint16) offsetof(ErtsMonitor, other.item); + mdp->target.flags = ERTS_ML_FLG_TARGET|ERTS_ML_FLG_EXTENDED; + mdp->target.type = type; + + msp->next = NULL; + erts_atomic_init_relb(&msp->state, 0); + break; + } default: ERTS_INTERNAL_ERROR("Invalid monitor type"); mdp = NULL; @@ -887,10 +919,11 @@ erts_monitor_destroy__(ErtsMonitorData *mdp) ERTS_ML_ASSERT(!(mdp->target.flags & ERTS_ML_FLG_IN_TABLE)); ERTS_ML_ASSERT((mdp->origin.flags & ERTS_ML_FLGS_SAME) == (mdp->target.flags & ERTS_ML_FLGS_SAME)); - ERTS_ML_ASSERT(mdp->origin.type != ERTS_MON_TYPE_SUSPEND); if (!(mdp->origin.flags & ERTS_ML_FLG_EXTENDED)) erts_free(ERTS_ALC_T_MONITOR, mdp); + else if (mdp->origin.type == ERTS_MON_TYPE_SUSPEND) + erts_free(ERTS_ALC_T_MONITOR_SUSPEND, mdp); else { ErtsMonitorDataExtended *mdep = (ErtsMonitorDataExtended *) mdp; ErlOffHeap oh; @@ -927,10 +960,10 @@ erts_monitor_size(ErtsMonitor *mon) Uint size, refc; ErtsMonitorData *mdp = erts_monitor_to_data(mon); - ERTS_ML_ASSERT(mon->type != ERTS_MON_TYPE_SUSPEND); - if (!(mon->flags & ERTS_ML_FLG_EXTENDED)) size = sizeof(ErtsMonitorDataHeap); + else if (mon->type == ERTS_MON_TYPE_SUSPEND) + size = sizeof(ErtsMonitorSuspend); else { ErtsMonitorDataExtended *mdep; Uint hsz = 0; @@ -957,54 +990,6 @@ erts_monitor_size(ErtsMonitor *mon) return size / refc; } - -/* suspend monitors... */ - -ErtsMonitorSuspend * -erts_monitor_suspend_create(Eterm pid) -{ - ErtsMonitorSuspend *msp; - - ERTS_ML_ASSERT(is_internal_pid(pid)); - - msp = erts_alloc(ERTS_ALC_T_SUSPEND_MON, - sizeof(ErtsMonitorSuspend)); - msp->mon.offset = (Uint16) offsetof(ErtsMonitorSuspend, mon); - msp->mon.key_offset = (Uint16) offsetof(ErtsMonitor, other.item); - msp->mon.other.item = pid; - msp->mon.flags = 0; - msp->mon.type = ERTS_MON_TYPE_SUSPEND; - msp->pending = 0; - msp->active = 0; - return msp; -} - -static ErtsMonLnkNode * -create_monitor_suspend(Eterm pid, void *unused) -{ - ErtsMonitorSuspend *msp = erts_monitor_suspend_create(pid); - return (ErtsMonLnkNode *) &msp->mon; -} - -ErtsMonitorSuspend * -erts_monitor_suspend_tree_lookup_create(ErtsMonitor **root, int *created, - Eterm pid) -{ - ErtsMonitor *mon; - mon = (ErtsMonitor *) ml_rbt_lookup_create((ErtsMonLnkNode **) root, - pid, create_monitor_suspend, - NULL, - created); - return erts_monitor_suspend(mon); -} - -void -erts_monitor_suspend_destroy(ErtsMonitorSuspend *msp) -{ - ERTS_ML_ASSERT(!(msp->mon.flags & ERTS_ML_FLG_IN_TABLE)); - erts_free(ERTS_ALC_T_SUSPEND_MON, msp); -} - /* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\ * Link Operations * * * diff --git a/erts/emulator/beam/erl_monitor_link.h b/erts/emulator/beam/erl_monitor_link.h index 603aead8cc..9ff8aa509a 100644 --- a/erts/emulator/beam/erl_monitor_link.h +++ b/erts/emulator/beam/erl_monitor_link.h @@ -246,15 +246,28 @@ * * --- ERTS_MON_TYPE_SUSPEND ------------------------------------- * - * Suspend monitor. + * Suspend monitor. A local process (origin) suspends another + * local process (target). * - * Other Item: Suspendee process identifier - * Key: Suspendee process identifier - * - * Valid keys are only ordinary internal references. + * Origin: + * Other Item: Process identifier of suspendee + * (target) + * Key: Process identifier of suspendee + * (target) + * Target: + * Other Item: Process identifier of suspender + * (origin) + * Key: Process identifier of suspender + * (origin) + * Shared: + * Next: Pointer to another suspend monitor + * State: Number of suspends and a flag + * indicating if the suspend is + * active or not. * - * This type of monitor is a bit strange and the whole process - * suspend functionality should be improved... + * Origin part of the monitor is stored in the monitor tree of + * origin process and target part of the monitor is stored in + * monitor list for local targets on the target process. * * * @@ -638,11 +651,15 @@ struct ErtsMonitorDataExtended__ { Eterm heap[1]; /* heap start... */ }; -typedef struct { - ErtsMonitor mon; - int pending; - int active; -} ErtsMonitorSuspend; +typedef struct ErtsMonitorSuspend__ ErtsMonitorSuspend; + +struct ErtsMonitorSuspend__ { + ErtsMonitorData md; /* origin = suspender; target = suspendee */ + ErtsMonitorSuspend *next; + erts_atomic_t state; +}; +#define ERTS_MSUSPEND_STATE_FLG_ACTIVE ((erts_aint_t) (((Uint) 1) << (sizeof(Uint)*8 - 1))) +#define ERTS_MSUSPEND_STATE_COUNTER_MASK (~ERTS_MSUSPEND_STATE_FLG_ACTIVE) /* * --- Monitor tree operations --- @@ -1094,24 +1111,25 @@ int erts_monitor_list_foreach_delete_yielding(ErtsMonitor **list, * * @brief Create a monitor * - * Can create all types of monitors exept for suspend monitors + * Can create all types of monitors * * When the funcion is called it is assumed that: * - 'ref' is an internal ordinary reference if type is ERTS_MON_TYPE_PROC, * ERTS_MON_TYPE_PORT, ERTS_MON_TYPE_TIME_OFFSET, or ERTS_MON_TYPE_RESOURCE - * - 'ref' is NIL if type is ERTS_MON_TYPE_NODE or ERTS_MON_TYPE_NODES + * - 'ref' is NIL if type is ERTS_MON_TYPE_NODE, ERTS_MON_TYPE_NODES, or + * ERTS_MON_TYPE_SUSPEND * - 'ref' is and ordinary internal reference or an external reference if * type is ERTS_MON_TYPE_DIST_PROC * - 'name' is an atom or NIL if type is ERTS_MON_TYPE_PROC, * ERTS_MON_TYPE_PORT, or ERTS_MON_TYPE_DIST_PROC * - 'name is NIL if type is ERTS_MON_TYPE_TIME_OFFSET, ERTS_MON_TYPE_RESOURCE, - * ERTS_MON_TYPE_NODE, or ERTS_MON_TYPE_NODES + * ERTS_MON_TYPE_NODE, ERTS_MON_TYPE_NODES, or ERTS_MON_TYPE_SUSPEND * If the above is not true, bad things will happen. * * @param[in] type ERTS_MON_TYPE_PROC, ERTS_MON_TYPE_PORT, * ERTS_MON_TYPE_TIME_OFFSET, ERTS_MON_TYPE_DIST_PROC, * ERTS_MON_TYPE_RESOURCE, ERTS_MON_TYPE_NODE, - * or ERTS_MON_TYPE_NODES + * ERTS_MON_TYPE_NODES, or ERTS_MON_TYPE_SUSPEND * * @param[in] ref A reference or NIL depending on type * @@ -1119,6 +1137,10 @@ int erts_monitor_list_foreach_delete_yielding(ErtsMonitor **list, * * @param[in] target The key of the target * + * @param[in] name An atom (the name) or NIL depending on type + * + * @returns A pointer to monitor data structure + * */ ErtsMonitorData *erts_monitor_create(Uint16 type, Eterm ref, Eterm origin, Eterm target, Eterm name); @@ -1347,7 +1369,8 @@ erts_monitor_to_data(ErtsMonitor *mon) ERTS_ML_ASSERT(erts_monitor_origin_offset == (size_t) mdp->origin.offset); ERTS_ML_ASSERT(!!(mdp->target.flags & ERTS_ML_FLG_TARGET)); ERTS_ML_ASSERT(erts_monitor_target_offset == (size_t) mdp->target.offset); - if (mon->type == ERTS_MON_TYPE_NODE || mon->type == ERTS_MON_TYPE_NODES) { + if (mon->type == ERTS_MON_TYPE_NODE || mon->type == ERTS_MON_TYPE_NODES + || mon->type == ERTS_MON_TYPE_SUSPEND) { ERTS_ML_ASSERT(erts_monitor_node_key_offset == (size_t) mdp->origin.key_offset); ERTS_ML_ASSERT(erts_monitor_node_key_offset == (size_t) mdp->target.key_offset); } diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 6d81044c39..e9b41ad298 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -39,6 +39,7 @@ #include "big.h" #include "erl_gc.h" #include "bif.h" +#include "erl_bif_unique.h" #include "erl_proc_sig_queue.h" #include "dtrace-wrapper.h" @@ -49,7 +50,7 @@ * Note that not all signal are handled using this functionality! */ -#define ERTS_SIG_Q_OP_MAX 11 +#define ERTS_SIG_Q_OP_MAX 13 #define ERTS_SIG_Q_OP_EXIT 0 #define ERTS_SIG_Q_OP_EXIT_LINKED 1 @@ -62,7 +63,9 @@ #define ERTS_SIG_Q_OP_TRACE_CHANGE_STATE 8 #define ERTS_SIG_Q_OP_PERSISTENT_MON_MSG 9 #define ERTS_SIG_Q_OP_IS_ALIVE 10 -#define ERTS_SIG_Q_OP_PROCESS_INFO ERTS_SIG_Q_OP_MAX +#define ERTS_SIG_Q_OP_PROCESS_INFO 11 +#define ERTS_SIG_Q_OP_SYNC_SUSPEND 12 +#define ERTS_SIG_Q_OP_RPC ERTS_SIG_Q_OP_MAX #define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 5) @@ -154,6 +157,17 @@ typedef struct { } ErtsIsAliveRequest; typedef struct { + Eterm message; + Eterm requester; + int async; +} ErtsSyncSuspendRequest; + +typedef struct { + ErtsMonitorSuspend *mon; + ErtsMessage *sync; +} ErtsProcSigPendingSuspend; + +typedef struct { ErtsSignalCommon common; Sint refc; Sint delayed_len; @@ -176,6 +190,15 @@ typedef struct { #define ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE ((Sint) -1) #define ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC ((Sint) -2) +typedef struct { + ErtsSignalCommon common; + Eterm requester; + Eterm (*func)(Process *, void *, int *, ErlHeapFragment **); + void *arg; + Eterm ref; + ErtsORefThing oref_thing; +} ErtsProcSigRPC; + static int handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing, ErtsMessage ***next_nm_sig); @@ -1308,6 +1331,8 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason) /* Pass signal using old monitor structure... */ ErtsSignal *sig; + send_using_monitor_struct: + mon->other.item = reason; /* Pass immed reason via other.item... */ sig = (ErtsSignal *) mon; sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_MONITOR_DOWN, @@ -1319,6 +1344,18 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason) ErtsMonitorData *mdp = erts_monitor_to_data(mon); Eterm from_tag, monitored, heap[3]; + if (mon->type == ERTS_MON_TYPE_SUSPEND) { + /* + * Set reason to 'undefined', since exit + * reason is not used for suspend monitors, + * and send using monitor structure. This + * since we don't want to trigger + * unnecessary memory allocation etc... + */ + reason = am_undefined; + goto send_using_monitor_struct; + } + if (!(mon->flags & ERTS_ML_FLG_NAME)) { from_tag = monitored = mdp->origin.other.item; if (is_external_pid(from_tag)) { @@ -1596,7 +1633,173 @@ erts_proc_sig_send_process_info_request(Process *c_p, else erts_free(ERTS_ALC_T_SIG_DATA, pis); return res; -} +} + +void +erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, Eterm tag, Eterm reply) +{ + ErlHeapFragment *hfrag; + Uint hsz, tag_sz; + Eterm *hp, *start_hp, tag_cpy, msg, default_reply; + ErlOffHeap *ohp; + ErtsMessage *mp; + ErtsSyncSuspendRequest *ssusp; + int async_suspend; + + tag_sz = size_object(tag); + + hsz = 3 + tag_sz + sizeof(ErtsSyncSuspendRequest)/sizeof(Eterm); + + mp = erts_alloc_message(hsz, &hp); + hfrag = &mp->hfrag; + mp->next = NULL; + ohp = &hfrag->off_heap; + start_hp = hp; + + tag_cpy = copy_struct(tag, tag_sz, &hp, ohp); + + async_suspend = is_non_value(reply); + default_reply = async_suspend ? am_suspended : reply; + + msg = TUPLE2(hp, tag_cpy, default_reply); + hp += 3; + + hfrag->used_size = hp - start_hp; + + ssusp = (ErtsSyncSuspendRequest *) (char *) hp; + ssusp->message = msg; + ssusp->requester = c_p->common.id; + ssusp->async = async_suspend; + + ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_SYNC_SUSPEND, + ERTS_SIG_Q_TYPE_UNDEFINED, + 0); + ERL_MESSAGE_TOKEN(mp) = NIL; + ERL_MESSAGE_FROM(mp) = am_system; +#ifdef USE_VM_PROBES + ERL_MESSAGE_DT_UTAG(mp) = NIL; +#endif + + if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_SYNC_SUSPEND)) + (void) maybe_elevate_sig_handling_prio(c_p, to); + else { + Eterm *tp; + /* It wasn't alive; reply to ourselves... */ + mp->next = NULL; + mp->data.attached = ERTS_MSG_COMBINED_HFRAG; + tp = tuple_val(msg); + tp[2] = async_suspend ? am_badarg : am_exited; + erts_queue_message(c_p, ERTS_PROC_LOCK_MAIN, + mp, msg, am_system); + } +} + +Eterm +erts_proc_sig_send_rpc_request(Process *c_p, + Eterm to, + int reply, + Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), + void *arg) +{ + Eterm res; + ErtsProcSigRPC *sig = erts_alloc(ERTS_ALC_T_SIG_DATA, + sizeof(ErtsProcSigRPC)); + sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_RPC, + ERTS_SIG_Q_TYPE_UNDEFINED, + 0); + sig->requester = reply ? c_p->common.id : NIL; + sig->func = func; + sig->arg = arg; + + if (!reply) { + res = am_ok; + sig->ref = am_ok; + } + else { + res = erts_make_ref(c_p); + + sys_memcpy((void *) &sig->oref_thing, + (void *) internal_ref_val(res), + sizeof(ErtsORefThing)); + + sig->ref = make_internal_ref(&sig->oref_thing); + + ERTS_RECV_MARK_SAVE(c_p); + ERTS_RECV_MARK_SET(c_p); + } + + if (proc_queue_signal(c_p, to, (ErtsSignal *) sig, ERTS_SIG_Q_OP_RPC)) + (void) maybe_elevate_sig_handling_prio(c_p, to); + else { + erts_free(ERTS_ALC_T_SIG_DATA, sig); + res = THE_NON_VALUE; + if (reply) + JOIN_MESSAGE(c_p); + } + + return res; +} + +static int +handle_rpc(Process *c_p, ErtsProcSigRPC *rpc, int cnt, int limit, int *yieldp) +{ + Process *rp; + ErlHeapFragment *bp = NULL; + Eterm res; + Uint hsz; + int reds, out_cnt; + + /* + * reds in: + * Reductions left. + * + * reds out: + * Absolute value of reds out equals consumed + * amount of reds. If a negative value, force + * a yield. + */ + + reds = (limit - cnt) / ERTS_SIG_REDS_CNT_FACTOR; + if (reds <= 0) + reds = 1; + + res = (*rpc->func)(c_p, rpc->arg, &reds, &bp); + + if (reds < 0) { + /* Force yield... */ + *yieldp = !0; + reds *= -1; + } + + out_cnt = reds*ERTS_SIG_REDS_CNT_FACTOR; + + hsz = 3 + sizeof(ErtsORefThing)/sizeof(Eterm); + + rp = erts_proc_lookup(rpc->requester); + if (!rp) { + if (bp) + free_message_buffer(bp); + } + else { + Eterm *hp, msg, ref; + ErtsMessage *mp = erts_alloc_message(hsz, &hp); + + sys_memcpy((void *) hp, (void *) &rpc->oref_thing, + sizeof(rpc->oref_thing)); + + ref = make_internal_ref(hp); + hp += sizeof(rpc->oref_thing)/sizeof(Eterm); + msg = TUPLE2(hp, ref, res); + + mp->hfrag.next = bp; + + erts_queue_proc_message(c_p, rp, 0, mp, msg); + } + + erts_free(ERTS_ALC_T_SIG_DATA, rpc); + + return out_cnt; +} static void is_alive_response(Process *c_p, ErtsMessage *mp, int is_alive) @@ -2640,6 +2843,155 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing, return ((int) reds)*4 + 8; } +static void +handle_suspend(Process *c_p, ErtsMonitor *mon, int *yieldp) +{ + erts_aint32_t state = erts_atomic32_read_nob(&c_p->state); + + ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND); + + if (!(state & ERTS_PSFLG_DIRTY_RUNNING)) { + ErtsMonitorSuspend *msp; + erts_aint_t mstate; + + msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon); + mstate = erts_atomic_read_bor_acqb(&msp->state, + ERTS_MSUSPEND_STATE_FLG_ACTIVE); + ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate; + erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL); + *yieldp = !0; + } + else { + /* Executing dirty; delay suspend... */ + ErtsProcSigPendingSuspend *psusp; + ErtsMonitorSuspend *msp; + + psusp = ERTS_PROC_GET_PENDING_SUSPEND(c_p); + if (!psusp) { + psusp = erts_alloc(ERTS_ALC_T_SIG_DATA, + sizeof(ErtsProcSigPendingSuspend)); + psusp->mon = NULL; + psusp->sync = NULL; + ERTS_PROC_SET_PENDING_SUSPEND(c_p, (void *) psusp); + } + + msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon); + + msp->next = psusp->mon; + psusp->mon = msp; + + erts_atomic32_inc_nob(&msp->md.refc); + } +} + +static void +sync_suspend_reply(Process *c_p, ErtsMessage *mp, erts_aint32_t state) +{ + /* + * Sender prepared the message for us. Just patch + * the result if necessary. The default prepared + * result is 'false'. + */ + Process *rp; + ErtsSyncSuspendRequest *ssusp; + + ssusp = (ErtsSyncSuspendRequest *) (char *) (&mp->hfrag.mem[0] + + mp->hfrag.used_size); + + ASSERT(ERTS_SIG_IS_NON_MSG(mp)); + ASSERT(ERTS_PROC_SIG_OP(((ErtsSignal *) mp)->common.tag) + == ERTS_SIG_Q_OP_SYNC_SUSPEND); + ASSERT(mp->hfrag.alloc_size > mp->hfrag.used_size); + ASSERT((mp->hfrag.alloc_size - mp->hfrag.used_size)*sizeof(UWord) + >= sizeof(ErtsSyncSuspendRequest)); + ASSERT(is_internal_pid(ssusp->requester)); + ASSERT(ssusp->requester != c_p->common.id); + ASSERT(is_tuple_arity(ssusp->message, 2)); + ASSERT(is_immed(tuple_val(ssusp->message)[2])); + + ERL_MESSAGE_TERM(mp) = ssusp->message; + mp->data.attached = ERTS_MSG_COMBINED_HFRAG; + mp->next = NULL; + + rp = erts_proc_lookup(ssusp->requester); + if (!rp) + erts_cleanup_messages(mp); + else { + if ((state & (ERTS_PSFLG_EXITING + | ERTS_PSFLG_SUSPENDED)) != ERTS_PSFLG_SUSPENDED) { + /* Not suspended -> patch result... */ + if (state & ERTS_PSFLG_EXITING) { + Eterm *tp = tuple_val(ssusp->message); + tp[2] = ssusp->async ? am_exited : am_badarg; + } + else { + Eterm *tp = tuple_val(ssusp->message); + ASSERT(!(state & ERTS_PSFLG_SUSPENDED)); + tp[2] = ssusp->async ? am_not_suspended : am_internal_error; + } + } + erts_queue_proc_message(c_p, rp, 0, mp, ssusp->message); + } +} + +static void +handle_sync_suspend(Process *c_p, ErtsMessage *mp) +{ + ErtsProcSigPendingSuspend *psusp; + + psusp = (ErtsProcSigPendingSuspend *) ERTS_PROC_GET_PENDING_SUSPEND(c_p); + if (!psusp) + sync_suspend_reply(c_p, mp, erts_atomic32_read_nob(&c_p->state)); + else { + mp->next = psusp->sync; + psusp->sync = mp; + } +} + +void +erts_proc_sig_handle_pending_suspend(Process *c_p) +{ + ErtsMonitorSuspend *msp; + ErtsMessage *sync; + ErtsProcSigPendingSuspend *psusp; + erts_aint32_t state = erts_atomic32_read_nob(&c_p->state); + + psusp = (ErtsProcSigPendingSuspend *) ERTS_PROC_GET_PENDING_SUSPEND(c_p); + + msp = psusp->mon; + + while (msp) { + ErtsMonitorSuspend *next_msp = msp->next; + msp->next = NULL; + if (!(state & ERTS_PSFLG_EXITING) + && erts_monitor_is_in_table(&msp->md.target)) { + erts_aint_t mstate; + + mstate = erts_atomic_read_bor_acqb(&msp->state, + ERTS_MSUSPEND_STATE_FLG_ACTIVE); + ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate; + erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL); + } + + erts_monitor_release(&msp->md.target); + + msp = next_msp; + } + + sync = psusp->sync; + + while (sync) { + ErtsMessage *next_sync = sync->next; + sync->next = NULL; + sync_suspend_reply(c_p, sync, state); + sync = next_sync; + } + + erts_free(ERTS_ALC_T_SIG_DATA, psusp); + + ERTS_PROC_SET_PENDING_SUSPEND(c_p, NULL); +} + /* * Called in order to handle incoming signals. */ @@ -2650,7 +3002,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, { Eterm tag; erts_aint32_t state; - int cnt, limit, abs_lim, msg_tracing; + int yield, cnt, limit, abs_lim, msg_tracing; ErtsMessage *sig, ***next_nm_sig; ErtsSigRecvTracing tracing; @@ -2670,6 +3022,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, limit = *redsp; *redsp = 0; + yield = 0; if (!c_p->sig_qs.cont) { if (state == -1) @@ -2783,6 +3136,18 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, cnt += handle_nodedown(c_p, sig, mdp, next_nm_sig); } break; + case ERTS_MON_TYPE_SUSPEND: + tmon = (ErtsMonitor *) sig; + ASSERT(erts_monitor_is_target(tmon)); + ASSERT(!erts_monitor_is_in_table(tmon)); + mdp = erts_monitor_to_data(tmon); + if (erts_monitor_is_in_table(&mdp->origin)) { + erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p), + &mdp->origin); + omon = &mdp->origin; + } + remove_nm_sig(c_p, sig, next_nm_sig); + break; default: ERTS_INTERNAL_ERROR("invalid monitor type"); break; @@ -2846,9 +3211,13 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, if (mon->type == ERTS_MON_TYPE_DIST_PROC) erts_monitor_tree_insert(&ERTS_P_MONITORS(c_p), mon); - else + else { erts_monitor_list_insert(&ERTS_P_LT_MONITORS(c_p), mon); + if (mon->type == ERTS_MON_TYPE_SUSPEND) + handle_suspend(c_p, mon, &yield); + } ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + cnt += 2; break; } @@ -2892,9 +3261,16 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p), tmon); else { erts_monitor_list_delete(&ERTS_P_LT_MONITORS(c_p), tmon); - if (type == ERTS_MON_TYPE_RESOURCE) { + switch (type) { + case ERTS_MON_TYPE_RESOURCE: erts_nif_demonitored((ErtsResource *) tmon->other.ptr); cnt++; + break; + case ERTS_MON_TYPE_SUSPEND: + erts_resume(c_p, ERTS_PROC_LOCK_MAIN); + break; + default: + break; } } erts_monitor_release_both(mdp); @@ -3009,6 +3385,21 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); break; + case ERTS_SIG_Q_OP_SYNC_SUSPEND: + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + remove_nm_sig(c_p, sig, next_nm_sig); + handle_sync_suspend(c_p, sig); + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + break; + + case ERTS_SIG_Q_OP_RPC: + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + remove_nm_sig(c_p, sig, next_nm_sig); + cnt += handle_rpc(c_p, (ErtsProcSigRPC *) sig, cnt, + limit, &yield); + ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig); + break; + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: { Uint16 type = ERTS_PROC_SIG_TYPE(tag); @@ -3166,6 +3557,15 @@ stop: { *redsp = cnt/4 + 1; + if (yield) { + int vreds = max_reds - *redsp; + if (vreds > 0) { + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + esdp->virtual_reds += vreds; + } + *redsp = max_reds; + } + return res; } } @@ -3274,6 +3674,8 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp) case ERTS_MON_TYPE_PROC: case ERTS_MON_TYPE_DIST_PROC: case ERTS_MON_TYPE_NODE: + case ERTS_MON_TYPE_NODES: + case ERTS_MON_TYPE_SUSPEND: erts_monitor_release((ErtsMonitor *) sig); break; default: @@ -3329,6 +3731,17 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp) handle_process_info(c_p, NULL, sig, next_nm_sig, 0); break; + case ERTS_SIG_Q_OP_SYNC_SUSPEND: + handle_sync_suspend(c_p, sig); + break; + + case ERTS_SIG_Q_OP_RPC: { + int yield = 0; + handle_rpc(c_p, (ErtsProcSigRPC *) sig, + cnt, limit, &yield); + break; + } + case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: destroy_trace_info((ErtsSigTraceInfo *) sig); break; @@ -3464,6 +3877,7 @@ erts_proc_sig_signal_size(ErtsSignal *sig) } break; + case ERTS_SIG_Q_OP_SYNC_SUSPEND: case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG: case ERTS_SIG_Q_OP_IS_ALIVE: size = ((ErtsMessage *) sig)->hfrag.alloc_size; @@ -3519,6 +3933,10 @@ erts_proc_sig_signal_size(ErtsSignal *sig) break; } + case ERTS_SIG_Q_OP_RPC: + size = sizeof(ErtsProcSigRPC); + break; + default: ERTS_INTERNAL_ERROR("Unknown signal"); break; @@ -3595,17 +4013,13 @@ erts_proc_sig_receive_helper(Process *c_p, */ *get_outp = 0; *msgpp = NULL; + return consumed_reds; } erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); - if (left_reds <= 0) { - *get_outp = -1; /* yield */ - *msgpp = NULL; - - ASSERT(consumed_reds >= (fcalls - neg_o_reds)); - return consumed_reds; - } + if (left_reds <= 0) + break; /* Yield */ /* handle newly arrived signals... */ } @@ -3626,19 +4040,27 @@ erts_proc_sig_receive_helper(Process *c_p, max_reds, !0); consumed_reds += reds; left_reds -= reds; - /* we may have exited by an incoming signal... */ - if (state & ERTS_PSFLG_EXITING) { + + /* we may have exited or suspended by an incoming signal... */ + + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_SUSPENDED)) { + if (state & ERTS_PSFLG_SUSPENDED) + break; /* Yield */ + /* * Process need to schedule out in order * to terminate. Prepare this a bit... */ + ASSERT(state & ERTS_PSFLG_EXITING); ASSERT(c_p->flags & F_DELAY_GC); c_p->flags &= ~F_DELAY_GC; c_p->arity = 0; c_p->current = NULL; + *get_outp = 1; *msgpp = NULL; + return consumed_reds; } @@ -3649,17 +4071,20 @@ erts_proc_sig_receive_helper(Process *c_p, return consumed_reds; } - if (left_reds <= 0) { - *get_outp = -1; /* yield */ - *msgpp = NULL; - - ASSERT(consumed_reds >= (fcalls - neg_o_reds)); - return consumed_reds; - } + if (left_reds <= 0) + break; /* yield */ ASSERT(!c_p->sig_qs.cont); /* Go fetch again... */ } + + /* Yield... */ + + *get_outp = -1; + *msgpp = NULL; + + ASSERT(consumed_reds >= (fcalls - neg_o_reds)); + return consumed_reds; } static int diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index 8edd277309..efa7c08664 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -33,6 +33,11 @@ * - Group leader * - Is process alive * - Process info request + * - Suspend request (monitor of suspend type) + * - Resume request (demonitor of suspend type) + * - Suspend cleanup (monitor down of suspend type) + * - Sync suspend + * - RPC request * - Trace change * * The signal queue consists of three parts: @@ -557,6 +562,102 @@ erts_proc_sig_send_process_info_request(Process *c_p, Uint reserve_size, Eterm ref); +/** + * + * @brief Send a 'sync suspend' signal to a process. + * + * A response message '{Tag, Reply}' is sent to the + * sender when performed where Tag is the term passed + * as 'tag' argument. Reply is either 'suspended', + * 'not_suspended', 'exited' if the operation is + * asynchronous; otherwise, the 'reply' argument or + * 'badarg' if process terminated. + * + * This signal does *not* change the suspend state, only + * reads and reply the state. This signal is typically + * sent after a suspend request (monitor of suspend type) + * signal has been sent to the process in order to get a + * response when the suspend monitor has been processed. + * + * @param[in] c_p Pointer to process struct of + * currently executing process. + * + * @param[in] to Identifier of receiver. + * + * @param[in] tag Tag to use in response + * message to the sending + * process (i.e., c_p). + * + * @param[in] reply Reply to send if this + * is a synchronous operation; + * otherwise, THE_NON_VALUE. + */ +void +erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, + Eterm tag, Eterm reply); + +/** + * + * @brief Send an 'rpc' signal to a process. + * + * The function 'func' will be executed in the + * context of the receiving process. A response + * message '{Ref, Result}' is sent to the sender + * when 'func' has been called. 'Ref' is the reference + * returned by this function and 'Result' is the + * term returned by 'func'. If the return value of + * 'func' is not an immediate term, 'func' has to + * allocate a heap fragment where the result is stored + * and update the the heap fragment pointer pointer + * passed as third argument to point to it. + * + * If this function returns a reference, 'func' will + * be called in the context of the receiver. However, + * note that this might happen when the receiver is in + * an exiting state. The caller of this function + * *unconditionally* has to enter a receive that match + * on the returned reference in all clauses as next + * receive; otherwise, bad things will happen! + * + * If THE_NON_VALUE is returned, the receiver did not + * exist. The signal was not sent, and no specific + * receive has to be entered by the caller. + * + * @param[in] c_p Pointer to process struct of + * currently executing process. + * + * @param[in] to Identifier of receiver process. + * + * @param[in] reply Non-zero if a reply is wanted. + * + * @param[in] func Function to execute in the + * context of the receiver. + * First argument will be a + * pointer to the process struct + * of the receiver process. + * Second argument will be 'arg' + * (see below). Third argument + * will be a pointer to a pointer + * to a heap fragment for storage + * of result returned from 'func' + * (i.e. an 'out' parameter). + * + * @param[in] arg Void pointer to argument + * to pass as second argument + * in call of 'func'. + * + * @returns If the request was sent, + * an internal ordinary + * reference; otherwise, + * THE_NON_VALUE (non-existing + * receiver). + */ +Eterm +erts_proc_sig_send_rpc_request(Process *c_p, + Eterm to, + int reply, + Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), + void *arg); /* * End of send operations of currently supported process signals. @@ -820,6 +921,21 @@ void erts_proc_sig_clear_seq_trace_tokens(Process *c_p); /** + * + * @brief Handle pending suspend requests + * + * Should be called by processes when they stop + * execution on a dirty scheduler if they have + * pending suspend requests (i.e. when + * ERTS_PROC_GET_PENDING_SUSPEND(c_p) != NULL). + * + * @param[in] c_p Pointer to executing + * process + */ +void +erts_proc_sig_handle_pending_suspend(Process *c_p); + +/** * @brief Initialize this functionality */ void erts_proc_sig_queue_init(void); diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index ad7ac27ac3..b373d08a6b 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -185,8 +185,6 @@ sched_get_busy_wait_params(ErtsSchedulerData *esdp) return &sched_busy_wait_params[esdp->type]; } -int erts_disable_proc_not_running_opt; - static ErtsAuxWorkData *aux_thread_aux_work_data; static ErtsAuxWorkData *poll_thread_aux_work_data; @@ -730,6 +728,11 @@ erts_pre_init_process(void) = ERTS_PSD_DIST_ENTRY_GET_LOCKS; erts_psd_required_locks[ERTS_PSD_DIST_ENTRY].set_locks = ERTS_PSD_DIST_ENTRY_SET_LOCKS; + + erts_psd_required_locks[ERTS_PSD_PENDING_SUSPEND].get_locks + = ERTS_PSD_PENDING_SUSPEND_GET_LOCKS; + erts_psd_required_locks[ERTS_PSD_PENDING_SUSPEND].set_locks + = ERTS_PSD_PENDING_SUSPEND_SET_LOCKS; #endif } @@ -744,7 +747,6 @@ void erts_init_process(int ncpu, int proc_tab_size, int legacy_proc_tab) { - erts_disable_proc_not_running_opt = 0; erts_init_proc_lock(ncpu); init_proclist_alloc(); @@ -8553,427 +8555,22 @@ erts_start_schedulers(void) } } - - -static void -add_pend_suspend(Process *suspendee, - Eterm originator_pid, - void (*handle_func)(Process *, - ErtsProcLocks, - int, - Eterm)) -{ - ErtsPendingSuspend *psp = erts_alloc(ERTS_ALC_T_PEND_SUSPEND, - sizeof(ErtsPendingSuspend)); - psp->next = NULL; -#ifdef DEBUG -#if defined(ARCH_64) - psp->end = (ErtsPendingSuspend *) 0xdeaddeaddeaddead; -#else - psp->end = (ErtsPendingSuspend *) 0xdeaddead; -#endif -#endif - psp->pid = originator_pid; - psp->handle_func = handle_func; - - if (suspendee->pending_suspenders) - suspendee->pending_suspenders->end->next = psp; - else - suspendee->pending_suspenders = psp; - suspendee->pending_suspenders->end = psp; -} - -static void -handle_pending_suspend(Process *p, ErtsProcLocks p_locks) -{ - ErtsPendingSuspend *psp; - int is_alive = !ERTS_PROC_IS_EXITING(p); - - ERTS_LC_ASSERT(p_locks & ERTS_PROC_LOCK_STATUS); - - /* - * New pending suspenders might appear while we are processing - * (since we may release the status lock on p while processing). - */ - while (p->pending_suspenders) { - psp = p->pending_suspenders; - p->pending_suspenders = NULL; - while (psp) { - ErtsPendingSuspend *free_psp; - (*psp->handle_func)(p, p_locks, is_alive, psp->pid); - free_psp = psp; - psp = psp->next; - erts_free(ERTS_ALC_T_PEND_SUSPEND, (void *) free_psp); - } - } - -} - -static ERTS_INLINE void -cancel_suspend_of_suspendee(Process *p, ErtsProcLocks p_locks) -{ - if (is_not_nil(p->suspendee)) { - ErtsMonitor *mon; - Eterm suspendee = p->suspendee; - Process *rp; - if (!(p_locks & ERTS_PROC_LOCK_STATUS)) - erts_proc_lock(p, ERTS_PROC_LOCK_STATUS); - rp = erts_pid2proc(p, p_locks|ERTS_PROC_LOCK_STATUS, - suspendee, ERTS_PROC_LOCK_STATUS); - if (rp) { - erts_resume(rp, ERTS_PROC_LOCK_STATUS); - erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); - } - if (!(p_locks & ERTS_PROC_LOCK_STATUS)) - erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS); - p->suspendee = NIL; - - mon = erts_monitor_tree_lookup(p->suspend_monitors, - suspendee); - if (mon) { - erts_monitor_tree_delete(&p->suspend_monitors, - mon); - erts_monitor_suspend_destroy(erts_monitor_suspend(mon)); - } - } -} - -static void -handle_pend_sync_suspend(Process *suspendee, - ErtsProcLocks suspendee_locks, - int suspendee_alive, - Eterm suspender_pid) -{ - Process *suspender; - - ERTS_LC_ASSERT(suspendee_locks & ERTS_PROC_LOCK_STATUS); - - suspender = erts_pid2proc(suspendee, - suspendee_locks, - suspender_pid, - ERTS_PROC_LOCK_STATUS); - if (suspender) { - ASSERT(is_nil(suspender->suspendee)); - if (suspendee_alive) { - erts_suspend(suspendee, suspendee_locks, NULL); - suspender->suspendee = suspendee->common.id; - } - /* suspender is suspended waiting for suspendee to suspend; - resume suspender */ - ASSERT(suspendee != suspender); - resume_process(suspender, ERTS_PROC_LOCK_STATUS); - erts_proc_unlock(suspender, ERTS_PROC_LOCK_STATUS); - } -} - -static Process * -pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks, - Eterm pid, ErtsProcLocks pid_locks, int suspend) -{ - Process *rp; - int unlock_c_p_status; - - ERTS_LC_ASSERT(c_p_locks == erts_proc_lc_my_proc_locks(c_p)); - - ERTS_LC_ASSERT(c_p_locks & ERTS_PROC_LOCK_MAIN); - ERTS_LC_ASSERT(pid_locks & (ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS)); - - if (c_p->common.id == pid) - return erts_pid2proc(c_p, c_p_locks, pid, pid_locks); - - if (c_p_locks & ERTS_PROC_LOCK_STATUS) - unlock_c_p_status = 0; - else { - unlock_c_p_status = 1; - erts_proc_lock(c_p, ERTS_PROC_LOCK_STATUS); - } - - if (c_p->suspendee == pid) { - /* Process previously suspended by c_p (below)... */ - ErtsProcLocks rp_locks = pid_locks|ERTS_PROC_LOCK_STATUS; - rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS, pid, rp_locks); - c_p->suspendee = NIL; - ASSERT(c_p->flags & F_P2PNR_RESCHED); - c_p->flags &= ~F_P2PNR_RESCHED; - if (!suspend && rp) - resume_process(rp, rp_locks); - } - else { - rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS, - pid, ERTS_PROC_LOCK_STATUS); - - if (!rp) { - c_p->flags &= ~F_P2PNR_RESCHED; - goto done; - } - - ASSERT(!(c_p->flags & F_P2PNR_RESCHED)); - - /* - * Suspend the other process in order to prevent - * it from being selected for normal execution. - * This will however not prevent it from being - * selected for execution of a system task. If - * it is selected for execution of a system task - * we might be blocked for quite a while if the - * try-lock below fails. That is, there is room - * for improvement here... - */ - - if (!suspend_process(c_p, rp)) { - /* Other process running */ - - ASSERT((ERTS_PSFLG_RUNNING | ERTS_PSFLG_DIRTY_RUNNING) - & erts_atomic32_read_nob(&rp->state)); - - if (!suspend - && (erts_atomic32_read_nob(&rp->state) - & ERTS_PSFLG_DIRTY_RUNNING)) { - ErtsProcLocks need_locks = pid_locks & ~ERTS_PROC_LOCK_STATUS; - if (need_locks && erts_proc_trylock(rp, need_locks) == EBUSY) { - erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); - rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS, - pid, pid_locks|ERTS_PROC_LOCK_STATUS); - } - goto done; - } - - running: - - /* - * If we got pending suspenders and suspend ourselves waiting - * to suspend another process we might deadlock. - * In this case we have to yield, be suspended by - * someone else and then do it all over again. - */ - if (!c_p->pending_suspenders) { - /* Mark rp pending for suspend by c_p */ - add_pend_suspend(rp, c_p->common.id, handle_pend_sync_suspend); - ASSERT(is_nil(c_p->suspendee)); - - /* Suspend c_p; when rp is suspended c_p will be resumed. */ - suspend_process(c_p, c_p); - c_p->flags |= F_P2PNR_RESCHED; - } - /* Yield (caller is assumed to yield immediately in bif). */ - erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); - rp = ERTS_PROC_LOCK_BUSY; - } - else { - ErtsProcLocks need_locks = pid_locks & ~ERTS_PROC_LOCK_STATUS; - if (need_locks && erts_proc_trylock(rp, need_locks) == EBUSY) { - if ((ERTS_PSFLG_RUNNING_SYS|ERTS_PSFLG_DIRTY_RUNNING_SYS) - & erts_atomic32_read_nob(&rp->state)) { - /* Executing system task... */ - resume_process(rp, ERTS_PROC_LOCK_STATUS); - goto running; - } - erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); - /* - * If we are unlucky, the process just got selected for - * execution of a system task. In this case we may be - * blocked here for quite a while... Execution of system - * tasks are fortunately quite rare events. We try to - * avoid this by checking if it is in a state executing - * system tasks (above), but it will not prevent all - * scenarios for a long block here... - */ - rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS, - pid, pid_locks|ERTS_PROC_LOCK_STATUS); - if (!rp) - goto done; - } - - /* - * The previous suspend has prevented the process - * from being selected for normal execution regardless - * of locks held or not held on it... - */ -#ifdef DEBUG - { - erts_aint32_t state; - state = erts_atomic32_read_nob(&rp->state); - ASSERT(!(state & ERTS_PSFLG_RUNNING)); - } -#endif - - if (!suspend) - resume_process(rp, pid_locks|ERTS_PROC_LOCK_STATUS); - } - } - - done: - - if (rp && rp != ERTS_PROC_LOCK_BUSY && !(pid_locks & ERTS_PROC_LOCK_STATUS)) - erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS); - if (unlock_c_p_status) - erts_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS); - return rp; -} - - -/* - * Like erts_pid2proc() but: - * - * * At least ERTS_PROC_LOCK_MAIN have to be held on c_p. - * * At least ERTS_PROC_LOCK_MAIN have to be taken on pid. - * * It also waits for proc to be in a state != running and garbing. - * * If ERTS_PROC_LOCK_BUSY is returned, the calling process has to - * yield (ERTS_BIF_YIELD[0-3]()). c_p might in this case have been - * suspended. - */ -Process * -erts_pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks, - Eterm pid, ErtsProcLocks pid_locks) -{ - return pid2proc_not_running(c_p, c_p_locks, pid, pid_locks, 0); -} - -/* - * erts_pid2proc_nropt() is normally the same as - * erts_pid2proc_not_running(). However it is only - * to be used when 'not running' is a pure optimization, - * not a requirement. - */ - -Process * -erts_pid2proc_nropt(Process *c_p, ErtsProcLocks c_p_locks, - Eterm pid, ErtsProcLocks pid_locks) -{ - if (erts_disable_proc_not_running_opt) - return erts_pid2proc(c_p, c_p_locks, pid, pid_locks); - else - return erts_pid2proc_not_running(c_p, c_p_locks, pid, pid_locks); -} - -static ERTS_INLINE int -do_bif_suspend_process(Process *c_p, - ErtsMonitorSuspend *smon, - Process *suspendee) -{ - ASSERT(suspendee); - ASSERT(!ERTS_PROC_IS_EXITING(suspendee)); - ERTS_LC_ASSERT(ERTS_PROC_LOCK_STATUS - & erts_proc_lc_my_proc_locks(suspendee)); - if (smon) { - if (!smon->active) { - if (!suspend_process(c_p, suspendee)) - return 0; - } - smon->active += smon->pending; - ASSERT(smon->active); - smon->pending = 0; - return 1; - } - return 0; -} - -static void -handle_pend_bif_sync_suspend(Process *suspendee, - ErtsProcLocks suspendee_locks, - int suspendee_alive, - Eterm suspender_pid) -{ - Process *suspender; - - ERTS_LC_ASSERT(suspendee_locks & ERTS_PROC_LOCK_STATUS); - - suspender = erts_pid2proc(suspendee, - suspendee_locks, - suspender_pid, - ERTS_PROC_LOCK_STATUS); - if (suspender) { - ErtsMonitorSuspend *smon; - ErtsMonitor *mon; - mon = erts_monitor_tree_lookup(suspender->suspend_monitors, - suspendee->common.id); - smon = erts_monitor_suspend(mon); - - ASSERT(is_nil(suspender->suspendee)); - if (!suspendee_alive) { - if (mon) { - erts_monitor_tree_delete(&suspender->suspend_monitors, - mon); - erts_monitor_suspend_destroy(smon); - } - } - else { -#ifdef DEBUG - int res = -#endif - do_bif_suspend_process(suspendee, smon, suspendee); - ASSERT(!smon || res != 0); - suspender->suspendee = suspendee->common.id; - } - /* suspender is suspended waiting for suspendee to suspend; - resume suspender */ - ASSERT(suspender != suspendee); - resume_process(suspender, ERTS_PROC_LOCK_STATUS); - erts_proc_unlock(suspender, ERTS_PROC_LOCK_STATUS); - } -} - -static void -handle_pend_bif_async_suspend(Process *suspendee, - ErtsProcLocks suspendee_locks, - int suspendee_alive, - Eterm suspender_pid) -{ - - Process *suspender; - - ERTS_LC_ASSERT(suspendee_locks & ERTS_PROC_LOCK_STATUS); - - suspender = erts_pid2proc(suspendee, - suspendee_locks, - suspender_pid, - ERTS_PROC_LOCK_STATUS); - if (suspender) { - ErtsMonitorSuspend *smon; - ErtsMonitor *mon; - mon = erts_monitor_tree_lookup(suspender->suspend_monitors, - suspendee->common.id); - smon = erts_monitor_suspend(mon); - ASSERT(is_nil(suspender->suspendee)); - if (!suspendee_alive) { - if (mon) { - erts_monitor_tree_delete(&suspender->suspend_monitors, - mon); - erts_monitor_suspend_destroy(smon); - } - } - else { -#ifdef DEBUG - int res = -#endif - do_bif_suspend_process(suspendee, smon, suspendee); - ASSERT(!smon || res != 0); - } - erts_proc_unlock(suspender, ERTS_PROC_LOCK_STATUS); - } -} - - -/* - * The erlang:suspend_process/2 BIF - */ - BIF_RETTYPE -suspend_process_2(BIF_ALIST_2) +erts_internal_suspend_process_2(BIF_ALIST_2) { Eterm res; - Process* suspendee = NULL; - ErtsMonitorSuspend *smon; - ErtsProcLocks xlocks = (ErtsProcLocks) 0; - int created; - - /* Options and default values: */ - int asynchronous = 0; + Eterm reply_tag = THE_NON_VALUE; + Eterm reply_res = THE_NON_VALUE; + int suspend; + int sync = 0; + int async = 0; int unless_suspending = 0; - + erts_aint_t mstate; + ErtsMonitorSuspend *msp; + ErtsMonitorData *mdp; if (BIF_P->common.id == BIF_ARG_1) - goto badarg; /* We are not allowed to suspend ourselves */ + BIF_RET(am_badarg); /* We are not allowed to suspend ourselves */ if (is_not_nil(BIF_ARG_2)) { /* Parse option list */ @@ -8987,191 +8584,127 @@ suspend_process_2(BIF_ALIST_2) unless_suspending = 1; break; case am_asynchronous: - asynchronous = 1; + async = 1; break; - default: - goto badarg; + default: { + if (is_tuple_arity(arg, 2)) { + Eterm *tp = tuple_val(arg); + if (tp[1] == am_asynchronous) { + async = 1; + reply_tag = tp[2]; + break; + } + } + BIF_RET(am_badarg); } + } arg = CDR(lp); - } + } if (is_not_nil(arg)) - goto badarg; + BIF_RET(am_badarg); } - xlocks = ERTS_PROC_LOCK_STATUS; - - erts_proc_lock(BIF_P, xlocks); - - suspendee = erts_pid2proc(BIF_P, - ERTS_PROC_LOCK_MAIN|xlocks, - BIF_ARG_1, - ERTS_PROC_LOCK_STATUS); - if (!suspendee) - goto no_suspendee; - - smon = erts_monitor_suspend_tree_lookup_create(&BIF_P->suspend_monitors, - &created, - BIF_ARG_1); - - if (asynchronous) { - /* --- Asynchronous suspend begin ---------------------------------- */ - - ERTS_LC_ASSERT(ERTS_PROC_LOCK_STATUS - & erts_proc_lc_my_proc_locks(BIF_P)); - ERTS_LC_ASSERT(ERTS_PROC_LOCK_STATUS - == erts_proc_lc_my_proc_locks(suspendee)); - - if (smon->active) { - smon->active += smon->pending; - smon->pending = 0; - if (unless_suspending) - res = am_false; - else if (smon->active == INT_MAX) - goto system_limit; - else { - smon->active++; - res = am_true; - } - /* done */ - } - else { - /* We havn't got any active suspends on the suspendee */ - if (smon->pending && unless_suspending) - res = am_false; - else { - if (smon->pending == INT_MAX) - goto system_limit; - - smon->pending++; - - if (!do_bif_suspend_process(BIF_P, smon, suspendee)) - add_pend_suspend(suspendee, - BIF_P->common.id, - handle_pend_bif_async_suspend); - - res = am_true; - } - /* done */ - } - /* --- Asynchronous suspend end ------------------------------------ */ - } - else /* if (!asynchronous) */ { - /* --- Synchronous suspend begin ----------------------------------- */ - - ERTS_LC_ASSERT(((ERTS_PROC_LOCK_STATUS|ERTS_PROC_LOCK_STATUS) - & erts_proc_lc_my_proc_locks(BIF_P)) - == (ERTS_PROC_LOCK_STATUS|ERTS_PROC_LOCK_STATUS)); - ERTS_LC_ASSERT(ERTS_PROC_LOCK_STATUS - == erts_proc_lc_my_proc_locks(suspendee)); - - if (BIF_P->suspendee == BIF_ARG_1) { - /* We are back after a yield and the suspendee - has been suspended on behalf of us. */ - ASSERT(smon->active >= 1); - BIF_P->suspendee = NIL; - res = (!unless_suspending || smon->active == 1 - ? am_true - : am_false); - /* done */ - } - else if (smon->active) { - if (unless_suspending) - res = am_false; - else { - smon->active++; - res = am_true; - } - /* done */ - } - else { - /* We haven't got any active suspends on the suspendee */ - - /* - * If we have pending suspenders and suspend ourselves waiting - * to suspend another process, or suspend another process - * we might deadlock. In this case we have to yield, - * be suspended by someone else, and then do it all over again. - */ - if (BIF_P->pending_suspenders) - goto yield; - - if (!unless_suspending && smon->pending == INT_MAX) - goto system_limit; - if (!unless_suspending || smon->pending == 0) - smon->pending++; - - if (do_bif_suspend_process(BIF_P, smon, suspendee)) { - res = (!unless_suspending || smon->active == 1 - ? am_true - : am_false); - /* done */ - } - else { - /* Mark suspendee pending for suspend by BIF_P */ - add_pend_suspend(suspendee, - BIF_P->common.id, - handle_pend_bif_sync_suspend); - - ASSERT(is_nil(BIF_P->suspendee)); - - /* - * Suspend BIF_P; when suspendee is suspended, BIF_P - * will be resumed and this BIF will be called again. - * This time with BIF_P->suspendee == BIF_ARG_1 (see - * above). - */ - suspend_process(BIF_P, BIF_P); - goto yield; - } - } - /* --- Synchronous suspend end ------------------------------------- */ - } - -#ifdef DEBUG - { - erts_aint32_t state = erts_atomic32_read_acqb(&suspendee->state); - ASSERT((state & ERTS_PSFLG_SUSPENDED) - || (asynchronous && smon->pending)); - ASSERT((state & ERTS_PSFLG_SUSPENDED) - || !smon->active); + if (!unless_suspending) { + ErtsMonitor *mon; + mon = erts_monitor_tree_lookup_create(&ERTS_P_MONITORS(BIF_P), + &suspend, + ERTS_MON_TYPE_SUSPEND, + BIF_P->common.id, + BIF_ARG_1); + ASSERT(mon->other.item == BIF_ARG_1); + + mdp = erts_monitor_to_data(mon); + msp = (ErtsMonitorSuspend *) mdp; + + mstate = erts_atomic_inc_read_relb(&msp->state); + ASSERT(suspend || (mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) > 1); + sync = !async & !suspend & !(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE); + suspend = !!suspend; /* ensure 0|1 */ + res = am_true; } -#endif - - erts_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS); - erts_proc_unlock(BIF_P, xlocks); - BIF_RET(res); - - system_limit: - ERTS_BIF_PREP_ERROR(res, BIF_P, SYSTEM_LIMIT); - goto do_return; - - no_suspendee: { + else { ErtsMonitor *mon; - BIF_P->suspendee = NIL; - mon = erts_monitor_tree_lookup(BIF_P->suspend_monitors, BIF_ARG_1); + mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(BIF_P), + BIF_ARG_1); if (mon) { - erts_monitor_tree_delete(&BIF_P->suspend_monitors, mon); - erts_monitor_suspend_destroy(erts_monitor_suspend(mon)); + ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND); + mdp = erts_monitor_to_data(mon); + msp = (ErtsMonitorSuspend *) mdp; + mstate = erts_atomic_read_nob(&msp->state); + ASSERT((mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) > 0); + mdp = NULL; + sync = !async & !(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE); + suspend = 0; + res = am_false; + } + else { + mdp = erts_monitor_create(ERTS_MON_TYPE_SUSPEND, NIL, + BIF_P->common.id, + BIF_ARG_1, NIL); + mon = &mdp->origin; + erts_monitor_tree_insert(&ERTS_P_MONITORS(BIF_P), mon); + msp = (ErtsMonitorSuspend *) mdp; + mstate = erts_atomic_inc_read_relb(&msp->state); + ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); + suspend = !0; + res = am_true; } } - badarg: - ERTS_BIF_PREP_ERROR(res, BIF_P, BADARG); - goto do_return; + if (suspend) { + erts_aint32_t state; + Process *rp; + int send_sig = 0; + + /* fail state... */ + state = (ERTS_PSFLG_EXITING + | ERTS_PSFLG_RUNNING + | ERTS_PSFLG_RUNNING_SYS + | ERTS_PSFLG_DIRTY_RUNNING + | ERTS_PSFLG_DIRTY_RUNNING_SYS); + + rp = erts_try_lock_sig_free_proc(BIF_ARG_1, + ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS, + &state); + if (!rp) + goto noproc; + if (rp == ERTS_PROC_LOCK_BUSY) + send_sig = !0; + else { + send_sig = !suspend_process(BIF_P, rp); + if (!send_sig) { + erts_monitor_list_insert(&ERTS_P_LT_MONITORS(rp), &mdp->target); + erts_atomic_read_bor_relb(&msp->state, + ERTS_MSUSPEND_STATE_FLG_ACTIVE); + } + erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS); + } + if (send_sig) { + if (erts_proc_sig_send_monitor(&mdp->target, BIF_ARG_1)) + sync = !async; + else { + noproc: + erts_monitor_tree_delete(&ERTS_P_MONITORS(BIF_P), &mdp->origin); + erts_monitor_release_both(mdp); + if (!async) + res = am_badarg; + } + } + } - yield: - ERTS_BIF_PREP_YIELD2(res, bif_export[BIF_suspend_process_2], - BIF_P, BIF_ARG_1, BIF_ARG_2); - - do_return: - if (suspendee) - erts_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS); - if (xlocks) - erts_proc_unlock(BIF_P, xlocks); - return res; + if (sync) { + ASSERT(is_non_value(reply_tag)); + reply_res = res; + reply_tag = res = erts_make_ref(BIF_P); + ERTS_RECV_MARK_SAVE(BIF_P); + ERTS_RECV_MARK_SET(BIF_P); + } -} + if (is_value(reply_tag)) + erts_proc_sig_send_sync_suspend(BIF_P, BIF_ARG_1, reply_tag, reply_res); + BIF_RET(res); +} /* * The erlang:resume_process/1 BIF @@ -9181,90 +8714,32 @@ BIF_RETTYPE resume_process_1(BIF_ALIST_1) { ErtsMonitor *mon; - ErtsMonitorSuspend *smon; - Process *suspendee; - int is_active; + ErtsMonitorSuspend *msp; + erts_aint_t mstate; if (BIF_P->common.id == BIF_ARG_1) BIF_ERROR(BIF_P, BADARG); - erts_proc_lock(BIF_P, ERTS_PROC_LOCK_STATUS); - mon = erts_monitor_tree_lookup(BIF_P->suspend_monitors, BIF_ARG_1); - smon = erts_monitor_suspend(mon); - - if (!smon) { + mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(BIF_P), + BIF_ARG_1); + if (!mon) { /* No previous suspend or dead suspendee */ - goto error; + BIF_ERROR(BIF_P, BADARG); } - else if (smon->pending) { - smon->pending--; - ASSERT(smon->pending >= 0); - if (smon->active) { - smon->active += smon->pending; - smon->pending = 0; - } - is_active = smon->active; - } - else if (smon->active) { - smon->active--; - ASSERT(smon->pending == 0); - is_active = 1; - } - else { - /* No previous suspend or dead suspendee */ - goto no_suspendee; - } - - if (smon->active || smon->pending || !is_active) { - /* Leave the suspendee as it is; just verify that it is still alive */ - suspendee = erts_proc_lookup(BIF_ARG_1); - if (!suspendee) - goto no_suspendee; - - } - else { - /* Resume */ - suspendee = erts_pid2proc(BIF_P, - ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS, - BIF_ARG_1, - ERTS_PROC_LOCK_STATUS); - if (!suspendee) { - mon = erts_monitor_tree_lookup(BIF_P->suspend_monitors, BIF_ARG_1); - smon = erts_monitor_suspend(mon); - if (!mon) - goto error; - goto no_suspendee; - } - ASSERT(mon == erts_monitor_tree_lookup(BIF_P->suspend_monitors, BIF_ARG_1)); + ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND); + msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon); - ASSERT(ERTS_PSFLG_SUSPENDED - & erts_atomic32_read_nob(&suspendee->state)); - ASSERT(BIF_P != suspendee); - resume_process(suspendee, ERTS_PROC_LOCK_STATUS); + mstate = erts_atomic_dec_read_relb(&msp->state); - erts_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS); - } + ASSERT((mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) >= 0); - if (!smon->active && !smon->pending) { - ASSERT(mon); - erts_monitor_tree_delete(&BIF_P->suspend_monitors, mon); - erts_monitor_suspend_destroy(smon); + if ((mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) == 0) { + erts_monitor_tree_delete(&ERTS_P_MONITORS(BIF_P), mon); + erts_proc_sig_send_demonitor(mon); } - erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_STATUS); - BIF_RET(am_true); - - no_suspendee: - /* cleanup */ - ASSERT(mon); - erts_monitor_tree_delete(&BIF_P->suspend_monitors, mon); - erts_monitor_suspend_destroy(smon); - - error: - erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_STATUS); - BIF_ERROR(BIF_P, BADARG); } BIF_RETTYPE @@ -9694,12 +9169,13 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) ASSERT(esdp->current_process == p || esdp->free_process == p); - sched_out_proc: - - ERTS_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); reds = actual_reds = calls - esdp->virtual_reds; + internal_sched_out_proc: + + ERTS_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); + ASSERT(actual_reds >= 0); if (reds < ERTS_PROC_MIN_CONTEXT_SWITCH_REDS_COST) reds = ERTS_PROC_MIN_CONTEXT_SWITCH_REDS_COST; @@ -9741,11 +9217,6 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) /* have to re-read state after taking lock */ state = erts_atomic32_read_nob(&p->state); - if (p->pending_suspenders) - handle_pending_suspend(p, (ERTS_PROC_LOCK_MAIN - | ERTS_PROC_LOCK_TRACE - | ERTS_PROC_LOCK_STATUS)); - esdp->reductions += reds; { @@ -10195,8 +9666,9 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) if (is_normal_sched) { if (state & ERTS_PSFLG_RUNNING_SYS) { if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) { - int local_only = !!(p->flags & F_LOCAL_SIGS_ONLY); - if (!local_only || (state & ERTS_PSFLG_SIG_Q)) { + int local_only = (!!(p->flags & F_LOCAL_SIGS_ONLY) + & !(state & ERTS_PSFLG_SUSPENDED)); + if (!local_only | !!(state & ERTS_PSFLG_SIG_Q)) { int sig_reds; /* * If we have dirty work scheduled we allow @@ -10282,7 +9754,17 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) } p->fcalls = reds; - + if (reds != context_reds) { + actual_reds = context_reds - reds - esdp->virtual_reds; + ASSERT(actual_reds >= 0); + esdp->virtual_reds = 0; + p->reds += actual_reds; + ERTS_PROC_REDUCTIONS_EXECUTED(esdp, rq, + (int) ERTS_PSFLGS_GET_USR_PRIO(state), + reds, + actual_reds); + } + ERTS_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); ASSERT(erts_proc_read_refc(p) > 0); @@ -10332,6 +9814,14 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls) #endif return p; + + sched_out_proc: + actual_reds = context_reds; + actual_reds -= reds; + actual_reds -= esdp->virtual_reds; + reds = actual_reds; + goto internal_sched_out_proc; + } } @@ -11844,7 +11334,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). #ifdef HIPE hipe_init_process(&p->hipe); - hipe_init_process_smp(&p->hipe_smp); #endif p->heap = (Eterm *) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP, sizeof(Eterm)*sz); p->old_hend = p->old_htop = p->old_heap = NULL; @@ -11893,7 +11382,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). ERTS_P_LINKS(p) = NULL; ERTS_P_MONITORS(p) = NULL; ERTS_P_LT_MONITORS(p) = NULL; - p->suspend_monitors = NULL; ASSERT(is_pid(parent->group_leader)); @@ -11950,8 +11438,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). p->trace_msg_q = NULL; p->scheduler_data = NULL; - p->suspendee = NIL; - p->pending_suspenders = NULL; #if !defined(NO_FPE_SIGNALS) || defined(HIPE) p->fp_exception = 0; @@ -12118,7 +11604,6 @@ void erts_init_empty_process(Process *p) ERTS_P_MONITORS(p) = NULL; ERTS_P_LT_MONITORS(p) = NULL; ERTS_P_LINKS(p) = NULL; /* List of links */ - p->suspend_monitors = NULL; p->sig_qs.first = NULL; p->sig_qs.last = &p->sig_qs.first; p->sig_qs.cont = NULL; @@ -12169,7 +11654,6 @@ void erts_init_empty_process(Process *p) #ifdef HIPE hipe_init_process(&p->hipe); - hipe_init_process_smp(&p->hipe_smp); #endif INIT_HOLE_CHECK(p); @@ -12182,8 +11666,6 @@ void erts_init_empty_process(Process *p) erts_atomic32_init_nob(&p->state, (erts_aint32_t) PRIORITY_NORMAL); p->scheduler_data = NULL; - p->suspendee = NIL; - p->pending_suspenders = NULL; erts_proc_lock_init(p); erts_proc_unlock(p, ERTS_PROC_LOCKS_ALL); erts_init_runq_proc(p, ERTS_RUNQ_IX(0), 0); @@ -12221,7 +11703,6 @@ erts_debug_verify_clean_empty_process(Process* p) ASSERT(ERTS_P_MONITORS(p) == NULL); ASSERT(ERTS_P_LT_MONITORS(p) == NULL); ASSERT(ERTS_P_LINKS(p) == NULL); - ASSERT(p->suspend_monitors == NULL); ASSERT(p->sig_qs.first == NULL); ASSERT(p->sig_qs.len == 0); ASSERT(p->bif_timers == NULL); @@ -12235,8 +11716,6 @@ erts_debug_verify_clean_empty_process(Process* p) ASSERT(p->sig_inq.first == NULL); ASSERT(p->sig_inq.len == 0); - ASSERT(p->suspendee == NIL); - ASSERT(p->pending_suspenders == NULL); /* Thing that erts_cleanup_empty_process() cleans up */ @@ -12342,8 +11821,6 @@ delete_process(Process* p) erts_cleanup_messages(p->sig_qs.cont); p->sig_qs.cont = NULL; - ASSERT(!p->suspend_monitors); - p->fvalue = NIL; } @@ -12423,6 +11900,7 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt) if (erts_monitor_is_target(mon)) { /* We are being watched... */ switch (mon->type) { + case ERTS_MON_TYPE_SUSPEND: case ERTS_MON_TYPE_PROC: erts_proc_sig_send_monitor_down(mon, reason); mon = NULL; @@ -12494,6 +11972,7 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt) else { /* Origin monitor */ /* We are watching someone else... */ switch (mon->type) { + case ERTS_MON_TYPE_SUSPEND: case ERTS_MON_TYPE_PROC: erts_proc_sig_send_demonitor(mon); mon = NULL; @@ -12646,21 +12125,6 @@ erts_proc_exit_handle_link(ErtsLink *lnk, void *vctxt) erts_link_release(lnk); } -static void -resume_suspend_monitor(ErtsMonitor *mon, void *vc_p) -{ - ErtsMonitorSuspend *smon = erts_monitor_suspend(mon); - Process *suspendee = erts_pid2proc((Process *) vc_p, ERTS_PROC_LOCK_MAIN, - smon->mon.other.item, ERTS_PROC_LOCK_STATUS); - if (suspendee) { - ASSERT(suspendee != vc_p); - if (smon->active) - resume_process(suspendee, ERTS_PROC_LOCK_STATUS); - erts_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS); - } - erts_monitor_suspend_destroy(smon); -} - /* this function fishishes a process and propagates exit messages - called by process_main when a process dies */ void @@ -12692,8 +12156,6 @@ erts_do_exit_process(Process* p, Eterm reason) set_self_exiting(p, reason, NULL, NULL, NULL); - cancel_suspend_of_suspendee(p, ERTS_PROC_LOCKS_ALL); - if (IS_TRACED(p)) { if (IS_TRACED_FL(p, F_TRACE_CALLS)) erts_schedule_time_break(p, ERTS_BP_CALL_TIME_SCHEDULE_EXITING); @@ -12826,11 +12288,6 @@ erts_continue_exit_process(Process *p) p->flags &= ~F_USING_DDLL; } - if (p->suspend_monitors) - erts_monitor_tree_foreach_delete(&p->suspend_monitors, - resume_suspend_monitor, - p); - /* * The registered name *should* be the last "erlang resource" to * cleanup. @@ -13038,7 +12495,13 @@ erts_try_lock_sig_free_proc(Eterm pid, ErtsProcLocks locks, erts_aint32_t *statep) { Process *rp = erts_proc_lookup_raw(pid); + erts_aint32_t fail_state = ERTS_PSFLG_SIG_IN_Q|ERTS_PSFLG_SIG_Q; erts_aint32_t state; + ErtsProcLocks tmp_locks = ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_MSGQ; + + tmp_locks |= locks; + if (statep) + fail_state |= *statep; if (!rp) { if (statep) @@ -13055,28 +12518,28 @@ erts_try_lock_sig_free_proc(Eterm pid, ErtsProcLocks locks, if (state & ERTS_PSFLG_FREE) return NULL; - if (state & (ERTS_PSFLG_SIG_IN_Q|ERTS_PSFLG_SIG_Q)) + if (state & fail_state) return ERTS_PROC_LOCK_BUSY; - if (!locks) - return rp; - - if (erts_proc_trylock(rp, locks) == EBUSY) + if (erts_proc_trylock(rp, tmp_locks) == EBUSY) return ERTS_PROC_LOCK_BUSY; state = erts_atomic32_read_nob(&rp->state); if (statep) *statep = state; - if (state & ERTS_PSFLG_FREE) { - erts_proc_unlock(rp, locks); - return NULL; + if ((state & fail_state) + || rp->sig_inq.first + || rp->sig_qs.cont) { + erts_proc_unlock(rp, tmp_locks); + if (state & ERTS_PSFLG_FREE) + return NULL; + else + return ERTS_PROC_LOCK_BUSY; } - if (state & (ERTS_PSFLG_SIG_IN_Q|ERTS_PSFLG_SIG_Q)) { - erts_proc_unlock(rp, locks); - return ERTS_PROC_LOCK_BUSY; - } + if (tmp_locks != locks) + erts_proc_unlock(rp, tmp_locks & ~locks); return rp; } diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index b66272194c..a60e117bab 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -805,14 +805,15 @@ erts_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi) #define ERTS_PSD_ETS_OWNED_TABLES 6 #define ERTS_PSD_ETS_FIXED_TABLES 7 #define ERTS_PSD_DIST_ENTRY 8 -#define ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF 9 /* keep last... */ +#define ERTS_PSD_PENDING_SUSPEND 9 +#define ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF 10 /* keep last... */ -#define ERTS_PSD_SIZE 10 +#define ERTS_PSD_SIZE 11 #if !defined(HIPE) # undef ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF # undef ERTS_PSD_SIZE -# define ERTS_PSD_SIZE 9 +# define ERTS_PSD_SIZE 10 #endif typedef struct { @@ -849,6 +850,9 @@ typedef struct { #define ERTS_PSD_DIST_ENTRY_GET_LOCKS ERTS_PROC_LOCK_MAIN #define ERTS_PSD_DIST_ENTRY_SET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_PENDING_SUSPEND_GET_LOCKS ERTS_PROC_LOCK_MAIN +#define ERTS_PSD_PENDING_SUSPEND_SET_LOCKS ERTS_PROC_LOCK_MAIN + typedef struct { ErtsProcLocks get_locks; ErtsProcLocks set_locks; @@ -884,20 +888,6 @@ typedef struct { typedef struct ErtsProcSysTask_ ErtsProcSysTask; typedef struct ErtsProcSysTaskQs_ ErtsProcSysTaskQs; - -typedef struct ErtsPendingSuspend_ ErtsPendingSuspend; -struct ErtsPendingSuspend_ { - ErtsPendingSuspend *next; - ErtsPendingSuspend *end; - Eterm pid; - void (*handle_func)(Process *suspendee, - ErtsProcLocks suspendee_locks, - int suspendee_alive, - Eterm pid); -}; - - - /* Defines to ease the change of memory architecture */ # define HEAP_START(p) (p)->heap # define HEAP_TOP(p) (p)->htop @@ -992,9 +982,6 @@ struct process { Process *next; /* Pointer to next process in run queue */ - ErtsMonitor *suspend_monitors; /* Processes suspended by this process via - erlang:suspend_process/1 */ - ErtsSignalPrivQueues sig_qs; /* Signal queues */ ErtsBifTimers *bif_timers; /* Bif timers aiming at this process */ @@ -1058,12 +1045,7 @@ struct process { ErlTraceMessageQueue *trace_msg_q; erts_proc_lock_t lock; ErtsSchedulerData *scheduler_data; - Eterm suspendee; - ErtsPendingSuspend *pending_suspenders; erts_atomic_t run_queue; -#ifdef HIPE - struct hipe_process_state_smp hipe_smp; -#endif #ifdef CHECK_FOR_HOLES Eterm* last_htop; /* No need to scan the heap below this point. */ @@ -1380,7 +1362,7 @@ extern int erts_system_profile_ts_type; #define F_DISTRIBUTION (1 << 6) /* Process used in distribution */ #define F_USING_DDLL (1 << 7) /* Process has used the DDLL interface */ #define F_HAVE_BLCKD_MSCHED (1 << 8) /* Process has blocked multi-scheduling */ -#define F_P2PNR_RESCHED (1 << 9) /* Process has been rescheduled via erts_pid2proc_not_running() */ +#define F_UNUSED (1 << 9) #define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */ #define F_DISABLE_GC (1 << 11) /* Disable GC (see below) */ #define F_OFF_HEAP_MSGQ (1 << 12) /* Off heap msg queue */ @@ -1397,10 +1379,12 @@ extern int erts_system_profile_ts_type; #define F_DIRTY_MAJOR_GC (1 << 23) /* Dirty major GC scheduled */ #define F_DIRTY_MINOR_GC (1 << 24) /* Dirty minor GC scheduled */ #define F_HIBERNATED (1 << 25) /* Hibernated */ -#define F_LOCAL_SIGS_ONLY (1 << 26) +#define F_LOCAL_SIGS_ONLY (1 << 26) /* Handle privq sigs only */ #define F_TRAP_EXIT (1 << 27) /* Trapping exit */ -#define F_DEFERRED_SAVED_LAST (1 << 28) -#define F_DELAYED_PSIGQS_LEN (1 << 29) +#define F_DEFERRED_SAVED_LAST (1 << 28) /* Deferred sig_qs.saved_last */ +#define F_DELAYED_PSIGQS_LEN (1 << 29) /* Delayed update of sig_qs.len */ +#define F_HIPE_RECV_LOCKED (1 << 30) /* HiPE message queue locked */ +#define F_HIPE_RECV_YIELD (1 << 31) /* HiPE receive yield */ /* * F_DISABLE_GC and F_DELAY_GC are similar. Both will prevent @@ -2048,6 +2032,11 @@ erts_psd_set(Process *p, int ix, void *data) #define ERTS_PROC_SET_DIST_ENTRY(P, DE) \ ((DistEntry *) erts_psd_set((P), ERTS_PSD_DIST_ENTRY, (void *) (DE))) +#define ERTS_PROC_GET_PENDING_SUSPEND(P) \ + ((void *) erts_psd_get((P), ERTS_PSD_PENDING_SUSPEND)) +#define ERTS_PROC_SET_PENDING_SUSPEND(P, PS) \ + ((void *) erts_psd_set((P), ERTS_PSD_PENDING_SUSPEND, (void *) (PS))) + #ifdef HIPE #define ERTS_PROC_GET_SUSPENDED_SAVED_CALLS_BUF(P) \ ((struct saved_calls *) erts_psd_get((P), ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF)) @@ -2612,16 +2601,6 @@ Process *erts_try_lock_sig_free_proc(Eterm pid, ErtsProcLocks locks, erts_aint32_t *statep); -Process *erts_pid2proc_not_running(Process *, - ErtsProcLocks, - Eterm, - ErtsProcLocks); -Process *erts_pid2proc_nropt(Process *c_p, - ErtsProcLocks c_p_locks, - Eterm pid, - ErtsProcLocks pid_locks); -extern int erts_disable_proc_not_running_opt; - #ifdef DEBUG #define ERTS_ASSERT_IS_NOT_EXITING(P) \ do { ASSERT(!ERTS_PROC_IS_EXITING((P))); } while (0) diff --git a/erts/emulator/beam/erl_trace.c b/erts/emulator/beam/erl_trace.c index 065a560b52..f074dd8bdb 100644 --- a/erts/emulator/beam/erl_trace.c +++ b/erts/emulator/beam/erl_trace.c @@ -2629,6 +2629,38 @@ erts_tracer_to_term(Process *p, ErtsTracer tracer) } } +Eterm +erts_build_tracer_to_term(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, ErtsTracer tracer) +{ + Eterm res; + Eterm state; + Uint sz; + + if (ERTS_TRACER_IS_NIL(tracer)) + return am_false; + + state = ERTS_TRACER_STATE(tracer); + sz = is_immed(state) ? 0 : size_object(state); + + if (szp) + *szp += sz; + + if (hpp) + res = is_immed(state) ? state : copy_struct(state, sz, hpp, ohp); + else + res = THE_NON_VALUE; + + if (ERTS_TRACER_MODULE(tracer) != am_erl_tracer) { + if (szp) + *szp += 3; + if (hpp) { + res = TUPLE2(*hpp, ERTS_TRACER_MODULE(tracer), res); + *hpp += 3; + } + } + + return res; +} static ERTS_INLINE int send_to_tracer_nif_raw(Process *c_p, Process *tracee, diff --git a/erts/emulator/beam/erl_trace.h b/erts/emulator/beam/erl_trace.h index dbf7ebd2a1..3228e19809 100644 --- a/erts/emulator/beam/erl_trace.h +++ b/erts/emulator/beam/erl_trace.h @@ -198,6 +198,8 @@ int erts_is_tracer_proc_enabled_send(Process* c_p, ErtsProcLocks c_p_locks, ErtsPTabElementCommon *t_p); int erts_is_tracer_enabled(const ErtsTracer tracer, ErtsPTabElementCommon *t_p); Eterm erts_tracer_to_term(Process *p, ErtsTracer tracer); +Eterm erts_build_tracer_to_term(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, ErtsTracer tracer); + ErtsTracer erts_term_to_tracer(Eterm prefix, Eterm term); void erts_tracer_replace(ErtsPTabElementCommon *t_p, const ErtsTracer new_tracer); |