aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator')
-rw-r--r--erts/emulator/beam/atom.names3
-rw-r--r--erts/emulator/beam/beam_bif_load.c32
-rw-r--r--erts/emulator/beam/beam_emu.c3
-rw-r--r--erts/emulator/beam/bif.c181
-rw-r--r--erts/emulator/beam/bif.h37
-rw-r--r--erts/emulator/beam/bif.tab9
-rw-r--r--erts/emulator/beam/dist.c406
-rw-r--r--erts/emulator/beam/erl_alloc.types3
-rw-r--r--erts/emulator/beam/erl_bif_info.c202
-rw-r--r--erts/emulator/beam/erl_bif_trace.c190
-rw-r--r--erts/emulator/beam/erl_gc.c15
-rw-r--r--erts/emulator/beam/erl_monitor_link.c97
-rw-r--r--erts/emulator/beam/erl_monitor_link.h57
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c469
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.h116
-rw-r--r--erts/emulator/beam/erl_process.c897
-rw-r--r--erts/emulator/beam/erl_process.h57
-rw-r--r--erts/emulator/beam/erl_trace.c32
-rw-r--r--erts/emulator/beam/erl_trace.h2
-rw-r--r--erts/emulator/hipe/hipe_mode_switch.c13
-rw-r--r--erts/emulator/hipe/hipe_native_bif.c23
-rw-r--r--erts/emulator/hipe/hipe_process.h9
-rw-r--r--erts/emulator/test/trace_SUITE.erl62
-rw-r--r--erts/emulator/test/tracer_SUITE.erl6
24 files changed, 1614 insertions, 1307 deletions
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index fba0611042..45b7540aeb 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -142,6 +142,7 @@ atom bsr
atom bsr_anycrlf
atom bsr_unicode
atom build_type
+atom busy
atom busy_dist_port
atom busy_port
atom call
@@ -252,6 +253,7 @@ atom exception_from
atom exception_trace
atom exclusive
atom exit_status
+atom exited
atom existing
atom existing_processes
atom existing_ports
@@ -445,6 +447,7 @@ atom no_float
atom no_integer
atom no_network
atom no_start_optimize
+atom not_suspended
atom not
atom not_a_list
atom not_loaded
diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c
index d9312f4df8..a0dbd9ec7b 100644
--- a/erts/emulator/beam/beam_bif_load.c
+++ b/erts/emulator/beam/beam_bif_load.c
@@ -603,8 +603,9 @@ badarg:
BIF_RETTYPE erts_internal_check_dirty_process_code_2(BIF_ALIST_2)
{
+ erts_aint32_t state;
Process *rp;
- int reds = 0;
+ int dirty, busy, reds = 0;
Eterm res;
if (BIF_P != erts_dirty_process_signal_handler
@@ -618,20 +619,29 @@ BIF_RETTYPE erts_internal_check_dirty_process_code_2(BIF_ALIST_2)
if (is_not_atom(BIF_ARG_2))
BIF_ERROR(BIF_P, BADARG);
- rp = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN,
- BIF_ARG_1, ERTS_PROC_LOCK_MAIN);
- if (rp == ERTS_PROC_LOCK_BUSY)
- ERTS_BIF_YIELD2(bif_export[BIF_erts_internal_check_dirty_process_code_2],
- BIF_P, BIF_ARG_1, BIF_ARG_2);
+ if (BIF_ARG_1 == BIF_P->common.id)
+ BIF_RET(am_normal);
+
+ rp = erts_proc_lookup_raw(BIF_ARG_1);
if (!rp)
- BIF_RET(am_false);
-
+ BIF_RET(am_false);
+
+ state = erts_atomic32_read_nob(&rp->state);
+ dirty = (state & (ERTS_PSFLG_DIRTY_RUNNING
+ | ERTS_PSFLG_DIRTY_RUNNING_SYS));
+ if (!dirty)
+ BIF_RET(am_normal);
+
+ busy = erts_proc_trylock(rp, ERTS_PROC_LOCK_MAIN) == EBUSY;
+
+ if (busy)
+ BIF_RET(am_busy);
+
res = erts_check_process_code(rp, BIF_ARG_2, &reds, BIF_P->fcalls);
- if (BIF_P != rp)
- erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
+ erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
- ASSERT(is_value(res));
+ ASSERT(res == am_true || res == am_false);
BIF_RET2(res, reds);
}
diff --git a/erts/emulator/beam/beam_emu.c b/erts/emulator/beam/beam_emu.c
index ee287243a4..ab5920a67e 100644
--- a/erts/emulator/beam/beam_emu.c
+++ b/erts/emulator/beam/beam_emu.c
@@ -1166,6 +1166,9 @@ void erts_dirty_process_main(ErtsSchedulerData *esdp)
reds_used = treds > INT_MAX ? INT_MAX : (int) treds;
}
+ if (c_p && ERTS_PROC_GET_PENDING_SUSPEND(c_p))
+ erts_proc_sig_handle_pending_suspend(c_p);
+
PROCESS_MAIN_CHK_LOCKS(c_p);
ERTS_UNREQ_PROC_MAIN_LOCK(c_p);
ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 79244b8544..97e1ee1286 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -1364,13 +1364,14 @@ BIF_RETTYPE exit_signal_2(BIF_ALIST_2)
/* Handle flags common to both process_flag_2 and process_flag_3. */
-static BIF_RETTYPE process_flag_aux(Process *BIF_P,
- Process *rp,
- Eterm flag,
- Eterm val)
+static Eterm process_flag_aux(Process *c_p, int *redsp, Eterm flag, Eterm val)
{
Eterm old_value = NIL; /* shut up warning about use before set */
Sint i;
+
+ if (redsp)
+ *redsp = 1;
+
if (flag == am_save_calls) {
struct saved_calls *scb;
if (!is_small(val))
@@ -1390,30 +1391,89 @@ static BIF_RETTYPE process_flag_aux(Process *BIF_P,
}
#ifdef HIPE
- if (rp->flags & F_HIPE_MODE) {
- ASSERT(!ERTS_PROC_GET_SAVED_CALLS_BUF(rp));
- scb = ERTS_PROC_SET_SUSPENDED_SAVED_CALLS_BUF(rp, scb);
+ if (c_p->flags & F_HIPE_MODE) {
+ ASSERT(!ERTS_PROC_GET_SAVED_CALLS_BUF(c_p));
+ scb = ERTS_PROC_SET_SUSPENDED_SAVED_CALLS_BUF(c_p, scb);
}
else
#endif
{
#ifdef HIPE
- ASSERT(!ERTS_PROC_GET_SUSPENDED_SAVED_CALLS_BUF(rp));
+ ASSERT(!ERTS_PROC_GET_SUSPENDED_SAVED_CALLS_BUF(c_p));
#endif
- scb = ERTS_PROC_SET_SAVED_CALLS_BUF(rp, scb);
- if (rp == BIF_P && ((scb && i == 0) || (!scb && i != 0))) {
- /* Adjust fcalls to match save calls setting... */
- if (i == 0)
- BIF_P->fcalls += CONTEXT_REDS; /* disabled it */
- else
- BIF_P->fcalls -= CONTEXT_REDS; /* enabled it */
-
- /*
- * Make sure we reschedule immediately so the
- * change take effect at once.
- */
- ERTS_VBUMP_ALL_REDS(BIF_P);
- }
+ scb = ERTS_PROC_SET_SAVED_CALLS_BUF(c_p, scb);
+
+ if (((scb && i == 0) || (!scb && i != 0))) {
+
+ /*
+ * Make sure we reschedule immediately so the
+ * change take effect at once.
+ */
+ if (!redsp) {
+ /* Executed via BIF call.. */
+ via_bif:
+
+ /* Adjust fcalls to match save calls setting... */
+ if (i == 0)
+ c_p->fcalls += CONTEXT_REDS; /* disabled it */
+ else
+ c_p->fcalls -= CONTEXT_REDS; /* enabled it */
+
+ ERTS_VBUMP_ALL_REDS(c_p);
+ }
+ else {
+ erts_aint32_t state;
+ /*
+ * Executed via signal handler. Try to figure
+ * out in what context we are executing...
+ */
+
+ state = erts_atomic32_read_nob(&c_p->state);
+ if (state & (ERTS_PSFLG_RUNNING_SYS
+ | ERTS_PSFLG_DIRTY_RUNNING_SYS
+ | ERTS_PSFLG_DIRTY_RUNNING)) {
+ /*
+ * We are either processing signals before
+ * being executed or executing dirty. That
+ * is, no need to adjust anything...
+ */
+ *redsp = 1;
+ }
+ else {
+ ErtsSchedulerData *esdp;
+ ASSERT(state & ERTS_PSFLG_RUNNING);
+
+ /*
+ * F_DELAY_GC is currently only set when
+ * we handle signals in state running via
+ * receive helper...
+ */
+
+ if (!(c_p->flags & F_DELAY_GC)) {
+ *redsp = 1;
+ goto via_bif;
+ }
+
+ /*
+ * Executing via receive helper...
+ *
+ * We utilize the virtual reds counter
+ * in order to get correct calculation
+ * of reductions consumed when scheduling
+ * out the process...
+ */
+
+ esdp = erts_get_scheduler_data();
+
+ if (i == 0)
+ esdp->virtual_reds += CONTEXT_REDS; /* disabled it */
+ else
+ esdp->virtual_reds -= CONTEXT_REDS; /* enabled it */
+
+ *redsp = -1;
+ }
+ }
+ }
}
if (!scb)
@@ -1423,11 +1483,12 @@ static BIF_RETTYPE process_flag_aux(Process *BIF_P,
erts_free(ERTS_ALC_T_CALLS_BUF, (void *) scb);
}
- BIF_RET(old_value);
+ ASSERT(is_immed(old_value));
+ return old_value;
}
error:
- BIF_ERROR(BIF_P, BADARG);
+ return am_badarg;
}
BIF_RETTYPE process_flag_2(BIF_ALIST_2)
@@ -1596,29 +1657,73 @@ BIF_RETTYPE process_flag_2(BIF_ALIST_2)
/* Fall through and try process_flag_aux() ... */
}
- BIF_RET(process_flag_aux(BIF_P, BIF_P, BIF_ARG_1, BIF_ARG_2));
+ old_value = process_flag_aux(BIF_P, NULL, BIF_ARG_1, BIF_ARG_2);
+ if (old_value != am_badarg)
+ BIF_RET(old_value);
error:
BIF_ERROR(BIF_P, BADARG);
}
-BIF_RETTYPE process_flag_3(BIF_ALIST_3)
+typedef struct {
+ Eterm flag;
+ Eterm value;
+ ErlOffHeap oh;
+ Eterm heap[1];
+} ErtsProcessFlag3Args;
+
+static Eterm
+exec_process_flag_3(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp)
{
- Process *rp;
- Eterm res;
+ ErtsProcessFlag3Args *pf3a = arg;
+ Eterm res;
+
+ if (ERTS_PROC_IS_EXITING(c_p))
+ res = am_badarg;
+ else
+ res = process_flag_aux(c_p, redsp, pf3a->flag, pf3a->value);
+ erts_cleanup_offheap(&pf3a->oh);
+ erts_free(ERTS_ALC_T_PF3_ARGS, arg);
+ return res;
+}
+
+
+BIF_RETTYPE erts_internal_process_flag_3(BIF_ALIST_3)
+{
+ Eterm res, *hp;
+ ErlOffHeap *ohp;
+ ErtsProcessFlag3Args *pf3a;
+ Uint flag_sz, value_sz;
+
+ if (BIF_P->common.id == BIF_ARG_1) {
+ res = process_flag_aux(BIF_P, NULL, BIF_ARG_2, BIF_ARG_3);
+ BIF_RET(res);
+ }
+
+ if (is_not_internal_pid(BIF_ARG_1))
+ BIF_RET(am_badarg);
+
+ flag_sz = is_immed(BIF_ARG_2) ? 0 : size_object(BIF_ARG_2);
+ value_sz = is_immed(BIF_ARG_3) ? 0 : size_object(BIF_ARG_3);
+
+ pf3a = erts_alloc(ERTS_ALC_T_PF3_ARGS,
+ sizeof(ErtsProcessFlag3Args)
+ + sizeof(Eterm)*(flag_sz+value_sz-1));
+
+ ohp = &pf3a->oh;
+ ERTS_INIT_OFF_HEAP(&pf3a->oh);
- rp = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN,
- BIF_ARG_1, ERTS_PROC_LOCK_MAIN);
- if (rp == ERTS_PROC_LOCK_BUSY)
- ERTS_BIF_YIELD3(bif_export[BIF_process_flag_3], BIF_P,
- BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
+ hp = &pf3a->heap[0];
- if (!rp)
- BIF_ERROR(BIF_P, BADARG);
+ pf3a->flag = copy_struct(BIF_ARG_2, flag_sz, &hp, ohp);
+ pf3a->value = copy_struct(BIF_ARG_3, value_sz, &hp, ohp);
- res = process_flag_aux(BIF_P, rp, BIF_ARG_2, BIF_ARG_3);
+ res = erts_proc_sig_send_rpc_request(BIF_P, BIF_ARG_1,
+ !0,
+ exec_process_flag_3,
+ (void *) pf3a);
- if (rp != BIF_P)
- erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN);
+ if (is_non_value(res))
+ BIF_RET(am_badarg);
return res;
}
diff --git a/erts/emulator/beam/bif.h b/erts/emulator/beam/bif.h
index a47339253e..cf9f61c0b8 100644
--- a/erts/emulator/beam/bif.h
+++ b/erts/emulator/beam/bif.h
@@ -295,6 +295,19 @@ do { \
(Ret) = THE_NON_VALUE; \
} while (0)
+#define ERTS_BIF_PREP_TRAP4(Ret, Trap, Proc, A0, A1, A2, A3) \
+do { \
+ Eterm* reg = erts_proc_sched_data((Proc))->x_reg_array; \
+ (Proc)->arity = 4; \
+ reg[0] = (Eterm) (A0); \
+ reg[1] = (Eterm) (A1); \
+ reg[2] = (Eterm) (A2); \
+ reg[3] = (Eterm) (A3); \
+ (Proc)->i = (BeamInstr*) ((Trap)->addressv[erts_active_code_ix()]); \
+ (Proc)->freason = TRAP; \
+ (Ret) = THE_NON_VALUE; \
+} while (0)
+
#define ERTS_BIF_PREP_TRAP3_NO_RET(Trap, Proc, A0, A1, A2)\
do { \
Eterm* reg = erts_proc_sched_data((Proc))->x_reg_array; \
@@ -343,6 +356,18 @@ do { \
return THE_NON_VALUE; \
} while(0)
+#define BIF_TRAP4(Trap_, p, A0, A1, A2, A3) do { \
+ Eterm* reg = erts_proc_sched_data((p))->x_reg_array; \
+ (p)->arity = 4; \
+ reg[0] = (A0); \
+ reg[1] = (A1); \
+ reg[2] = (A2); \
+ reg[3] = (A3); \
+ (p)->i = (BeamInstr*) ((Trap_)->addressv[erts_active_code_ix()]); \
+ (p)->freason = TRAP; \
+ return THE_NON_VALUE; \
+ } while(0)
+
#define BIF_TRAP_CODE_PTR_0(p, Code_) do { \
(p)->arity = 0; \
(p)->i = (BeamInstr*) (Code_); \
@@ -401,6 +426,12 @@ do { \
ERTS_BIF_PREP_TRAP3(RET, (TRP), (P), (A0), (A1), (A2)); \
} while (0)
+#define ERTS_BIF_PREP_YIELD4(RET, TRP, P, A0, A1, A2, A3) \
+do { \
+ ERTS_VBUMP_ALL_REDS((P)); \
+ ERTS_BIF_PREP_TRAP4(RET, (TRP), (P), (A0), (A1), (A2), (A3)); \
+} while (0)
+
#define ERTS_BIF_YIELD0(TRP, P) \
do { \
ERTS_VBUMP_ALL_REDS((P)); \
@@ -425,6 +456,12 @@ do { \
BIF_TRAP3((TRP), (P), (A0), (A1), (A2)); \
} while (0)
+#define ERTS_BIF_YIELD4(TRP, P, A0, A1, A2, A3) \
+do { \
+ ERTS_VBUMP_ALL_REDS((P)); \
+ BIF_TRAP4((TRP), (P), (A0), (A1), (A2), (A3)); \
+} while (0)
+
#define ERTS_BIF_PREP_EXITED(RET, PROC) \
do { \
KILL_CATCHES((PROC)); \
diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab
index 33738dc20b..21dda9ff03 100644
--- a/erts/emulator/beam/bif.tab
+++ b/erts/emulator/beam/bif.tab
@@ -125,7 +125,7 @@ bif erlang:pid_to_list/1
bif erlang:ports/0
bif erlang:pre_loaded/0
bif erlang:process_flag/2
-bif erlang:process_flag/3
+bif erts_internal:process_flag/3
bif erlang:process_info/1
bif erlang:process_info/2
bif erlang:processes/0
@@ -154,7 +154,6 @@ bif erlang:unregister/1
bif erlang:whereis/1
bif erlang:spawn_opt/1
bif erlang:setnode/2
-bif erlang:setnode/3
bif erlang:dist_get_stat/1
bif erlang:dist_ctrl_input_handler/2
bif erlang:dist_ctrl_put_data/2
@@ -191,6 +190,8 @@ bif erts_internal:scheduler_wall_time/1
bif erts_internal:dirty_process_handle_signals/1
+bif erts_internal:create_dist_channel/4
+
# inet_db support
bif erlang:port_set_data/2
bif erlang:port_get_data/1
@@ -204,9 +205,9 @@ bif erlang:seq_trace/2
bif erlang:seq_trace_info/1
bif erlang:seq_trace_print/1
bif erlang:seq_trace_print/2
-bif erlang:suspend_process/2
+bif erts_internal:suspend_process/2
bif erlang:resume_process/1
-bif erlang:process_display/2
+bif erts_internal:process_display/2
bif erlang:bump_reductions/1
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 026f0a62d4..70474898b2 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -3138,60 +3138,60 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2)
BIF_ERROR(BIF_P, BADARG);
}
-/**********************************************************************
- ** Allocate a dist entry, set node name install the connection handler
- ** setnode_3({name@host, Creation}, Cid, {Type, Version, Initial, IC, OC})
- ** Type = flag field, where the flags are specified in dist.h
- ** Version = distribution version, >= 1
- ** IC = in_cookie (ignored)
- ** OC = out_cookie (ignored)
- **
- ** Note that in distribution protocols above 1, the Initial parameter
- ** is always NIL and the cookies are always the atom '', cookies are not
- ** sent in the distribution messages but are only used in
- ** the handshake.
- **
- ***********************************************************************/
+/*
+ * erts_internal:create_dist_channel/4 is used by
+ * erlang:setnode/3.
+ */
+
+typedef struct {
+ DistEntry *dep;
+ Uint flags;
+ Uint version;
+} ErtsSetupConnDistCtrl;
+
+static void
+setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
+ Eterm ctrlr, Uint flags,
+ Uint version);
-BIF_RETTYPE setnode_3(BIF_ALIST_3)
+static Eterm
+setup_connection_distctrl(Process *c_p, void *arg,
+ int *redsp, ErlHeapFragment **bpp);
+
+BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
{
BIF_RETTYPE ret;
Uint flags;
- unsigned long version;
- Eterm ic, oc;
- Eterm *tp;
+ Uint version;
+ Eterm *hp, res_tag = THE_NON_VALUE, res = THE_NON_VALUE;
DistEntry *dep = NULL;
- ErtsProcLocks proc_unlock = 0;
- Process *proc;
+ int de_locked = 0;
Port *pp = NULL;
- Eterm notify_proc;
- erts_aint32_t qflgs;
/*
* Check and pick out arguments
*/
- if (!is_node_name_atom(BIF_ARG_1) ||
- !(is_internal_port(BIF_ARG_2)
- || is_internal_pid(BIF_ARG_2))
- || (erts_this_node->sysname == am_Noname)) {
- goto badarg;
- }
+ /* Node name... */
+ if (!is_node_name_atom(BIF_ARG_1))
+ goto badarg;
- if (!is_tuple(BIF_ARG_3))
- goto badarg;
- tp = tuple_val(BIF_ARG_3);
- if (*tp++ != make_arityval(4))
- goto badarg;
- if (!is_small(*tp))
- goto badarg;
- flags = unsigned_val(*tp++);
- if (!is_small(*tp) || (version = unsigned_val(*tp)) == 0)
- goto badarg;
- ic = *(++tp);
- oc = *(++tp);
- if (!is_atom(ic) || !is_atom(oc))
- goto badarg;
+ /* Distribution controller... */
+ if (!is_internal_port(BIF_ARG_2) && !is_internal_pid(BIF_ARG_2))
+ goto badarg;
+
+ /* Dist flags... */
+ if (!is_small(BIF_ARG_3))
+ goto badarg;
+ flags = unsigned_val(BIF_ARG_3);
+
+ /* Version... */
+ if (!is_small(BIF_ARG_4))
+ goto badarg;
+ version = unsigned_val(BIF_ARG_4);
+
+ if (version == 0)
+ goto badarg;
if (~flags & DFLAG_DIST_MANDATORY) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
@@ -3222,74 +3222,79 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
else if (!dep)
goto system_limit; /* Should never happen!!! */
+ erts_de_rlock(dep);
+ de_locked = -1;
+
+ if (dep->state == ERTS_DE_STATE_EXITING) {
+ /* Suspend on dist entry waiting for the exit to finish */
+ ErtsProcList *plp = erts_proclist_create(BIF_P);
+ plp->next = NULL;
+ erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
+ erts_mtx_lock(&dep->qlock);
+ erts_proclist_store_last(&dep->suspended, plp);
+ erts_mtx_unlock(&dep->qlock);
+ goto yield;
+ }
+
+ erts_de_runlock(dep);
+ de_locked = 0;
+
if (is_internal_pid(BIF_ARG_2)) {
if (BIF_P->common.id == BIF_ARG_2) {
- proc_unlock = 0;
- proc = BIF_P;
- }
- else {
- proc_unlock = ERTS_PROC_LOCK_MAIN;
- proc = erts_pid2proc_not_running(BIF_P, ERTS_PROC_LOCK_MAIN,
- BIF_ARG_2, proc_unlock);
- }
- erts_de_rwlock(dep);
-
- if (!proc)
- goto badarg;
- else if (proc == ERTS_PROC_LOCK_BUSY) {
- proc_unlock = 0;
- goto yield;
- }
+ ErtsSetupConnDistCtrl scdc;
- erts_proc_lock(proc, ERTS_PROC_LOCK_STATUS);
- proc_unlock |= ERTS_PROC_LOCK_STATUS;
+ scdc.dep = dep;
+ scdc.flags = flags;
+ scdc.version = version;
- if (ERTS_PROC_GET_DIST_ENTRY(proc)) {
- if (dep == ERTS_PROC_GET_DIST_ENTRY(proc)
- && (proc->flags & F_DISTRIBUTION)
- && dep->cid == BIF_ARG_2) {
- ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep));
- goto done;
- }
- goto badarg;
- }
+ res = setup_connection_distctrl(BIF_P, &scdc, NULL, NULL);
+ BUMP_REDS(BIF_P, 5);
+ dep = NULL;
- if (dep->state == ERTS_DE_STATE_EXITING) {
- /* Suspend on dist entry waiting for the exit to finish */
- ErtsProcList *plp = erts_proclist_create(BIF_P);
- plp->next = NULL;
- erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_mtx_lock(&dep->qlock);
- erts_proclist_store_last(&dep->suspended, plp);
- erts_mtx_unlock(&dep->qlock);
- goto yield;
- }
- if (dep->state != ERTS_DE_STATE_PENDING) {
- if (dep->state == ERTS_DE_STATE_IDLE)
- erts_set_dist_entry_pending(dep);
- else
+ if (res == am_badarg)
goto badarg;
+
+ ASSERT(is_internal_magic_ref(res));
+ res_tag = am_ok; /* Connection up */
}
+ else {
+ ErtsSetupConnDistCtrl *scdcp;
- if (is_not_nil(dep->cid))
- goto badarg;
+ scdcp = erts_alloc(ERTS_ALC_T_SETUP_CONN_ARG,
+ sizeof(ErtsSetupConnDistCtrl));
- proc->flags |= F_DISTRIBUTION;
- ERTS_PROC_SET_DIST_ENTRY(proc, dep);
+ scdcp->dep = dep;
+ scdcp->flags = flags;
+ scdcp->version = version;
- proc_unlock &= ~ERTS_PROC_LOCK_STATUS;
- erts_proc_unlock(proc, ERTS_PROC_LOCK_STATUS);
+ res = erts_proc_sig_send_rpc_request(BIF_P,
+ BIF_ARG_2,
+ !0,
+ setup_connection_distctrl,
+ (void *) scdcp);
+ if (is_non_value(res))
+ goto badarg;
- dep->send = NULL; /* Only for distr ports... */
+ dep = NULL;
+ ASSERT(is_internal_ordinary_ref(res));
+
+ res_tag = am_message; /* Caller need to wait for dhandle in message */
+ }
+ hp = HAlloc(BIF_P, 3);
}
else {
+ int new;
pp = erts_id2port_sflgs(BIF_ARG_2,
BIF_P,
ERTS_PROC_LOCK_MAIN,
ERTS_PORT_SFLGS_INVALID_LOOKUP);
erts_de_rwlock(dep);
+ de_locked = 1;
+
+ if (dep->state == ERTS_DE_STATE_EXITING)
+ goto badarg;
if (!pp || (erts_atomic32_read_nob(&pp->state)
& ERTS_PORT_SFLG_EXITING))
@@ -3298,65 +3303,108 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
goto badarg;
- if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep) {
- ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep));
- goto done; /* Already set */
- }
+ if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
+ new = 0;
+ else {
+ if (dep->state != ERTS_DE_STATE_PENDING) {
+ if (dep->state == ERTS_DE_STATE_IDLE)
+ erts_set_dist_entry_pending(dep);
+ else
+ goto badarg;
+ }
- if (dep->state == ERTS_DE_STATE_EXITING) {
- /* Suspend on dist entry waiting for the exit to finish */
- ErtsProcList *plp = erts_proclist_create(BIF_P);
- plp->next = NULL;
- erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_mtx_lock(&dep->qlock);
- erts_proclist_store_last(&dep->suspended, plp);
- erts_mtx_unlock(&dep->qlock);
- goto yield;
- }
- if (dep->state != ERTS_DE_STATE_PENDING) {
- if (dep->state == ERTS_DE_STATE_IDLE)
- erts_set_dist_entry_pending(dep);
- else
+ if (pp->dist_entry || is_not_nil(dep->cid))
goto badarg;
- }
- if (pp->dist_entry || is_not_nil(dep->cid))
- goto badarg;
+ erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
- erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
+ pp->dist_entry = dep;
- pp->dist_entry = dep;
+ ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
- ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
+ dep->send = (pp->drv_ptr->outputv
+ ? dist_port_commandv
+ : dist_port_command);
+ ASSERT(dep->send);
- dep->send = (pp->drv_ptr->outputv
- ? dist_port_commandv
- : dist_port_command);
- ASSERT(dep->send);
+ /*
+ * Dist-ports do not use the "busy port message queue" functionality, but
+ * instead use "busy dist entry" functionality.
+ */
+ {
+ ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
+ erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
+ }
- /*
- * Dist-ports do not use the "busy port message queue" functionality, but
- * instead use "busy dist entry" functionality.
- */
- {
- ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
- erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
+ setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version);
+ de_locked = 0;
+ new = !0;
}
+ hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE);
+ res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep);
+ res_tag = am_ok; /* Connection up */
+ if (new)
+ dep = NULL; /* inc of refc transferred to port (dist_entry field) */
+ }
+
+ ASSERT(is_value(res) && is_value(res_tag));
+
+ res = TUPLE2(hp, res_tag, res);
+
+ ERTS_BIF_PREP_RET(ret, res);
+
+ done:
+
+ if (dep && dep != erts_this_dist_entry) {
+ if (de_locked) {
+ if (de_locked > 0)
+ erts_de_rwunlock(dep);
+ else
+ erts_de_runlock(dep);
+ }
+ erts_deref_dist_entry(dep);
}
+ if (pp)
+ erts_port_release(pp);
+
+ return ret;
+
+ yield:
+ ERTS_BIF_PREP_YIELD4(ret,
+ bif_export[BIF_erts_internal_create_dist_channel_4],
+ BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4);
+ goto done;
+
+ badarg:
+ ERTS_BIF_PREP_RET(ret, am_badarg);
+ goto done;
+
+ system_limit:
+ ERTS_BIF_PREP_RET(ret, am_system_limit);
+ goto done;
+}
+
+static void
+setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
+ Eterm ctrlr, Uint flags,
+ Uint version)
+{
+ Eterm notify_proc = NIL;
+ erts_aint32_t qflgs;
+
dep->version = version;
dep->creation = 0;
-#ifdef DEBUG
+ ASSERT(is_internal_port(ctrlr) || is_internal_pid(ctrlr));
ASSERT(erts_atomic_read_nob(&dep->qsize) == 0
|| (dep->state == ERTS_DE_STATE_PENDING));
-#endif
if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
create_cache(dep);
- erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
+ erts_set_dist_entry_connected(dep, ctrlr, flags);
notify_proc = NIL;
if (erts_atomic_read_nob(&dep->qsize)) {
@@ -3375,50 +3423,100 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
}
}
}
- erts_de_rwunlock(dep);
- if (is_internal_pid(notify_proc))
- notify_dist_data(BIF_P, notify_proc);
- ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep));
+ erts_de_rwunlock(dep);
- dep = NULL; /* inc of refc transferred to port (dist_entry field) */
+ if (is_internal_pid(notify_proc))
+ notify_dist_data(c_p, notify_proc);
inc_no_nodes();
- send_nodes_mon_msgs(BIF_P,
+ send_nodes_mon_msgs(c_p,
am_nodeup,
- BIF_ARG_1,
+ dep->sysname,
flags & DFLAG_PUBLISHED ? am_visible : am_hidden,
NIL);
- done:
+}
- if (dep && dep != erts_this_dist_entry) {
- erts_de_rwunlock(dep);
- erts_deref_dist_entry(dep);
+static Eterm
+setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp)
+{
+ ErtsSetupConnDistCtrl *scdcp = (ErtsSetupConnDistCtrl *) arg;
+ DistEntry *dep = scdcp->dep;
+ int dep_locked = 0;
+ Eterm *hp;
+ erts_aint32_t state;
+
+ if (redsp)
+ *redsp = 1;
+
+ state = erts_atomic32_read_nob(&c_p->state);
+
+ if (state & ERTS_PSFLG_EXITING)
+ goto badarg;
+
+ erts_de_rwlock(dep);
+ dep_locked = !0;
+
+ if (dep->state == ERTS_DE_STATE_EXITING)
+ goto badarg;
+
+ if (ERTS_PROC_GET_DIST_ENTRY(c_p)) {
+ if (dep == ERTS_PROC_GET_DIST_ENTRY(c_p)
+ && (c_p->flags & F_DISTRIBUTION)
+ && dep->cid == c_p->common.id) {
+ goto connected;
+ }
+ goto badarg;
}
- if (pp)
- erts_port_release(pp);
+ if (dep->state != ERTS_DE_STATE_PENDING) {
+ if (dep->state == ERTS_DE_STATE_IDLE)
+ erts_set_dist_entry_pending(dep);
+ else
+ goto badarg;
+ }
- if (proc_unlock)
- erts_proc_unlock(proc, proc_unlock);
+ if (is_not_nil(dep->cid))
+ goto badarg;
- return ret;
+ c_p->flags |= F_DISTRIBUTION;
+ ERTS_PROC_SET_DIST_ENTRY(c_p, dep);
- yield:
- ERTS_BIF_PREP_YIELD3(ret, bif_export[BIF_setnode_3], BIF_P,
- BIF_ARG_1, BIF_ARG_2, BIF_ARG_3);
- goto done;
+ dep->send = NULL; /* Only for distr ports... */
- badarg:
- ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG);
- goto done;
+ if (redsp)
+ *redsp = 5;
- system_limit:
- ERTS_BIF_PREP_ERROR(ret, BIF_P, SYSTEM_LIMIT);
- goto done;
+ setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id,
+ scdcp->flags, scdcp->version);
+connected:
+
+ /* we take over previous inc in refc of dep */
+
+ if (!bpp) /* called directly... */
+ return erts_make_dhandle(c_p, dep);
+
+ erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);
+
+ *bpp = new_message_buffer(ERTS_MAGIC_REF_THING_SIZE);
+ hp = (*bpp)->mem;
+ return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep);
+
+badarg:
+
+ if (bpp) /* not called directly */
+ erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);
+
+ if (dep_locked)
+ erts_de_rwunlock(dep);
+
+ erts_deref_dist_entry(dep);
+
+ return am_badarg;
}
+
BIF_RETTYPE erts_internal_get_dflags_0(BIF_ALIST_0)
{
return erts_dflags_record;
diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types
index 4a6a19b210..9db600dce0 100644
--- a/erts/emulator/beam/erl_alloc.types
+++ b/erts/emulator/beam/erl_alloc.types
@@ -287,6 +287,8 @@ type DIST_DEMONITOR SHORT_LIVED PROCESSES dist_demonitor
type CML_CLEANUP SHORT_LIVED SYSTEM connection_ml_cleanup
type ML_YIELD_STATE SHORT_LIVED SYSTEM monitor_link_yield_state
type ML_DIST STANDARD SYSTEM monitor_link_dist
+type PF3_ARGS SHORT_LIVED PROCESSES process_flag_3_arguments
+type SETUP_CONN_ARG SHORT_LIVED PROCESSES setup_connection_argument
type ENVIRONMENT SYSTEM SYSTEM environment
@@ -346,6 +348,7 @@ type NIF_TRAP_EXPORT STANDARD PROCESSES nif_trap_export_entry
type NIF_EXP_TRACE FIXED_SIZE PROCESSES nif_export_trace
type EXPORT LONG_LIVED CODE export_entry
type MONITOR FIXED_SIZE PROCESSES monitor
+type MONITOR_SUSPEND STANDARD PROCESSES monitor_suspend
type LINK FIXED_SIZE PROCESSES link
type AINFO_REQ SHORT_LIVED SYSTEM alloc_info_request
type SCHED_WTIME_REQ SHORT_LIVED SYSTEM sched_wall_time_request
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 6f9e507228..55450e397f 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -617,17 +617,13 @@ static void collect_one_target_monitor(ErtsMonitor *mon, void *vmicp)
}
typedef struct {
- Process *c_p;
- ErtsProcLocks c_p_locks;
ErtsMonitorSuspend **smi;
Uint smi_i;
Uint smi_max;
- int sz;
+ Uint sz;
} ErtsSuspendMonitorInfoCollection;
-#define ERTS_INIT_SUSPEND_MONITOR_INFOS(SMIC, CP, CPL) do { \
- (SMIC).c_p = (CP); \
- (SMIC).c_p_locks = (CPL); \
+#define ERTS_INIT_SUSPEND_MONITOR_INFOS(SMIC) do { \
(SMIC).smi = NULL; \
(SMIC).smi_i = (SMIC).smi_max = 0; \
(SMIC).sz = 0; \
@@ -660,34 +656,26 @@ do { \
static void
collect_one_suspend_monitor(ErtsMonitor *mon, void *vsmicp)
{
- ErtsMonitorSuspend *smon = erts_monitor_suspend(mon);
- ErtsSuspendMonitorInfoCollection *smicp = vsmicp;
- Process *suspendee = erts_pid2proc(smicp->c_p,
- smicp->c_p_locks,
- mon->other.item,
- 0);
- if (suspendee) { /* suspendee is alive */
- Sint a, p;
- if (smon->active) {
- smon->active += smon->pending;
- smon->pending = 0;
- }
+ if (mon->type == ERTS_MON_TYPE_SUSPEND) {
+ Sint count;
+ erts_aint_t mstate;
+ ErtsMonitorSuspend *msp;
+ ErtsSuspendMonitorInfoCollection *smicp;
- ASSERT((smon->active && !smon->pending)
- || (smon->pending && !smon->active));
+ msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);
+ smicp = vsmicp;
ERTS_EXTEND_SUSPEND_MONITOR_INFOS(smicp);
- smicp->smi[smicp->smi_i] = smon;
+ smicp->smi[smicp->smi_i] = msp;
smicp->sz += 2 /* cons */ + 4 /* 3-tuple */;
- a = (Sint) smon->active; /* quiet compiler warnings */
- p = (Sint) smon->pending; /* on 64-bit machines */
+ mstate = erts_atomic_read_nob(&msp->state);
- if (!IS_SSMALL(a))
- smicp->sz += BIG_UINT_HEAP_SIZE;
- if (!IS_SSMALL(p))
+ count = (Sint) (mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK);
+ if (!IS_SSMALL(count))
smicp->sz += BIG_UINT_HEAP_SIZE;
+
smicp->smi_i++;
}
}
@@ -1075,8 +1063,10 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
if (c_p->common.id == pid) {
int local_only = c_p->flags & F_LOCAL_SIGS_ONLY;
- int sreds = ERTS_BIF_REDS_LEFT(c_p);
- int sres;
+ int sres, sreds, reds_left;
+
+ reds_left = ERTS_BIF_REDS_LEFT(c_p);
+ sreds = reds_left;
if (!local_only) {
erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
@@ -1085,15 +1075,19 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
}
sres = erts_proc_sig_handle_incoming(c_p, &state, &sreds, sreds, !0);
+
+ BUMP_REDS(c_p, (int) sreds);
+ reds_left -= sreds;
+
if (state & ERTS_PSFLG_EXITING) {
c_p->flags &= ~F_LOCAL_SIGS_ONLY;
goto exited;
}
- if (!sres) {
+ if (!sres | (reds_left <= 0)) {
/*
- * More signals to handle; need to yield and continue.
- * Prevent fetching of more signals by setting
- * local-sigs-only flag.
+ * More signals to handle or out of reds; need
+ * to yield and continue. Prevent fetching of
+ * more signals by setting local-sigs-only flag.
*/
c_p->flags |= F_LOCAL_SIGS_ONLY;
goto yield;
@@ -1166,6 +1160,7 @@ process_info_bif(Process *c_p, Eterm pid, Eterm opt, int always_wrap, int pi2)
else {
if (flags & ERTS_PI_FLAG_FORCE_SIG_SEND)
goto send_signal;
+ state = ERTS_PSFLG_RUNNING; /* fail state... */
rp = erts_try_lock_sig_free_proc(pid, locks, &state);
if (!rp)
goto undefined;
@@ -1627,56 +1622,56 @@ process_info_aux(Process *c_p,
case ERTS_PI_IX_SUSPENDING: {
ErtsSuspendMonitorInfoCollection smic;
int i;
- Eterm item;
- erts_proc_lock(rp, ERTS_PROC_LOCK_STATUS);
+ ERTS_INIT_SUSPEND_MONITOR_INFOS(smic);
- ERTS_INIT_SUSPEND_MONITOR_INFOS(smic,
- c_p,
- (c_p == rp
- ? ERTS_PROC_LOCK_MAIN
- : 0) | ERTS_PROC_LOCK_STATUS);
-
- erts_monitor_tree_foreach(rp->suspend_monitors,
- &collect_one_suspend_monitor,
- &smic);
+ erts_monitor_tree_foreach(ERTS_P_MONITORS(rp),
+ collect_one_suspend_monitor,
+ (void *) &smic);
reserve_size += smic.sz;
res = NIL;
for (i = 0; i < smic.smi_i; i++) {
- Sint a = (Sint) smic.smi[i]->active; /* quiet compiler warnings */
- Sint p = (Sint) smic.smi[i]->pending; /* on 64-bit machines... */
- Eterm active;
- Eterm pending;
+ ErtsMonitorSuspend *msp;
+ erts_aint_t mstate;
+ Sint ci;
+ Eterm ct, active, pending, item;
Uint sz = 4 + 2;
- if (!IS_SSMALL(a))
- sz += BIG_UINT_HEAP_SIZE;
- if (!IS_SSMALL(p))
- sz += BIG_UINT_HEAP_SIZE;
+
+ msp = smic.smi[i];
+ mstate = erts_atomic_read_nob(&msp->state);
+
+ ci = (Sint) (mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK);
+ if (!IS_SSMALL(ci))
+ sz += BIG_UINT_HEAP_SIZE;
ERTS_PI_UNRESERVE(reserve_size, sz);
hp = erts_produce_heap(hfact, sz, reserve_size);
- if (IS_SSMALL(a))
- active = make_small(a);
- else {
- active = small_to_big(a, hp);
- hp += BIG_UINT_HEAP_SIZE;
- }
- if (IS_SSMALL(p))
- pending = make_small(p);
- else {
- pending = small_to_big(p, hp);
- hp += BIG_UINT_HEAP_SIZE;
- }
- item = TUPLE3(hp, smic.smi[i]->mon.other.item, active, pending);
+ if (IS_SSMALL(ci))
+ ct = make_small(ci);
+ else {
+ ct = small_to_big(ci, hp);
+ hp += BIG_UINT_HEAP_SIZE;
+ }
+
+ if (mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE) {
+ active = ct;
+ pending = make_small(0);
+ }
+ else {
+ active = make_small(0);
+ pending = ct;
+ }
+
+ ASSERT(is_internal_pid(msp->md.origin.other.item));
+
+ item = TUPLE3(hp, msp->md.origin.other.item, active, pending);
hp += 4;
res = CONS(hp, item, res);
}
- erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
-
*reds += (Uint) smic.smi_i / 4;
ERTS_DESTROY_SUSPEND_MONITOR_INFOS(smic);
@@ -3637,26 +3632,46 @@ BIF_RETTYPE is_process_alive_1(BIF_ALIST_1)
BIF_ERROR(BIF_P, BADARG);
}
-BIF_RETTYPE process_display_2(BIF_ALIST_2)
+static Eterm
+process_display(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp)
{
- Process *rp;
+ if (redsp)
+ *redsp = 1;
- if (BIF_ARG_2 != am_backtrace)
- BIF_ERROR(BIF_P, BADARG);
+ if (ERTS_PROC_IS_EXITING(c_p))
+ return am_badarg;
- rp = erts_pid2proc_nropt(BIF_P, ERTS_PROC_LOCK_MAIN,
- BIF_ARG_1, ERTS_PROC_LOCKS_ALL);
- if(!rp) {
- BIF_ERROR(BIF_P, BADARG);
- }
- if (rp == ERTS_PROC_LOCK_BUSY)
- ERTS_BIF_YIELD2(bif_export[BIF_process_display_2], BIF_P,
- BIF_ARG_1, BIF_ARG_2);
- erts_stack_dump(ERTS_PRINT_STDERR, NULL, rp);
- erts_proc_unlock(rp, (BIF_P == rp
- ? ERTS_PROC_LOCKS_ALL_MINOR
- : ERTS_PROC_LOCKS_ALL));
- BIF_RET(am_true);
+ erts_proc_lock(c_p, ERTS_PROC_LOCKS_ALL_MINOR);
+ erts_stack_dump(ERTS_PRINT_STDERR, NULL, c_p);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCKS_ALL_MINOR);
+
+ return am_true;
+}
+
+
+BIF_RETTYPE erts_internal_process_display_2(BIF_ALIST_2)
+{
+ Eterm res;
+
+ if (BIF_ARG_2 != am_backtrace)
+ BIF_RET(am_badarg);
+
+ if (BIF_P->common.id == BIF_ARG_1) {
+ res = process_display(BIF_P, NULL, NULL, NULL);
+ BIF_RET(res);
+ }
+
+ if (is_not_internal_pid(BIF_ARG_1))
+ BIF_RET(am_badarg);
+
+ res = erts_proc_sig_send_rpc_request(BIF_P, BIF_ARG_1,
+ !0,
+ process_display,
+ NULL);
+ if (is_non_value(res))
+ BIF_RET(am_badarg);
+
+ BIF_RET(res);
}
/* this is a general call which return some possibly useful information */
@@ -4597,27 +4612,6 @@ BIF_RETTYPE erts_debug_set_internal_state_2(BIF_ALIST_2)
BIF_RET(am_true);
}
}
- else if (ERTS_IS_ATOM_STR("not_running_optimization", BIF_ARG_1)) {
- int old_use_opt, use_opt;
- switch (BIF_ARG_2) {
- case am_true:
- use_opt = 1;
- break;
- case am_false:
- use_opt = 0;
- break;
- default:
- BIF_ERROR(BIF_P, BADARG);
- }
-
- erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
- erts_thr_progress_block();
- old_use_opt = !erts_disable_proc_not_running_opt;
- erts_disable_proc_not_running_opt = !use_opt;
- erts_thr_progress_unblock();
- erts_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
- BIF_RET(old_use_opt ? am_true : am_false);
- }
else if (ERTS_IS_ATOM_STR("wait", BIF_ARG_1)) {
if (ERTS_IS_ATOM_STR("deallocations", BIF_ARG_2)) {
int flag = ERTS_DEBUG_WAIT_COMPLETED_DEALLOCATIONS;
diff --git a/erts/emulator/beam/erl_bif_trace.c b/erts/emulator/beam/erl_bif_trace.c
index 1953f79d79..f9d351e69e 100644
--- a/erts/emulator/beam/erl_bif_trace.c
+++ b/erts/emulator/beam/erl_bif_trace.c
@@ -809,10 +809,129 @@ Eterm trace_info_2(BIF_ALIST_2)
BIF_ERROR(p, BADARG);
}
erts_release_code_write_permission();
+
+ if (is_internal_ref(res))
+ BIF_TRAP1(erts_await_result, BIF_P, res);
+
BIF_RET(res);
}
static Eterm
+build_trace_flags_term(Eterm **hpp, Uint *szp, Uint trace_flags)
+{
+
+#define ERTS_TFLAG__(F, FN) \
+ if (trace_flags & F) { \
+ if (szp) \
+ sz += 2; \
+ if (hp) { \
+ res = CONS(hp, FN, res); \
+ hp += 2; \
+ } \
+ }
+
+ Eterm res;
+ Uint sz = 0;
+ Eterm *hp;
+
+ if (hpp) {
+ hp = *hpp;
+ res = NIL;
+ }
+ else {
+ hp = NULL;
+ res = THE_NON_VALUE;
+ }
+
+ ERTS_TFLAG__(F_NOW_TS, am_timestamp);
+ ERTS_TFLAG__(F_STRICT_MON_TS, am_strict_monotonic_timestamp);
+ ERTS_TFLAG__(F_MON_TS, am_monotonic_timestamp);
+ ERTS_TFLAG__(F_TRACE_SEND, am_send);
+ ERTS_TFLAG__(F_TRACE_RECEIVE, am_receive);
+ ERTS_TFLAG__(F_TRACE_SOS, am_set_on_spawn);
+ ERTS_TFLAG__(F_TRACE_CALLS, am_call);
+ ERTS_TFLAG__(F_TRACE_PROCS, am_procs);
+ ERTS_TFLAG__(F_TRACE_SOS1, am_set_on_first_spawn);
+ ERTS_TFLAG__(F_TRACE_SOL, am_set_on_link);
+ ERTS_TFLAG__(F_TRACE_SOL1, am_set_on_first_link);
+ ERTS_TFLAG__(F_TRACE_SCHED, am_running);
+ ERTS_TFLAG__(F_TRACE_SCHED_EXIT, am_exiting);
+ ERTS_TFLAG__(F_TRACE_GC, am_garbage_collection);
+ ERTS_TFLAG__(F_TRACE_ARITY_ONLY, am_arity);
+ ERTS_TFLAG__(F_TRACE_RETURN_TO, am_return_to);
+ ERTS_TFLAG__(F_TRACE_SILENT, am_silent);
+ ERTS_TFLAG__(F_TRACE_SCHED_NO, am_scheduler_id);
+ ERTS_TFLAG__(F_TRACE_PORTS, am_ports);
+ ERTS_TFLAG__(F_TRACE_SCHED_PORTS, am_running_ports);
+ ERTS_TFLAG__(F_TRACE_SCHED_PROCS, am_running_procs);
+
+ if (szp)
+ *szp += sz;
+
+ if (hpp)
+ *hpp = hp;
+
+ return res;
+
+#undef ERTS_TFLAG__
+}
+
+static Eterm
+trace_info_tracee(Process *c_p, void *arg, int *redsp, ErlHeapFragment **bpp)
+{
+ ErlHeapFragment *bp;
+ Eterm *hp, res, key;
+ Uint sz;
+
+ *redsp = 1;
+
+ if (ERTS_PROC_IS_EXITING(c_p))
+ return am_undefined;
+
+ key = (Eterm) arg;
+ sz = 3;
+
+ if (!ERTS_TRACER_IS_NIL(ERTS_TRACER(c_p)))
+ erts_is_tracer_proc_enabled(c_p, ERTS_PROC_LOCK_MAIN,
+ &c_p->common);
+
+ switch (key) {
+ case am_tracer:
+
+ erts_build_tracer_to_term(NULL, NULL, &sz, ERTS_TRACER(c_p));
+ bp = new_message_buffer(sz);
+ hp = bp->mem;
+ res = erts_build_tracer_to_term(&hp, &bp->off_heap,
+ NULL, ERTS_TRACER(c_p));
+ if (res == am_false)
+ res = NIL;
+ break;
+
+ case am_flags:
+
+ build_trace_flags_term(NULL, &sz, ERTS_TRACE_FLAGS(c_p));
+ bp = new_message_buffer(sz);
+ hp = bp->mem;
+ res = build_trace_flags_term(&hp, NULL, ERTS_TRACE_FLAGS(c_p));
+ break;
+
+ default:
+
+ ERTS_INTERNAL_ERROR("Key not supported");
+ res = NIL;
+ bp = NULL;
+ hp = NULL;
+ break;
+ }
+
+ *redsp += 2;
+
+ res = TUPLE2(hp, key, res);
+ *bpp = bp;
+ return res;
+}
+
+static Eterm
trace_info_pid(Process* p, Eterm pid_spec, Eterm key)
{
Eterm tracer;
@@ -846,24 +965,19 @@ trace_info_pid(Process* p, Eterm pid_spec, Eterm key)
erts_port_release(tracee);
} else if (is_internal_pid(pid_spec)) {
- Process *tracee = erts_pid2proc_not_running(p, ERTS_PROC_LOCK_MAIN,
- pid_spec, ERTS_PROC_LOCK_MAIN);
-
- if (tracee == ERTS_PROC_LOCK_BUSY)
- ERTS_BIF_YIELD2(bif_export[BIF_trace_info_2], p, pid_spec, key);
+ Eterm ref;
- if (!tracee)
- return am_undefined;
+ if (key != am_flags && key != am_tracer)
+ goto error;
- if (!ERTS_TRACER_IS_NIL(ERTS_TRACER(tracee)))
- erts_is_tracer_proc_enabled(tracee, ERTS_PROC_LOCK_MAIN,
- &tracee->common);
+ ref = erts_proc_sig_send_rpc_request(p, pid_spec, !0,
+ trace_info_tracee,
+ (void *) key);
- tracer = erts_tracer_to_term(p, ERTS_TRACER(tracee));
- trace_flags = ERTS_TRACE_FLAGS(tracee);
+ if (is_non_value(ref))
+ return am_undefined;
- if (tracee != p)
- erts_proc_unlock(tracee, ERTS_PROC_LOCK_MAIN);
+ return ref;
} else if (is_external_pid(pid_spec)
&& external_pid_dist_entry(pid_spec) == erts_this_dist_entry) {
return am_undefined;
@@ -873,48 +987,16 @@ trace_info_pid(Process* p, Eterm pid_spec, Eterm key)
}
if (key == am_flags) {
- int num_flags = 21; /* MAXIMUM number of flags. */
- Uint needed = 3+2*num_flags;
- Eterm flag_list = NIL;
- Eterm* limit;
+ Eterm flag_list;
+ Uint sz = 3;
+ Eterm *hp;
-#define FLAG0(flag_mask,flag) \
- if (trace_flags & (flag_mask)) { flag_list = CONS(hp, flag, flag_list); hp += 2; } else {}
+ build_trace_flags_term(NULL, &sz, trace_flags);
+
+ hp = HAlloc(p, sz);
+
+ flag_list = build_trace_flags_term(&hp, NULL, trace_flags);
-#if defined(DEBUG)
- /*
- * Check num_flags if this assertion fires.
- */
-# define FLAG ASSERT(num_flags-- > 0); FLAG0
-#else
-# define FLAG FLAG0
-#endif
- hp = HAlloc(p, needed);
- limit = hp+needed;
- FLAG(F_NOW_TS, am_timestamp);
- FLAG(F_STRICT_MON_TS, am_strict_monotonic_timestamp);
- FLAG(F_MON_TS, am_monotonic_timestamp);
- FLAG(F_TRACE_SEND, am_send);
- FLAG(F_TRACE_RECEIVE, am_receive);
- FLAG(F_TRACE_SOS, am_set_on_spawn);
- FLAG(F_TRACE_CALLS, am_call);
- FLAG(F_TRACE_PROCS, am_procs);
- FLAG(F_TRACE_SOS1, am_set_on_first_spawn);
- FLAG(F_TRACE_SOL, am_set_on_link);
- FLAG(F_TRACE_SOL1, am_set_on_first_link);
- FLAG(F_TRACE_SCHED, am_running);
- FLAG(F_TRACE_SCHED_EXIT, am_exiting);
- FLAG(F_TRACE_GC, am_garbage_collection);
- FLAG(F_TRACE_ARITY_ONLY, am_arity);
- FLAG(F_TRACE_RETURN_TO, am_return_to);
- FLAG(F_TRACE_SILENT, am_silent);
- FLAG(F_TRACE_SCHED_NO, am_scheduler_id);
- FLAG(F_TRACE_PORTS, am_ports);
- FLAG(F_TRACE_SCHED_PORTS, am_running_ports);
- FLAG(F_TRACE_SCHED_PROCS, am_running_procs);
-#undef FLAG0
-#undef FLAG
- HRelease(p,limit,hp+3);
return TUPLE2(hp, key, flag_list);
} else if (key == am_tracer) {
if (tracer == am_false)
diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c
index 0692cea0ee..a65dbbf42b 100644
--- a/erts/emulator/beam/erl_gc.c
+++ b/erts/emulator/beam/erl_gc.c
@@ -413,21 +413,20 @@ erts_gc_after_bif_call_lhf(Process* p, ErlHeapFragment *live_hf_end,
{
int cost;
- if (p->flags & F_HIBERNATE_SCHED) {
+ if (p->flags & (F_HIBERNATE_SCHED|F_HIPE_RECV_LOCKED)) {
/*
* We just hibernated. We do *not* want to mess
* up the hibernation by an ordinary GC...
+ *
+ * OR
+ *
+ * We left a receive in HiPE with message
+ * queue lock locked, and we do not want to
+ * do a GC with message queue locked...
*/
return result;
}
-#ifdef HIPE
- if (p->hipe_smp.have_receive_locks) {
- /* Do not want to GC with message queue locked... */
- return result;
- }
-#endif
-
if (!p->mbuf) {
/* Must have GC:d in BIF call... invalidate live_hf_end */
live_hf_end = ERTS_INVALID_HFRAG_PTR;
diff --git a/erts/emulator/beam/erl_monitor_link.c b/erts/emulator/beam/erl_monitor_link.c
index 70f36fb6b7..48d9bd4ca5 100644
--- a/erts/emulator/beam/erl_monitor_link.c
+++ b/erts/emulator/beam/erl_monitor_link.c
@@ -630,7 +630,9 @@ erts_monitor_tree_lookup_create(ErtsMonitor **root, int *created, Uint16 type,
ErtsMonitor *res;
ErtsMonitorCreateCtxt cctxt = {type, origin};
- ERTS_ML_ASSERT(type == ERTS_MON_TYPE_NODE || type == ERTS_MON_TYPE_NODES);
+ ERTS_ML_ASSERT(type == ERTS_MON_TYPE_NODE
+ || type == ERTS_MON_TYPE_NODES
+ || type == ERTS_MON_TYPE_SUSPEND);
res = (ErtsMonitor *) ml_rbt_lookup_create((ErtsMonLnkNode **) root,
target, create_monitor,
@@ -760,11 +762,13 @@ erts_monitor_create(Uint16 type, Eterm ref, Eterm orgn, Eterm trgt, Eterm name)
switch (type) {
case ERTS_MON_TYPE_PROC:
case ERTS_MON_TYPE_PORT:
- case ERTS_MON_TYPE_TIME_OFFSET:
if (is_nil(name)) {
ErtsMonitorDataHeap *mdhp;
ErtsORefThing *ortp;
+ case ERTS_MON_TYPE_TIME_OFFSET:
+
+ ERTS_ML_ASSERT(is_nil(name));
ERTS_ML_ASSERT(is_immed(orgn) && is_immed(trgt));
ERTS_ML_ASSERT(is_internal_ordinary_ref(ref));
@@ -860,10 +864,38 @@ erts_monitor_create(Uint16 type, Eterm ref, Eterm orgn, Eterm trgt, Eterm name)
mdep->dist = NULL;
break;
}
- case ERTS_MON_TYPE_SUSPEND:
- ERTS_INTERNAL_ERROR("Use erts_monitor_suspend_create() instead...");
- mdp = NULL;
+ case ERTS_MON_TYPE_SUSPEND: {
+ ErtsMonitorSuspend *msp;
+
+ ERTS_ML_ASSERT(is_nil(name));
+ ERTS_ML_ASSERT(is_nil(ref));
+ ERTS_ML_ASSERT(is_internal_pid(orgn) && is_internal_pid(trgt));
+
+ msp = erts_alloc(ERTS_ALC_T_MONITOR_SUSPEND,
+ sizeof(ErtsMonitorSuspend));
+ mdp = &msp->md;
+ ERTS_ML_ASSERT(((void *) mdp) == ((void *) msp));
+
+ mdp->ref = NIL;
+
+ mdp->origin.other.item = trgt;
+ mdp->origin.offset = (Uint16) offsetof(ErtsMonitorData, origin);
+ mdp->origin.key_offset = (Uint16) offsetof(ErtsMonitor, other.item);
+ ERTS_ML_ASSERT(mdp->origin.key_offset >= mdp->origin.offset);
+ mdp->origin.flags = (Uint16) ERTS_ML_FLG_EXTENDED;
+ mdp->origin.type = type;
+
+ mdp->target.other.item = orgn;
+ mdp->target.offset = (Uint16) offsetof(ErtsMonitorData, target);
+ mdp->target.key_offset = (Uint16) offsetof(ErtsMonitor, other.item);
+ mdp->target.flags = ERTS_ML_FLG_TARGET|ERTS_ML_FLG_EXTENDED;
+ mdp->target.type = type;
+
+ msp->next = NULL;
+ erts_atomic_init_relb(&msp->state, 0);
+
break;
+ }
default:
ERTS_INTERNAL_ERROR("Invalid monitor type");
mdp = NULL;
@@ -887,10 +919,11 @@ erts_monitor_destroy__(ErtsMonitorData *mdp)
ERTS_ML_ASSERT(!(mdp->target.flags & ERTS_ML_FLG_IN_TABLE));
ERTS_ML_ASSERT((mdp->origin.flags & ERTS_ML_FLGS_SAME)
== (mdp->target.flags & ERTS_ML_FLGS_SAME));
- ERTS_ML_ASSERT(mdp->origin.type != ERTS_MON_TYPE_SUSPEND);
if (!(mdp->origin.flags & ERTS_ML_FLG_EXTENDED))
erts_free(ERTS_ALC_T_MONITOR, mdp);
+ else if (mdp->origin.type == ERTS_MON_TYPE_SUSPEND)
+ erts_free(ERTS_ALC_T_MONITOR_SUSPEND, mdp);
else {
ErtsMonitorDataExtended *mdep = (ErtsMonitorDataExtended *) mdp;
ErlOffHeap oh;
@@ -927,10 +960,10 @@ erts_monitor_size(ErtsMonitor *mon)
Uint size, refc;
ErtsMonitorData *mdp = erts_monitor_to_data(mon);
- ERTS_ML_ASSERT(mon->type != ERTS_MON_TYPE_SUSPEND);
-
if (!(mon->flags & ERTS_ML_FLG_EXTENDED))
size = sizeof(ErtsMonitorDataHeap);
+ else if (mon->type == ERTS_MON_TYPE_SUSPEND)
+ size = sizeof(ErtsMonitorSuspend);
else {
ErtsMonitorDataExtended *mdep;
Uint hsz = 0;
@@ -957,54 +990,6 @@ erts_monitor_size(ErtsMonitor *mon)
return size / refc;
}
-
-/* suspend monitors... */
-
-ErtsMonitorSuspend *
-erts_monitor_suspend_create(Eterm pid)
-{
- ErtsMonitorSuspend *msp;
-
- ERTS_ML_ASSERT(is_internal_pid(pid));
-
- msp = erts_alloc(ERTS_ALC_T_SUSPEND_MON,
- sizeof(ErtsMonitorSuspend));
- msp->mon.offset = (Uint16) offsetof(ErtsMonitorSuspend, mon);
- msp->mon.key_offset = (Uint16) offsetof(ErtsMonitor, other.item);
- msp->mon.other.item = pid;
- msp->mon.flags = 0;
- msp->mon.type = ERTS_MON_TYPE_SUSPEND;
- msp->pending = 0;
- msp->active = 0;
- return msp;
-}
-
-static ErtsMonLnkNode *
-create_monitor_suspend(Eterm pid, void *unused)
-{
- ErtsMonitorSuspend *msp = erts_monitor_suspend_create(pid);
- return (ErtsMonLnkNode *) &msp->mon;
-}
-
-ErtsMonitorSuspend *
-erts_monitor_suspend_tree_lookup_create(ErtsMonitor **root, int *created,
- Eterm pid)
-{
- ErtsMonitor *mon;
- mon = (ErtsMonitor *) ml_rbt_lookup_create((ErtsMonLnkNode **) root,
- pid, create_monitor_suspend,
- NULL,
- created);
- return erts_monitor_suspend(mon);
-}
-
-void
-erts_monitor_suspend_destroy(ErtsMonitorSuspend *msp)
-{
- ERTS_ML_ASSERT(!(msp->mon.flags & ERTS_ML_FLG_IN_TABLE));
- erts_free(ERTS_ALC_T_SUSPEND_MON, msp);
-}
-
/* * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * * *\
* Link Operations *
* *
diff --git a/erts/emulator/beam/erl_monitor_link.h b/erts/emulator/beam/erl_monitor_link.h
index 603aead8cc..9ff8aa509a 100644
--- a/erts/emulator/beam/erl_monitor_link.h
+++ b/erts/emulator/beam/erl_monitor_link.h
@@ -246,15 +246,28 @@
*
* --- ERTS_MON_TYPE_SUSPEND -------------------------------------
*
- * Suspend monitor.
+ * Suspend monitor. A local process (origin) suspends another
+ * local process (target).
*
- * Other Item: Suspendee process identifier
- * Key: Suspendee process identifier
- *
- * Valid keys are only ordinary internal references.
+ * Origin:
+ * Other Item: Process identifier of suspendee
+ * (target)
+ * Key: Process identifier of suspendee
+ * (target)
+ * Target:
+ * Other Item: Process identifier of suspender
+ * (origin)
+ * Key: Process identifier of suspender
+ * (origin)
+ * Shared:
+ * Next: Pointer to another suspend monitor
+ * State: Number of suspends and a flag
+ * indicating if the suspend is
+ * active or not.
*
- * This type of monitor is a bit strange and the whole process
- * suspend functionality should be improved...
+ * Origin part of the monitor is stored in the monitor tree of
+ * origin process and target part of the monitor is stored in
+ * monitor list for local targets on the target process.
*
*
*
@@ -638,11 +651,15 @@ struct ErtsMonitorDataExtended__ {
Eterm heap[1]; /* heap start... */
};
-typedef struct {
- ErtsMonitor mon;
- int pending;
- int active;
-} ErtsMonitorSuspend;
+typedef struct ErtsMonitorSuspend__ ErtsMonitorSuspend;
+
+struct ErtsMonitorSuspend__ {
+ ErtsMonitorData md; /* origin = suspender; target = suspendee */
+ ErtsMonitorSuspend *next;
+ erts_atomic_t state;
+};
+#define ERTS_MSUSPEND_STATE_FLG_ACTIVE ((erts_aint_t) (((Uint) 1) << (sizeof(Uint)*8 - 1)))
+#define ERTS_MSUSPEND_STATE_COUNTER_MASK (~ERTS_MSUSPEND_STATE_FLG_ACTIVE)
/*
* --- Monitor tree operations ---
@@ -1094,24 +1111,25 @@ int erts_monitor_list_foreach_delete_yielding(ErtsMonitor **list,
*
* @brief Create a monitor
*
- * Can create all types of monitors exept for suspend monitors
+ * Can create all types of monitors
*
* When the funcion is called it is assumed that:
* - 'ref' is an internal ordinary reference if type is ERTS_MON_TYPE_PROC,
* ERTS_MON_TYPE_PORT, ERTS_MON_TYPE_TIME_OFFSET, or ERTS_MON_TYPE_RESOURCE
- * - 'ref' is NIL if type is ERTS_MON_TYPE_NODE or ERTS_MON_TYPE_NODES
+ * - 'ref' is NIL if type is ERTS_MON_TYPE_NODE, ERTS_MON_TYPE_NODES, or
+ * ERTS_MON_TYPE_SUSPEND
* - 'ref' is and ordinary internal reference or an external reference if
* type is ERTS_MON_TYPE_DIST_PROC
* - 'name' is an atom or NIL if type is ERTS_MON_TYPE_PROC,
* ERTS_MON_TYPE_PORT, or ERTS_MON_TYPE_DIST_PROC
* - 'name is NIL if type is ERTS_MON_TYPE_TIME_OFFSET, ERTS_MON_TYPE_RESOURCE,
- * ERTS_MON_TYPE_NODE, or ERTS_MON_TYPE_NODES
+ * ERTS_MON_TYPE_NODE, ERTS_MON_TYPE_NODES, or ERTS_MON_TYPE_SUSPEND
* If the above is not true, bad things will happen.
*
* @param[in] type ERTS_MON_TYPE_PROC, ERTS_MON_TYPE_PORT,
* ERTS_MON_TYPE_TIME_OFFSET, ERTS_MON_TYPE_DIST_PROC,
* ERTS_MON_TYPE_RESOURCE, ERTS_MON_TYPE_NODE,
- * or ERTS_MON_TYPE_NODES
+ * ERTS_MON_TYPE_NODES, or ERTS_MON_TYPE_SUSPEND
*
* @param[in] ref A reference or NIL depending on type
*
@@ -1119,6 +1137,10 @@ int erts_monitor_list_foreach_delete_yielding(ErtsMonitor **list,
*
* @param[in] target The key of the target
*
+ * @param[in] name An atom (the name) or NIL depending on type
+ *
+ * @returns A pointer to monitor data structure
+ *
*/
ErtsMonitorData *erts_monitor_create(Uint16 type, Eterm ref, Eterm origin,
Eterm target, Eterm name);
@@ -1347,7 +1369,8 @@ erts_monitor_to_data(ErtsMonitor *mon)
ERTS_ML_ASSERT(erts_monitor_origin_offset == (size_t) mdp->origin.offset);
ERTS_ML_ASSERT(!!(mdp->target.flags & ERTS_ML_FLG_TARGET));
ERTS_ML_ASSERT(erts_monitor_target_offset == (size_t) mdp->target.offset);
- if (mon->type == ERTS_MON_TYPE_NODE || mon->type == ERTS_MON_TYPE_NODES) {
+ if (mon->type == ERTS_MON_TYPE_NODE || mon->type == ERTS_MON_TYPE_NODES
+ || mon->type == ERTS_MON_TYPE_SUSPEND) {
ERTS_ML_ASSERT(erts_monitor_node_key_offset == (size_t) mdp->origin.key_offset);
ERTS_ML_ASSERT(erts_monitor_node_key_offset == (size_t) mdp->target.key_offset);
}
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 6d81044c39..e9b41ad298 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -39,6 +39,7 @@
#include "big.h"
#include "erl_gc.h"
#include "bif.h"
+#include "erl_bif_unique.h"
#include "erl_proc_sig_queue.h"
#include "dtrace-wrapper.h"
@@ -49,7 +50,7 @@
* Note that not all signal are handled using this functionality!
*/
-#define ERTS_SIG_Q_OP_MAX 11
+#define ERTS_SIG_Q_OP_MAX 13
#define ERTS_SIG_Q_OP_EXIT 0
#define ERTS_SIG_Q_OP_EXIT_LINKED 1
@@ -62,7 +63,9 @@
#define ERTS_SIG_Q_OP_TRACE_CHANGE_STATE 8
#define ERTS_SIG_Q_OP_PERSISTENT_MON_MSG 9
#define ERTS_SIG_Q_OP_IS_ALIVE 10
-#define ERTS_SIG_Q_OP_PROCESS_INFO ERTS_SIG_Q_OP_MAX
+#define ERTS_SIG_Q_OP_PROCESS_INFO 11
+#define ERTS_SIG_Q_OP_SYNC_SUSPEND 12
+#define ERTS_SIG_Q_OP_RPC ERTS_SIG_Q_OP_MAX
#define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 5)
@@ -154,6 +157,17 @@ typedef struct {
} ErtsIsAliveRequest;
typedef struct {
+ Eterm message;
+ Eterm requester;
+ int async;
+} ErtsSyncSuspendRequest;
+
+typedef struct {
+ ErtsMonitorSuspend *mon;
+ ErtsMessage *sync;
+} ErtsProcSigPendingSuspend;
+
+typedef struct {
ErtsSignalCommon common;
Sint refc;
Sint delayed_len;
@@ -176,6 +190,15 @@ typedef struct {
#define ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE ((Sint) -1)
#define ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC ((Sint) -2)
+typedef struct {
+ ErtsSignalCommon common;
+ Eterm requester;
+ Eterm (*func)(Process *, void *, int *, ErlHeapFragment **);
+ void *arg;
+ Eterm ref;
+ ErtsORefThing oref_thing;
+} ErtsProcSigRPC;
+
static int handle_msg_tracing(Process *c_p,
ErtsSigRecvTracing *tracing,
ErtsMessage ***next_nm_sig);
@@ -1308,6 +1331,8 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason)
/* Pass signal using old monitor structure... */
ErtsSignal *sig;
+ send_using_monitor_struct:
+
mon->other.item = reason; /* Pass immed reason via other.item... */
sig = (ErtsSignal *) mon;
sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_MONITOR_DOWN,
@@ -1319,6 +1344,18 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason)
ErtsMonitorData *mdp = erts_monitor_to_data(mon);
Eterm from_tag, monitored, heap[3];
+ if (mon->type == ERTS_MON_TYPE_SUSPEND) {
+ /*
+ * Set reason to 'undefined', since exit
+ * reason is not used for suspend monitors,
+ * and send using monitor structure. This
+ * since we don't want to trigger
+ * unnecessary memory allocation etc...
+ */
+ reason = am_undefined;
+ goto send_using_monitor_struct;
+ }
+
if (!(mon->flags & ERTS_ML_FLG_NAME)) {
from_tag = monitored = mdp->origin.other.item;
if (is_external_pid(from_tag)) {
@@ -1596,7 +1633,173 @@ erts_proc_sig_send_process_info_request(Process *c_p,
else
erts_free(ERTS_ALC_T_SIG_DATA, pis);
return res;
-}
+}
+
+void
+erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, Eterm tag, Eterm reply)
+{
+ ErlHeapFragment *hfrag;
+ Uint hsz, tag_sz;
+ Eterm *hp, *start_hp, tag_cpy, msg, default_reply;
+ ErlOffHeap *ohp;
+ ErtsMessage *mp;
+ ErtsSyncSuspendRequest *ssusp;
+ int async_suspend;
+
+ tag_sz = size_object(tag);
+
+ hsz = 3 + tag_sz + sizeof(ErtsSyncSuspendRequest)/sizeof(Eterm);
+
+ mp = erts_alloc_message(hsz, &hp);
+ hfrag = &mp->hfrag;
+ mp->next = NULL;
+ ohp = &hfrag->off_heap;
+ start_hp = hp;
+
+ tag_cpy = copy_struct(tag, tag_sz, &hp, ohp);
+
+ async_suspend = is_non_value(reply);
+ default_reply = async_suspend ? am_suspended : reply;
+
+ msg = TUPLE2(hp, tag_cpy, default_reply);
+ hp += 3;
+
+ hfrag->used_size = hp - start_hp;
+
+ ssusp = (ErtsSyncSuspendRequest *) (char *) hp;
+ ssusp->message = msg;
+ ssusp->requester = c_p->common.id;
+ ssusp->async = async_suspend;
+
+ ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_SYNC_SUSPEND,
+ ERTS_SIG_Q_TYPE_UNDEFINED,
+ 0);
+ ERL_MESSAGE_TOKEN(mp) = NIL;
+ ERL_MESSAGE_FROM(mp) = am_system;
+#ifdef USE_VM_PROBES
+ ERL_MESSAGE_DT_UTAG(mp) = NIL;
+#endif
+
+ if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_SYNC_SUSPEND))
+ (void) maybe_elevate_sig_handling_prio(c_p, to);
+ else {
+ Eterm *tp;
+ /* It wasn't alive; reply to ourselves... */
+ mp->next = NULL;
+ mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
+ tp = tuple_val(msg);
+ tp[2] = async_suspend ? am_badarg : am_exited;
+ erts_queue_message(c_p, ERTS_PROC_LOCK_MAIN,
+ mp, msg, am_system);
+ }
+}
+
+Eterm
+erts_proc_sig_send_rpc_request(Process *c_p,
+ Eterm to,
+ int reply,
+ Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
+ void *arg)
+{
+ Eterm res;
+ ErtsProcSigRPC *sig = erts_alloc(ERTS_ALC_T_SIG_DATA,
+ sizeof(ErtsProcSigRPC));
+ sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_RPC,
+ ERTS_SIG_Q_TYPE_UNDEFINED,
+ 0);
+ sig->requester = reply ? c_p->common.id : NIL;
+ sig->func = func;
+ sig->arg = arg;
+
+ if (!reply) {
+ res = am_ok;
+ sig->ref = am_ok;
+ }
+ else {
+ res = erts_make_ref(c_p);
+
+ sys_memcpy((void *) &sig->oref_thing,
+ (void *) internal_ref_val(res),
+ sizeof(ErtsORefThing));
+
+ sig->ref = make_internal_ref(&sig->oref_thing);
+
+ ERTS_RECV_MARK_SAVE(c_p);
+ ERTS_RECV_MARK_SET(c_p);
+ }
+
+ if (proc_queue_signal(c_p, to, (ErtsSignal *) sig, ERTS_SIG_Q_OP_RPC))
+ (void) maybe_elevate_sig_handling_prio(c_p, to);
+ else {
+ erts_free(ERTS_ALC_T_SIG_DATA, sig);
+ res = THE_NON_VALUE;
+ if (reply)
+ JOIN_MESSAGE(c_p);
+ }
+
+ return res;
+}
+
+static int
+handle_rpc(Process *c_p, ErtsProcSigRPC *rpc, int cnt, int limit, int *yieldp)
+{
+ Process *rp;
+ ErlHeapFragment *bp = NULL;
+ Eterm res;
+ Uint hsz;
+ int reds, out_cnt;
+
+ /*
+ * reds in:
+ * Reductions left.
+ *
+ * reds out:
+ * Absolute value of reds out equals consumed
+ * amount of reds. If a negative value, force
+ * a yield.
+ */
+
+ reds = (limit - cnt) / ERTS_SIG_REDS_CNT_FACTOR;
+ if (reds <= 0)
+ reds = 1;
+
+ res = (*rpc->func)(c_p, rpc->arg, &reds, &bp);
+
+ if (reds < 0) {
+ /* Force yield... */
+ *yieldp = !0;
+ reds *= -1;
+ }
+
+ out_cnt = reds*ERTS_SIG_REDS_CNT_FACTOR;
+
+ hsz = 3 + sizeof(ErtsORefThing)/sizeof(Eterm);
+
+ rp = erts_proc_lookup(rpc->requester);
+ if (!rp) {
+ if (bp)
+ free_message_buffer(bp);
+ }
+ else {
+ Eterm *hp, msg, ref;
+ ErtsMessage *mp = erts_alloc_message(hsz, &hp);
+
+ sys_memcpy((void *) hp, (void *) &rpc->oref_thing,
+ sizeof(rpc->oref_thing));
+
+ ref = make_internal_ref(hp);
+ hp += sizeof(rpc->oref_thing)/sizeof(Eterm);
+ msg = TUPLE2(hp, ref, res);
+
+ mp->hfrag.next = bp;
+
+ erts_queue_proc_message(c_p, rp, 0, mp, msg);
+ }
+
+ erts_free(ERTS_ALC_T_SIG_DATA, rpc);
+
+ return out_cnt;
+}
static void
is_alive_response(Process *c_p, ErtsMessage *mp, int is_alive)
@@ -2640,6 +2843,155 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing,
return ((int) reds)*4 + 8;
}
+static void
+handle_suspend(Process *c_p, ErtsMonitor *mon, int *yieldp)
+{
+ erts_aint32_t state = erts_atomic32_read_nob(&c_p->state);
+
+ ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND);
+
+ if (!(state & ERTS_PSFLG_DIRTY_RUNNING)) {
+ ErtsMonitorSuspend *msp;
+ erts_aint_t mstate;
+
+ msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);
+ mstate = erts_atomic_read_bor_acqb(&msp->state,
+ ERTS_MSUSPEND_STATE_FLG_ACTIVE);
+ ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate;
+ erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
+ *yieldp = !0;
+ }
+ else {
+ /* Executing dirty; delay suspend... */
+ ErtsProcSigPendingSuspend *psusp;
+ ErtsMonitorSuspend *msp;
+
+ psusp = ERTS_PROC_GET_PENDING_SUSPEND(c_p);
+ if (!psusp) {
+ psusp = erts_alloc(ERTS_ALC_T_SIG_DATA,
+ sizeof(ErtsProcSigPendingSuspend));
+ psusp->mon = NULL;
+ psusp->sync = NULL;
+ ERTS_PROC_SET_PENDING_SUSPEND(c_p, (void *) psusp);
+ }
+
+ msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);
+
+ msp->next = psusp->mon;
+ psusp->mon = msp;
+
+ erts_atomic32_inc_nob(&msp->md.refc);
+ }
+}
+
+static void
+sync_suspend_reply(Process *c_p, ErtsMessage *mp, erts_aint32_t state)
+{
+ /*
+ * Sender prepared the message for us. Just patch
+ * the result if necessary. The default prepared
+ * result is 'false'.
+ */
+ Process *rp;
+ ErtsSyncSuspendRequest *ssusp;
+
+ ssusp = (ErtsSyncSuspendRequest *) (char *) (&mp->hfrag.mem[0]
+ + mp->hfrag.used_size);
+
+ ASSERT(ERTS_SIG_IS_NON_MSG(mp));
+ ASSERT(ERTS_PROC_SIG_OP(((ErtsSignal *) mp)->common.tag)
+ == ERTS_SIG_Q_OP_SYNC_SUSPEND);
+ ASSERT(mp->hfrag.alloc_size > mp->hfrag.used_size);
+ ASSERT((mp->hfrag.alloc_size - mp->hfrag.used_size)*sizeof(UWord)
+ >= sizeof(ErtsSyncSuspendRequest));
+ ASSERT(is_internal_pid(ssusp->requester));
+ ASSERT(ssusp->requester != c_p->common.id);
+ ASSERT(is_tuple_arity(ssusp->message, 2));
+ ASSERT(is_immed(tuple_val(ssusp->message)[2]));
+
+ ERL_MESSAGE_TERM(mp) = ssusp->message;
+ mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
+ mp->next = NULL;
+
+ rp = erts_proc_lookup(ssusp->requester);
+ if (!rp)
+ erts_cleanup_messages(mp);
+ else {
+ if ((state & (ERTS_PSFLG_EXITING
+ | ERTS_PSFLG_SUSPENDED)) != ERTS_PSFLG_SUSPENDED) {
+ /* Not suspended -> patch result... */
+ if (state & ERTS_PSFLG_EXITING) {
+ Eterm *tp = tuple_val(ssusp->message);
+ tp[2] = ssusp->async ? am_exited : am_badarg;
+ }
+ else {
+ Eterm *tp = tuple_val(ssusp->message);
+ ASSERT(!(state & ERTS_PSFLG_SUSPENDED));
+ tp[2] = ssusp->async ? am_not_suspended : am_internal_error;
+ }
+ }
+ erts_queue_proc_message(c_p, rp, 0, mp, ssusp->message);
+ }
+}
+
+static void
+handle_sync_suspend(Process *c_p, ErtsMessage *mp)
+{
+ ErtsProcSigPendingSuspend *psusp;
+
+ psusp = (ErtsProcSigPendingSuspend *) ERTS_PROC_GET_PENDING_SUSPEND(c_p);
+ if (!psusp)
+ sync_suspend_reply(c_p, mp, erts_atomic32_read_nob(&c_p->state));
+ else {
+ mp->next = psusp->sync;
+ psusp->sync = mp;
+ }
+}
+
+void
+erts_proc_sig_handle_pending_suspend(Process *c_p)
+{
+ ErtsMonitorSuspend *msp;
+ ErtsMessage *sync;
+ ErtsProcSigPendingSuspend *psusp;
+ erts_aint32_t state = erts_atomic32_read_nob(&c_p->state);
+
+ psusp = (ErtsProcSigPendingSuspend *) ERTS_PROC_GET_PENDING_SUSPEND(c_p);
+
+ msp = psusp->mon;
+
+ while (msp) {
+ ErtsMonitorSuspend *next_msp = msp->next;
+ msp->next = NULL;
+ if (!(state & ERTS_PSFLG_EXITING)
+ && erts_monitor_is_in_table(&msp->md.target)) {
+ erts_aint_t mstate;
+
+ mstate = erts_atomic_read_bor_acqb(&msp->state,
+ ERTS_MSUSPEND_STATE_FLG_ACTIVE);
+ ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate;
+ erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
+ }
+
+ erts_monitor_release(&msp->md.target);
+
+ msp = next_msp;
+ }
+
+ sync = psusp->sync;
+
+ while (sync) {
+ ErtsMessage *next_sync = sync->next;
+ sync->next = NULL;
+ sync_suspend_reply(c_p, sync, state);
+ sync = next_sync;
+ }
+
+ erts_free(ERTS_ALC_T_SIG_DATA, psusp);
+
+ ERTS_PROC_SET_PENDING_SUSPEND(c_p, NULL);
+}
+
/*
* Called in order to handle incoming signals.
*/
@@ -2650,7 +3002,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
{
Eterm tag;
erts_aint32_t state;
- int cnt, limit, abs_lim, msg_tracing;
+ int yield, cnt, limit, abs_lim, msg_tracing;
ErtsMessage *sig, ***next_nm_sig;
ErtsSigRecvTracing tracing;
@@ -2670,6 +3022,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
limit = *redsp;
*redsp = 0;
+ yield = 0;
if (!c_p->sig_qs.cont) {
if (state == -1)
@@ -2783,6 +3136,18 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
cnt += handle_nodedown(c_p, sig, mdp, next_nm_sig);
}
break;
+ case ERTS_MON_TYPE_SUSPEND:
+ tmon = (ErtsMonitor *) sig;
+ ASSERT(erts_monitor_is_target(tmon));
+ ASSERT(!erts_monitor_is_in_table(tmon));
+ mdp = erts_monitor_to_data(tmon);
+ if (erts_monitor_is_in_table(&mdp->origin)) {
+ erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p),
+ &mdp->origin);
+ omon = &mdp->origin;
+ }
+ remove_nm_sig(c_p, sig, next_nm_sig);
+ break;
default:
ERTS_INTERNAL_ERROR("invalid monitor type");
break;
@@ -2846,9 +3211,13 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
if (mon->type == ERTS_MON_TYPE_DIST_PROC)
erts_monitor_tree_insert(&ERTS_P_MONITORS(c_p), mon);
- else
+ else {
erts_monitor_list_insert(&ERTS_P_LT_MONITORS(c_p), mon);
+ if (mon->type == ERTS_MON_TYPE_SUSPEND)
+ handle_suspend(c_p, mon, &yield);
+ }
ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ cnt += 2;
break;
}
@@ -2892,9 +3261,16 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p), tmon);
else {
erts_monitor_list_delete(&ERTS_P_LT_MONITORS(c_p), tmon);
- if (type == ERTS_MON_TYPE_RESOURCE) {
+ switch (type) {
+ case ERTS_MON_TYPE_RESOURCE:
erts_nif_demonitored((ErtsResource *) tmon->other.ptr);
cnt++;
+ break;
+ case ERTS_MON_TYPE_SUSPEND:
+ erts_resume(c_p, ERTS_PROC_LOCK_MAIN);
+ break;
+ default:
+ break;
}
}
erts_monitor_release_both(mdp);
@@ -3009,6 +3385,21 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
break;
+ case ERTS_SIG_Q_OP_SYNC_SUSPEND:
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ remove_nm_sig(c_p, sig, next_nm_sig);
+ handle_sync_suspend(c_p, sig);
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ break;
+
+ case ERTS_SIG_Q_OP_RPC:
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ remove_nm_sig(c_p, sig, next_nm_sig);
+ cnt += handle_rpc(c_p, (ErtsProcSigRPC *) sig, cnt,
+ limit, &yield);
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ break;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: {
Uint16 type = ERTS_PROC_SIG_TYPE(tag);
@@ -3166,6 +3557,15 @@ stop: {
*redsp = cnt/4 + 1;
+ if (yield) {
+ int vreds = max_reds - *redsp;
+ if (vreds > 0) {
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ esdp->virtual_reds += vreds;
+ }
+ *redsp = max_reds;
+ }
+
return res;
}
}
@@ -3274,6 +3674,8 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp)
case ERTS_MON_TYPE_PROC:
case ERTS_MON_TYPE_DIST_PROC:
case ERTS_MON_TYPE_NODE:
+ case ERTS_MON_TYPE_NODES:
+ case ERTS_MON_TYPE_SUSPEND:
erts_monitor_release((ErtsMonitor *) sig);
break;
default:
@@ -3329,6 +3731,17 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp)
handle_process_info(c_p, NULL, sig, next_nm_sig, 0);
break;
+ case ERTS_SIG_Q_OP_SYNC_SUSPEND:
+ handle_sync_suspend(c_p, sig);
+ break;
+
+ case ERTS_SIG_Q_OP_RPC: {
+ int yield = 0;
+ handle_rpc(c_p, (ErtsProcSigRPC *) sig,
+ cnt, limit, &yield);
+ break;
+ }
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
destroy_trace_info((ErtsSigTraceInfo *) sig);
break;
@@ -3464,6 +3877,7 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
}
break;
+ case ERTS_SIG_Q_OP_SYNC_SUSPEND:
case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
case ERTS_SIG_Q_OP_IS_ALIVE:
size = ((ErtsMessage *) sig)->hfrag.alloc_size;
@@ -3519,6 +3933,10 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
break;
}
+ case ERTS_SIG_Q_OP_RPC:
+ size = sizeof(ErtsProcSigRPC);
+ break;
+
default:
ERTS_INTERNAL_ERROR("Unknown signal");
break;
@@ -3595,17 +4013,13 @@ erts_proc_sig_receive_helper(Process *c_p,
*/
*get_outp = 0;
*msgpp = NULL;
+
return consumed_reds;
}
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
- if (left_reds <= 0) {
- *get_outp = -1; /* yield */
- *msgpp = NULL;
-
- ASSERT(consumed_reds >= (fcalls - neg_o_reds));
- return consumed_reds;
- }
+ if (left_reds <= 0)
+ break; /* Yield */
/* handle newly arrived signals... */
}
@@ -3626,19 +4040,27 @@ erts_proc_sig_receive_helper(Process *c_p,
max_reds, !0);
consumed_reds += reds;
left_reds -= reds;
- /* we may have exited by an incoming signal... */
- if (state & ERTS_PSFLG_EXITING) {
+
+ /* we may have exited or suspended by an incoming signal... */
+
+ if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_SUSPENDED)) {
+ if (state & ERTS_PSFLG_SUSPENDED)
+ break; /* Yield */
+
/*
* Process need to schedule out in order
* to terminate. Prepare this a bit...
*/
+ ASSERT(state & ERTS_PSFLG_EXITING);
ASSERT(c_p->flags & F_DELAY_GC);
c_p->flags &= ~F_DELAY_GC;
c_p->arity = 0;
c_p->current = NULL;
+
*get_outp = 1;
*msgpp = NULL;
+
return consumed_reds;
}
@@ -3649,17 +4071,20 @@ erts_proc_sig_receive_helper(Process *c_p,
return consumed_reds;
}
- if (left_reds <= 0) {
- *get_outp = -1; /* yield */
- *msgpp = NULL;
-
- ASSERT(consumed_reds >= (fcalls - neg_o_reds));
- return consumed_reds;
- }
+ if (left_reds <= 0)
+ break; /* yield */
ASSERT(!c_p->sig_qs.cont);
/* Go fetch again... */
}
+
+ /* Yield... */
+
+ *get_outp = -1;
+ *msgpp = NULL;
+
+ ASSERT(consumed_reds >= (fcalls - neg_o_reds));
+ return consumed_reds;
}
static int
diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h
index 8edd277309..efa7c08664 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.h
+++ b/erts/emulator/beam/erl_proc_sig_queue.h
@@ -33,6 +33,11 @@
* - Group leader
* - Is process alive
* - Process info request
+ * - Suspend request (monitor of suspend type)
+ * - Resume request (demonitor of suspend type)
+ * - Suspend cleanup (monitor down of suspend type)
+ * - Sync suspend
+ * - RPC request
* - Trace change
*
* The signal queue consists of three parts:
@@ -557,6 +562,102 @@ erts_proc_sig_send_process_info_request(Process *c_p,
Uint reserve_size,
Eterm ref);
+/**
+ *
+ * @brief Send a 'sync suspend' signal to a process.
+ *
+ * A response message '{Tag, Reply}' is sent to the
+ * sender when performed where Tag is the term passed
+ * as 'tag' argument. Reply is either 'suspended',
+ * 'not_suspended', 'exited' if the operation is
+ * asynchronous; otherwise, the 'reply' argument or
+ * 'badarg' if process terminated.
+ *
+ * This signal does *not* change the suspend state, only
+ * reads and reply the state. This signal is typically
+ * sent after a suspend request (monitor of suspend type)
+ * signal has been sent to the process in order to get a
+ * response when the suspend monitor has been processed.
+ *
+ * @param[in] c_p Pointer to process struct of
+ * currently executing process.
+ *
+ * @param[in] to Identifier of receiver.
+ *
+ * @param[in] tag Tag to use in response
+ * message to the sending
+ * process (i.e., c_p).
+ *
+ * @param[in] reply Reply to send if this
+ * is a synchronous operation;
+ * otherwise, THE_NON_VALUE.
+ */
+void
+erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to,
+ Eterm tag, Eterm reply);
+
+/**
+ *
+ * @brief Send an 'rpc' signal to a process.
+ *
+ * The function 'func' will be executed in the
+ * context of the receiving process. A response
+ * message '{Ref, Result}' is sent to the sender
+ * when 'func' has been called. 'Ref' is the reference
+ * returned by this function and 'Result' is the
+ * term returned by 'func'. If the return value of
+ * 'func' is not an immediate term, 'func' has to
+ * allocate a heap fragment where the result is stored
+ * and update the the heap fragment pointer pointer
+ * passed as third argument to point to it.
+ *
+ * If this function returns a reference, 'func' will
+ * be called in the context of the receiver. However,
+ * note that this might happen when the receiver is in
+ * an exiting state. The caller of this function
+ * *unconditionally* has to enter a receive that match
+ * on the returned reference in all clauses as next
+ * receive; otherwise, bad things will happen!
+ *
+ * If THE_NON_VALUE is returned, the receiver did not
+ * exist. The signal was not sent, and no specific
+ * receive has to be entered by the caller.
+ *
+ * @param[in] c_p Pointer to process struct of
+ * currently executing process.
+ *
+ * @param[in] to Identifier of receiver process.
+ *
+ * @param[in] reply Non-zero if a reply is wanted.
+ *
+ * @param[in] func Function to execute in the
+ * context of the receiver.
+ * First argument will be a
+ * pointer to the process struct
+ * of the receiver process.
+ * Second argument will be 'arg'
+ * (see below). Third argument
+ * will be a pointer to a pointer
+ * to a heap fragment for storage
+ * of result returned from 'func'
+ * (i.e. an 'out' parameter).
+ *
+ * @param[in] arg Void pointer to argument
+ * to pass as second argument
+ * in call of 'func'.
+ *
+ * @returns If the request was sent,
+ * an internal ordinary
+ * reference; otherwise,
+ * THE_NON_VALUE (non-existing
+ * receiver).
+ */
+Eterm
+erts_proc_sig_send_rpc_request(Process *c_p,
+ Eterm to,
+ int reply,
+ Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
+ void *arg);
/*
* End of send operations of currently supported process signals.
@@ -820,6 +921,21 @@ void
erts_proc_sig_clear_seq_trace_tokens(Process *c_p);
/**
+ *
+ * @brief Handle pending suspend requests
+ *
+ * Should be called by processes when they stop
+ * execution on a dirty scheduler if they have
+ * pending suspend requests (i.e. when
+ * ERTS_PROC_GET_PENDING_SUSPEND(c_p) != NULL).
+ *
+ * @param[in] c_p Pointer to executing
+ * process
+ */
+void
+erts_proc_sig_handle_pending_suspend(Process *c_p);
+
+/**
* @brief Initialize this functionality
*/
void erts_proc_sig_queue_init(void);
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index ad7ac27ac3..b373d08a6b 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -185,8 +185,6 @@ sched_get_busy_wait_params(ErtsSchedulerData *esdp)
return &sched_busy_wait_params[esdp->type];
}
-int erts_disable_proc_not_running_opt;
-
static ErtsAuxWorkData *aux_thread_aux_work_data;
static ErtsAuxWorkData *poll_thread_aux_work_data;
@@ -730,6 +728,11 @@ erts_pre_init_process(void)
= ERTS_PSD_DIST_ENTRY_GET_LOCKS;
erts_psd_required_locks[ERTS_PSD_DIST_ENTRY].set_locks
= ERTS_PSD_DIST_ENTRY_SET_LOCKS;
+
+ erts_psd_required_locks[ERTS_PSD_PENDING_SUSPEND].get_locks
+ = ERTS_PSD_PENDING_SUSPEND_GET_LOCKS;
+ erts_psd_required_locks[ERTS_PSD_PENDING_SUSPEND].set_locks
+ = ERTS_PSD_PENDING_SUSPEND_SET_LOCKS;
#endif
}
@@ -744,7 +747,6 @@ void
erts_init_process(int ncpu, int proc_tab_size, int legacy_proc_tab)
{
- erts_disable_proc_not_running_opt = 0;
erts_init_proc_lock(ncpu);
init_proclist_alloc();
@@ -8553,427 +8555,22 @@ erts_start_schedulers(void)
}
}
-
-
-static void
-add_pend_suspend(Process *suspendee,
- Eterm originator_pid,
- void (*handle_func)(Process *,
- ErtsProcLocks,
- int,
- Eterm))
-{
- ErtsPendingSuspend *psp = erts_alloc(ERTS_ALC_T_PEND_SUSPEND,
- sizeof(ErtsPendingSuspend));
- psp->next = NULL;
-#ifdef DEBUG
-#if defined(ARCH_64)
- psp->end = (ErtsPendingSuspend *) 0xdeaddeaddeaddead;
-#else
- psp->end = (ErtsPendingSuspend *) 0xdeaddead;
-#endif
-#endif
- psp->pid = originator_pid;
- psp->handle_func = handle_func;
-
- if (suspendee->pending_suspenders)
- suspendee->pending_suspenders->end->next = psp;
- else
- suspendee->pending_suspenders = psp;
- suspendee->pending_suspenders->end = psp;
-}
-
-static void
-handle_pending_suspend(Process *p, ErtsProcLocks p_locks)
-{
- ErtsPendingSuspend *psp;
- int is_alive = !ERTS_PROC_IS_EXITING(p);
-
- ERTS_LC_ASSERT(p_locks & ERTS_PROC_LOCK_STATUS);
-
- /*
- * New pending suspenders might appear while we are processing
- * (since we may release the status lock on p while processing).
- */
- while (p->pending_suspenders) {
- psp = p->pending_suspenders;
- p->pending_suspenders = NULL;
- while (psp) {
- ErtsPendingSuspend *free_psp;
- (*psp->handle_func)(p, p_locks, is_alive, psp->pid);
- free_psp = psp;
- psp = psp->next;
- erts_free(ERTS_ALC_T_PEND_SUSPEND, (void *) free_psp);
- }
- }
-
-}
-
-static ERTS_INLINE void
-cancel_suspend_of_suspendee(Process *p, ErtsProcLocks p_locks)
-{
- if (is_not_nil(p->suspendee)) {
- ErtsMonitor *mon;
- Eterm suspendee = p->suspendee;
- Process *rp;
- if (!(p_locks & ERTS_PROC_LOCK_STATUS))
- erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
- rp = erts_pid2proc(p, p_locks|ERTS_PROC_LOCK_STATUS,
- suspendee, ERTS_PROC_LOCK_STATUS);
- if (rp) {
- erts_resume(rp, ERTS_PROC_LOCK_STATUS);
- erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
- }
- if (!(p_locks & ERTS_PROC_LOCK_STATUS))
- erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
- p->suspendee = NIL;
-
- mon = erts_monitor_tree_lookup(p->suspend_monitors,
- suspendee);
- if (mon) {
- erts_monitor_tree_delete(&p->suspend_monitors,
- mon);
- erts_monitor_suspend_destroy(erts_monitor_suspend(mon));
- }
- }
-}
-
-static void
-handle_pend_sync_suspend(Process *suspendee,
- ErtsProcLocks suspendee_locks,
- int suspendee_alive,
- Eterm suspender_pid)
-{
- Process *suspender;
-
- ERTS_LC_ASSERT(suspendee_locks & ERTS_PROC_LOCK_STATUS);
-
- suspender = erts_pid2proc(suspendee,
- suspendee_locks,
- suspender_pid,
- ERTS_PROC_LOCK_STATUS);
- if (suspender) {
- ASSERT(is_nil(suspender->suspendee));
- if (suspendee_alive) {
- erts_suspend(suspendee, suspendee_locks, NULL);
- suspender->suspendee = suspendee->common.id;
- }
- /* suspender is suspended waiting for suspendee to suspend;
- resume suspender */
- ASSERT(suspendee != suspender);
- resume_process(suspender, ERTS_PROC_LOCK_STATUS);
- erts_proc_unlock(suspender, ERTS_PROC_LOCK_STATUS);
- }
-}
-
-static Process *
-pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks,
- Eterm pid, ErtsProcLocks pid_locks, int suspend)
-{
- Process *rp;
- int unlock_c_p_status;
-
- ERTS_LC_ASSERT(c_p_locks == erts_proc_lc_my_proc_locks(c_p));
-
- ERTS_LC_ASSERT(c_p_locks & ERTS_PROC_LOCK_MAIN);
- ERTS_LC_ASSERT(pid_locks & (ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS));
-
- if (c_p->common.id == pid)
- return erts_pid2proc(c_p, c_p_locks, pid, pid_locks);
-
- if (c_p_locks & ERTS_PROC_LOCK_STATUS)
- unlock_c_p_status = 0;
- else {
- unlock_c_p_status = 1;
- erts_proc_lock(c_p, ERTS_PROC_LOCK_STATUS);
- }
-
- if (c_p->suspendee == pid) {
- /* Process previously suspended by c_p (below)... */
- ErtsProcLocks rp_locks = pid_locks|ERTS_PROC_LOCK_STATUS;
- rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS, pid, rp_locks);
- c_p->suspendee = NIL;
- ASSERT(c_p->flags & F_P2PNR_RESCHED);
- c_p->flags &= ~F_P2PNR_RESCHED;
- if (!suspend && rp)
- resume_process(rp, rp_locks);
- }
- else {
- rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS,
- pid, ERTS_PROC_LOCK_STATUS);
-
- if (!rp) {
- c_p->flags &= ~F_P2PNR_RESCHED;
- goto done;
- }
-
- ASSERT(!(c_p->flags & F_P2PNR_RESCHED));
-
- /*
- * Suspend the other process in order to prevent
- * it from being selected for normal execution.
- * This will however not prevent it from being
- * selected for execution of a system task. If
- * it is selected for execution of a system task
- * we might be blocked for quite a while if the
- * try-lock below fails. That is, there is room
- * for improvement here...
- */
-
- if (!suspend_process(c_p, rp)) {
- /* Other process running */
-
- ASSERT((ERTS_PSFLG_RUNNING | ERTS_PSFLG_DIRTY_RUNNING)
- & erts_atomic32_read_nob(&rp->state));
-
- if (!suspend
- && (erts_atomic32_read_nob(&rp->state)
- & ERTS_PSFLG_DIRTY_RUNNING)) {
- ErtsProcLocks need_locks = pid_locks & ~ERTS_PROC_LOCK_STATUS;
- if (need_locks && erts_proc_trylock(rp, need_locks) == EBUSY) {
- erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
- rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS,
- pid, pid_locks|ERTS_PROC_LOCK_STATUS);
- }
- goto done;
- }
-
- running:
-
- /*
- * If we got pending suspenders and suspend ourselves waiting
- * to suspend another process we might deadlock.
- * In this case we have to yield, be suspended by
- * someone else and then do it all over again.
- */
- if (!c_p->pending_suspenders) {
- /* Mark rp pending for suspend by c_p */
- add_pend_suspend(rp, c_p->common.id, handle_pend_sync_suspend);
- ASSERT(is_nil(c_p->suspendee));
-
- /* Suspend c_p; when rp is suspended c_p will be resumed. */
- suspend_process(c_p, c_p);
- c_p->flags |= F_P2PNR_RESCHED;
- }
- /* Yield (caller is assumed to yield immediately in bif). */
- erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
- rp = ERTS_PROC_LOCK_BUSY;
- }
- else {
- ErtsProcLocks need_locks = pid_locks & ~ERTS_PROC_LOCK_STATUS;
- if (need_locks && erts_proc_trylock(rp, need_locks) == EBUSY) {
- if ((ERTS_PSFLG_RUNNING_SYS|ERTS_PSFLG_DIRTY_RUNNING_SYS)
- & erts_atomic32_read_nob(&rp->state)) {
- /* Executing system task... */
- resume_process(rp, ERTS_PROC_LOCK_STATUS);
- goto running;
- }
- erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
- /*
- * If we are unlucky, the process just got selected for
- * execution of a system task. In this case we may be
- * blocked here for quite a while... Execution of system
- * tasks are fortunately quite rare events. We try to
- * avoid this by checking if it is in a state executing
- * system tasks (above), but it will not prevent all
- * scenarios for a long block here...
- */
- rp = erts_pid2proc(c_p, c_p_locks|ERTS_PROC_LOCK_STATUS,
- pid, pid_locks|ERTS_PROC_LOCK_STATUS);
- if (!rp)
- goto done;
- }
-
- /*
- * The previous suspend has prevented the process
- * from being selected for normal execution regardless
- * of locks held or not held on it...
- */
-#ifdef DEBUG
- {
- erts_aint32_t state;
- state = erts_atomic32_read_nob(&rp->state);
- ASSERT(!(state & ERTS_PSFLG_RUNNING));
- }
-#endif
-
- if (!suspend)
- resume_process(rp, pid_locks|ERTS_PROC_LOCK_STATUS);
- }
- }
-
- done:
-
- if (rp && rp != ERTS_PROC_LOCK_BUSY && !(pid_locks & ERTS_PROC_LOCK_STATUS))
- erts_proc_unlock(rp, ERTS_PROC_LOCK_STATUS);
- if (unlock_c_p_status)
- erts_proc_unlock(c_p, ERTS_PROC_LOCK_STATUS);
- return rp;
-}
-
-
-/*
- * Like erts_pid2proc() but:
- *
- * * At least ERTS_PROC_LOCK_MAIN have to be held on c_p.
- * * At least ERTS_PROC_LOCK_MAIN have to be taken on pid.
- * * It also waits for proc to be in a state != running and garbing.
- * * If ERTS_PROC_LOCK_BUSY is returned, the calling process has to
- * yield (ERTS_BIF_YIELD[0-3]()). c_p might in this case have been
- * suspended.
- */
-Process *
-erts_pid2proc_not_running(Process *c_p, ErtsProcLocks c_p_locks,
- Eterm pid, ErtsProcLocks pid_locks)
-{
- return pid2proc_not_running(c_p, c_p_locks, pid, pid_locks, 0);
-}
-
-/*
- * erts_pid2proc_nropt() is normally the same as
- * erts_pid2proc_not_running(). However it is only
- * to be used when 'not running' is a pure optimization,
- * not a requirement.
- */
-
-Process *
-erts_pid2proc_nropt(Process *c_p, ErtsProcLocks c_p_locks,
- Eterm pid, ErtsProcLocks pid_locks)
-{
- if (erts_disable_proc_not_running_opt)
- return erts_pid2proc(c_p, c_p_locks, pid, pid_locks);
- else
- return erts_pid2proc_not_running(c_p, c_p_locks, pid, pid_locks);
-}
-
-static ERTS_INLINE int
-do_bif_suspend_process(Process *c_p,
- ErtsMonitorSuspend *smon,
- Process *suspendee)
-{
- ASSERT(suspendee);
- ASSERT(!ERTS_PROC_IS_EXITING(suspendee));
- ERTS_LC_ASSERT(ERTS_PROC_LOCK_STATUS
- & erts_proc_lc_my_proc_locks(suspendee));
- if (smon) {
- if (!smon->active) {
- if (!suspend_process(c_p, suspendee))
- return 0;
- }
- smon->active += smon->pending;
- ASSERT(smon->active);
- smon->pending = 0;
- return 1;
- }
- return 0;
-}
-
-static void
-handle_pend_bif_sync_suspend(Process *suspendee,
- ErtsProcLocks suspendee_locks,
- int suspendee_alive,
- Eterm suspender_pid)
-{
- Process *suspender;
-
- ERTS_LC_ASSERT(suspendee_locks & ERTS_PROC_LOCK_STATUS);
-
- suspender = erts_pid2proc(suspendee,
- suspendee_locks,
- suspender_pid,
- ERTS_PROC_LOCK_STATUS);
- if (suspender) {
- ErtsMonitorSuspend *smon;
- ErtsMonitor *mon;
- mon = erts_monitor_tree_lookup(suspender->suspend_monitors,
- suspendee->common.id);
- smon = erts_monitor_suspend(mon);
-
- ASSERT(is_nil(suspender->suspendee));
- if (!suspendee_alive) {
- if (mon) {
- erts_monitor_tree_delete(&suspender->suspend_monitors,
- mon);
- erts_monitor_suspend_destroy(smon);
- }
- }
- else {
-#ifdef DEBUG
- int res =
-#endif
- do_bif_suspend_process(suspendee, smon, suspendee);
- ASSERT(!smon || res != 0);
- suspender->suspendee = suspendee->common.id;
- }
- /* suspender is suspended waiting for suspendee to suspend;
- resume suspender */
- ASSERT(suspender != suspendee);
- resume_process(suspender, ERTS_PROC_LOCK_STATUS);
- erts_proc_unlock(suspender, ERTS_PROC_LOCK_STATUS);
- }
-}
-
-static void
-handle_pend_bif_async_suspend(Process *suspendee,
- ErtsProcLocks suspendee_locks,
- int suspendee_alive,
- Eterm suspender_pid)
-{
-
- Process *suspender;
-
- ERTS_LC_ASSERT(suspendee_locks & ERTS_PROC_LOCK_STATUS);
-
- suspender = erts_pid2proc(suspendee,
- suspendee_locks,
- suspender_pid,
- ERTS_PROC_LOCK_STATUS);
- if (suspender) {
- ErtsMonitorSuspend *smon;
- ErtsMonitor *mon;
- mon = erts_monitor_tree_lookup(suspender->suspend_monitors,
- suspendee->common.id);
- smon = erts_monitor_suspend(mon);
- ASSERT(is_nil(suspender->suspendee));
- if (!suspendee_alive) {
- if (mon) {
- erts_monitor_tree_delete(&suspender->suspend_monitors,
- mon);
- erts_monitor_suspend_destroy(smon);
- }
- }
- else {
-#ifdef DEBUG
- int res =
-#endif
- do_bif_suspend_process(suspendee, smon, suspendee);
- ASSERT(!smon || res != 0);
- }
- erts_proc_unlock(suspender, ERTS_PROC_LOCK_STATUS);
- }
-}
-
-
-/*
- * The erlang:suspend_process/2 BIF
- */
-
BIF_RETTYPE
-suspend_process_2(BIF_ALIST_2)
+erts_internal_suspend_process_2(BIF_ALIST_2)
{
Eterm res;
- Process* suspendee = NULL;
- ErtsMonitorSuspend *smon;
- ErtsProcLocks xlocks = (ErtsProcLocks) 0;
- int created;
-
- /* Options and default values: */
- int asynchronous = 0;
+ Eterm reply_tag = THE_NON_VALUE;
+ Eterm reply_res = THE_NON_VALUE;
+ int suspend;
+ int sync = 0;
+ int async = 0;
int unless_suspending = 0;
-
+ erts_aint_t mstate;
+ ErtsMonitorSuspend *msp;
+ ErtsMonitorData *mdp;
if (BIF_P->common.id == BIF_ARG_1)
- goto badarg; /* We are not allowed to suspend ourselves */
+ BIF_RET(am_badarg); /* We are not allowed to suspend ourselves */
if (is_not_nil(BIF_ARG_2)) {
/* Parse option list */
@@ -8987,191 +8584,127 @@ suspend_process_2(BIF_ALIST_2)
unless_suspending = 1;
break;
case am_asynchronous:
- asynchronous = 1;
+ async = 1;
break;
- default:
- goto badarg;
+ default: {
+ if (is_tuple_arity(arg, 2)) {
+ Eterm *tp = tuple_val(arg);
+ if (tp[1] == am_asynchronous) {
+ async = 1;
+ reply_tag = tp[2];
+ break;
+ }
+ }
+ BIF_RET(am_badarg);
}
+ }
arg = CDR(lp);
- }
+ }
if (is_not_nil(arg))
- goto badarg;
+ BIF_RET(am_badarg);
}
- xlocks = ERTS_PROC_LOCK_STATUS;
-
- erts_proc_lock(BIF_P, xlocks);
-
- suspendee = erts_pid2proc(BIF_P,
- ERTS_PROC_LOCK_MAIN|xlocks,
- BIF_ARG_1,
- ERTS_PROC_LOCK_STATUS);
- if (!suspendee)
- goto no_suspendee;
-
- smon = erts_monitor_suspend_tree_lookup_create(&BIF_P->suspend_monitors,
- &created,
- BIF_ARG_1);
-
- if (asynchronous) {
- /* --- Asynchronous suspend begin ---------------------------------- */
-
- ERTS_LC_ASSERT(ERTS_PROC_LOCK_STATUS
- & erts_proc_lc_my_proc_locks(BIF_P));
- ERTS_LC_ASSERT(ERTS_PROC_LOCK_STATUS
- == erts_proc_lc_my_proc_locks(suspendee));
-
- if (smon->active) {
- smon->active += smon->pending;
- smon->pending = 0;
- if (unless_suspending)
- res = am_false;
- else if (smon->active == INT_MAX)
- goto system_limit;
- else {
- smon->active++;
- res = am_true;
- }
- /* done */
- }
- else {
- /* We havn't got any active suspends on the suspendee */
- if (smon->pending && unless_suspending)
- res = am_false;
- else {
- if (smon->pending == INT_MAX)
- goto system_limit;
-
- smon->pending++;
-
- if (!do_bif_suspend_process(BIF_P, smon, suspendee))
- add_pend_suspend(suspendee,
- BIF_P->common.id,
- handle_pend_bif_async_suspend);
-
- res = am_true;
- }
- /* done */
- }
- /* --- Asynchronous suspend end ------------------------------------ */
- }
- else /* if (!asynchronous) */ {
- /* --- Synchronous suspend begin ----------------------------------- */
-
- ERTS_LC_ASSERT(((ERTS_PROC_LOCK_STATUS|ERTS_PROC_LOCK_STATUS)
- & erts_proc_lc_my_proc_locks(BIF_P))
- == (ERTS_PROC_LOCK_STATUS|ERTS_PROC_LOCK_STATUS));
- ERTS_LC_ASSERT(ERTS_PROC_LOCK_STATUS
- == erts_proc_lc_my_proc_locks(suspendee));
-
- if (BIF_P->suspendee == BIF_ARG_1) {
- /* We are back after a yield and the suspendee
- has been suspended on behalf of us. */
- ASSERT(smon->active >= 1);
- BIF_P->suspendee = NIL;
- res = (!unless_suspending || smon->active == 1
- ? am_true
- : am_false);
- /* done */
- }
- else if (smon->active) {
- if (unless_suspending)
- res = am_false;
- else {
- smon->active++;
- res = am_true;
- }
- /* done */
- }
- else {
- /* We haven't got any active suspends on the suspendee */
-
- /*
- * If we have pending suspenders and suspend ourselves waiting
- * to suspend another process, or suspend another process
- * we might deadlock. In this case we have to yield,
- * be suspended by someone else, and then do it all over again.
- */
- if (BIF_P->pending_suspenders)
- goto yield;
-
- if (!unless_suspending && smon->pending == INT_MAX)
- goto system_limit;
- if (!unless_suspending || smon->pending == 0)
- smon->pending++;
-
- if (do_bif_suspend_process(BIF_P, smon, suspendee)) {
- res = (!unless_suspending || smon->active == 1
- ? am_true
- : am_false);
- /* done */
- }
- else {
- /* Mark suspendee pending for suspend by BIF_P */
- add_pend_suspend(suspendee,
- BIF_P->common.id,
- handle_pend_bif_sync_suspend);
-
- ASSERT(is_nil(BIF_P->suspendee));
-
- /*
- * Suspend BIF_P; when suspendee is suspended, BIF_P
- * will be resumed and this BIF will be called again.
- * This time with BIF_P->suspendee == BIF_ARG_1 (see
- * above).
- */
- suspend_process(BIF_P, BIF_P);
- goto yield;
- }
- }
- /* --- Synchronous suspend end ------------------------------------- */
- }
-
-#ifdef DEBUG
- {
- erts_aint32_t state = erts_atomic32_read_acqb(&suspendee->state);
- ASSERT((state & ERTS_PSFLG_SUSPENDED)
- || (asynchronous && smon->pending));
- ASSERT((state & ERTS_PSFLG_SUSPENDED)
- || !smon->active);
+ if (!unless_suspending) {
+ ErtsMonitor *mon;
+ mon = erts_monitor_tree_lookup_create(&ERTS_P_MONITORS(BIF_P),
+ &suspend,
+ ERTS_MON_TYPE_SUSPEND,
+ BIF_P->common.id,
+ BIF_ARG_1);
+ ASSERT(mon->other.item == BIF_ARG_1);
+
+ mdp = erts_monitor_to_data(mon);
+ msp = (ErtsMonitorSuspend *) mdp;
+
+ mstate = erts_atomic_inc_read_relb(&msp->state);
+ ASSERT(suspend || (mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) > 1);
+ sync = !async & !suspend & !(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE);
+ suspend = !!suspend; /* ensure 0|1 */
+ res = am_true;
}
-#endif
-
- erts_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS);
- erts_proc_unlock(BIF_P, xlocks);
- BIF_RET(res);
-
- system_limit:
- ERTS_BIF_PREP_ERROR(res, BIF_P, SYSTEM_LIMIT);
- goto do_return;
-
- no_suspendee: {
+ else {
ErtsMonitor *mon;
- BIF_P->suspendee = NIL;
- mon = erts_monitor_tree_lookup(BIF_P->suspend_monitors, BIF_ARG_1);
+ mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(BIF_P),
+ BIF_ARG_1);
if (mon) {
- erts_monitor_tree_delete(&BIF_P->suspend_monitors, mon);
- erts_monitor_suspend_destroy(erts_monitor_suspend(mon));
+ ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND);
+ mdp = erts_monitor_to_data(mon);
+ msp = (ErtsMonitorSuspend *) mdp;
+ mstate = erts_atomic_read_nob(&msp->state);
+ ASSERT((mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) > 0);
+ mdp = NULL;
+ sync = !async & !(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE);
+ suspend = 0;
+ res = am_false;
+ }
+ else {
+ mdp = erts_monitor_create(ERTS_MON_TYPE_SUSPEND, NIL,
+ BIF_P->common.id,
+ BIF_ARG_1, NIL);
+ mon = &mdp->origin;
+ erts_monitor_tree_insert(&ERTS_P_MONITORS(BIF_P), mon);
+ msp = (ErtsMonitorSuspend *) mdp;
+ mstate = erts_atomic_inc_read_relb(&msp->state);
+ ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE));
+ suspend = !0;
+ res = am_true;
}
}
- badarg:
- ERTS_BIF_PREP_ERROR(res, BIF_P, BADARG);
- goto do_return;
+ if (suspend) {
+ erts_aint32_t state;
+ Process *rp;
+ int send_sig = 0;
+
+ /* fail state... */
+ state = (ERTS_PSFLG_EXITING
+ | ERTS_PSFLG_RUNNING
+ | ERTS_PSFLG_RUNNING_SYS
+ | ERTS_PSFLG_DIRTY_RUNNING
+ | ERTS_PSFLG_DIRTY_RUNNING_SYS);
+
+ rp = erts_try_lock_sig_free_proc(BIF_ARG_1,
+ ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS,
+ &state);
+ if (!rp)
+ goto noproc;
+ if (rp == ERTS_PROC_LOCK_BUSY)
+ send_sig = !0;
+ else {
+ send_sig = !suspend_process(BIF_P, rp);
+ if (!send_sig) {
+ erts_monitor_list_insert(&ERTS_P_LT_MONITORS(rp), &mdp->target);
+ erts_atomic_read_bor_relb(&msp->state,
+ ERTS_MSUSPEND_STATE_FLG_ACTIVE);
+ }
+ erts_proc_unlock(rp, ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS);
+ }
+ if (send_sig) {
+ if (erts_proc_sig_send_monitor(&mdp->target, BIF_ARG_1))
+ sync = !async;
+ else {
+ noproc:
+ erts_monitor_tree_delete(&ERTS_P_MONITORS(BIF_P), &mdp->origin);
+ erts_monitor_release_both(mdp);
+ if (!async)
+ res = am_badarg;
+ }
+ }
+ }
- yield:
- ERTS_BIF_PREP_YIELD2(res, bif_export[BIF_suspend_process_2],
- BIF_P, BIF_ARG_1, BIF_ARG_2);
-
- do_return:
- if (suspendee)
- erts_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS);
- if (xlocks)
- erts_proc_unlock(BIF_P, xlocks);
- return res;
+ if (sync) {
+ ASSERT(is_non_value(reply_tag));
+ reply_res = res;
+ reply_tag = res = erts_make_ref(BIF_P);
+ ERTS_RECV_MARK_SAVE(BIF_P);
+ ERTS_RECV_MARK_SET(BIF_P);
+ }
-}
+ if (is_value(reply_tag))
+ erts_proc_sig_send_sync_suspend(BIF_P, BIF_ARG_1, reply_tag, reply_res);
+ BIF_RET(res);
+}
/*
* The erlang:resume_process/1 BIF
@@ -9181,90 +8714,32 @@ BIF_RETTYPE
resume_process_1(BIF_ALIST_1)
{
ErtsMonitor *mon;
- ErtsMonitorSuspend *smon;
- Process *suspendee;
- int is_active;
+ ErtsMonitorSuspend *msp;
+ erts_aint_t mstate;
if (BIF_P->common.id == BIF_ARG_1)
BIF_ERROR(BIF_P, BADARG);
- erts_proc_lock(BIF_P, ERTS_PROC_LOCK_STATUS);
- mon = erts_monitor_tree_lookup(BIF_P->suspend_monitors, BIF_ARG_1);
- smon = erts_monitor_suspend(mon);
-
- if (!smon) {
+ mon = erts_monitor_tree_lookup(ERTS_P_MONITORS(BIF_P),
+ BIF_ARG_1);
+ if (!mon) {
/* No previous suspend or dead suspendee */
- goto error;
+ BIF_ERROR(BIF_P, BADARG);
}
- else if (smon->pending) {
- smon->pending--;
- ASSERT(smon->pending >= 0);
- if (smon->active) {
- smon->active += smon->pending;
- smon->pending = 0;
- }
- is_active = smon->active;
- }
- else if (smon->active) {
- smon->active--;
- ASSERT(smon->pending == 0);
- is_active = 1;
- }
- else {
- /* No previous suspend or dead suspendee */
- goto no_suspendee;
- }
-
- if (smon->active || smon->pending || !is_active) {
- /* Leave the suspendee as it is; just verify that it is still alive */
- suspendee = erts_proc_lookup(BIF_ARG_1);
- if (!suspendee)
- goto no_suspendee;
-
- }
- else {
- /* Resume */
- suspendee = erts_pid2proc(BIF_P,
- ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_STATUS,
- BIF_ARG_1,
- ERTS_PROC_LOCK_STATUS);
- if (!suspendee) {
- mon = erts_monitor_tree_lookup(BIF_P->suspend_monitors, BIF_ARG_1);
- smon = erts_monitor_suspend(mon);
- if (!mon)
- goto error;
- goto no_suspendee;
- }
- ASSERT(mon == erts_monitor_tree_lookup(BIF_P->suspend_monitors, BIF_ARG_1));
+ ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND);
+ msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);
- ASSERT(ERTS_PSFLG_SUSPENDED
- & erts_atomic32_read_nob(&suspendee->state));
- ASSERT(BIF_P != suspendee);
- resume_process(suspendee, ERTS_PROC_LOCK_STATUS);
+ mstate = erts_atomic_dec_read_relb(&msp->state);
- erts_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS);
- }
+ ASSERT((mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) >= 0);
- if (!smon->active && !smon->pending) {
- ASSERT(mon);
- erts_monitor_tree_delete(&BIF_P->suspend_monitors, mon);
- erts_monitor_suspend_destroy(smon);
+ if ((mstate & ERTS_MSUSPEND_STATE_COUNTER_MASK) == 0) {
+ erts_monitor_tree_delete(&ERTS_P_MONITORS(BIF_P), mon);
+ erts_proc_sig_send_demonitor(mon);
}
- erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_STATUS);
-
BIF_RET(am_true);
-
- no_suspendee:
- /* cleanup */
- ASSERT(mon);
- erts_monitor_tree_delete(&BIF_P->suspend_monitors, mon);
- erts_monitor_suspend_destroy(smon);
-
- error:
- erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_STATUS);
- BIF_ERROR(BIF_P, BADARG);
}
BIF_RETTYPE
@@ -9694,12 +9169,13 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
ASSERT(esdp->current_process == p
|| esdp->free_process == p);
- sched_out_proc:
-
- ERTS_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p);
reds = actual_reds = calls - esdp->virtual_reds;
+ internal_sched_out_proc:
+
+ ERTS_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p);
+
ASSERT(actual_reds >= 0);
if (reds < ERTS_PROC_MIN_CONTEXT_SWITCH_REDS_COST)
reds = ERTS_PROC_MIN_CONTEXT_SWITCH_REDS_COST;
@@ -9741,11 +9217,6 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
/* have to re-read state after taking lock */
state = erts_atomic32_read_nob(&p->state);
- if (p->pending_suspenders)
- handle_pending_suspend(p, (ERTS_PROC_LOCK_MAIN
- | ERTS_PROC_LOCK_TRACE
- | ERTS_PROC_LOCK_STATUS));
-
esdp->reductions += reds;
{
@@ -10195,8 +9666,9 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
if (is_normal_sched) {
if (state & ERTS_PSFLG_RUNNING_SYS) {
if (state & (ERTS_PSFLG_SIG_Q|ERTS_PSFLG_SIG_IN_Q)) {
- int local_only = !!(p->flags & F_LOCAL_SIGS_ONLY);
- if (!local_only || (state & ERTS_PSFLG_SIG_Q)) {
+ int local_only = (!!(p->flags & F_LOCAL_SIGS_ONLY)
+ & !(state & ERTS_PSFLG_SUSPENDED));
+ if (!local_only | !!(state & ERTS_PSFLG_SIG_Q)) {
int sig_reds;
/*
* If we have dirty work scheduled we allow
@@ -10282,7 +9754,17 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
}
p->fcalls = reds;
-
+ if (reds != context_reds) {
+ actual_reds = context_reds - reds - esdp->virtual_reds;
+ ASSERT(actual_reds >= 0);
+ esdp->virtual_reds = 0;
+ p->reds += actual_reds;
+ ERTS_PROC_REDUCTIONS_EXECUTED(esdp, rq,
+ (int) ERTS_PSFLGS_GET_USR_PRIO(state),
+ reds,
+ actual_reds);
+ }
+
ERTS_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p);
ASSERT(erts_proc_read_refc(p) > 0);
@@ -10332,6 +9814,14 @@ Process *erts_schedule(ErtsSchedulerData *esdp, Process *p, int calls)
#endif
return p;
+
+ sched_out_proc:
+ actual_reds = context_reds;
+ actual_reds -= reds;
+ actual_reds -= esdp->virtual_reds;
+ reds = actual_reds;
+ goto internal_sched_out_proc;
+
}
}
@@ -11844,7 +11334,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
#ifdef HIPE
hipe_init_process(&p->hipe);
- hipe_init_process_smp(&p->hipe_smp);
#endif
p->heap = (Eterm *) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP, sizeof(Eterm)*sz);
p->old_hend = p->old_htop = p->old_heap = NULL;
@@ -11893,7 +11382,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
ERTS_P_LINKS(p) = NULL;
ERTS_P_MONITORS(p) = NULL;
ERTS_P_LT_MONITORS(p) = NULL;
- p->suspend_monitors = NULL;
ASSERT(is_pid(parent->group_leader));
@@ -11950,8 +11438,6 @@ erl_create_process(Process* parent, /* Parent of process (default group leader).
p->trace_msg_q = NULL;
p->scheduler_data = NULL;
- p->suspendee = NIL;
- p->pending_suspenders = NULL;
#if !defined(NO_FPE_SIGNALS) || defined(HIPE)
p->fp_exception = 0;
@@ -12118,7 +11604,6 @@ void erts_init_empty_process(Process *p)
ERTS_P_MONITORS(p) = NULL;
ERTS_P_LT_MONITORS(p) = NULL;
ERTS_P_LINKS(p) = NULL; /* List of links */
- p->suspend_monitors = NULL;
p->sig_qs.first = NULL;
p->sig_qs.last = &p->sig_qs.first;
p->sig_qs.cont = NULL;
@@ -12169,7 +11654,6 @@ void erts_init_empty_process(Process *p)
#ifdef HIPE
hipe_init_process(&p->hipe);
- hipe_init_process_smp(&p->hipe_smp);
#endif
INIT_HOLE_CHECK(p);
@@ -12182,8 +11666,6 @@ void erts_init_empty_process(Process *p)
erts_atomic32_init_nob(&p->state, (erts_aint32_t) PRIORITY_NORMAL);
p->scheduler_data = NULL;
- p->suspendee = NIL;
- p->pending_suspenders = NULL;
erts_proc_lock_init(p);
erts_proc_unlock(p, ERTS_PROC_LOCKS_ALL);
erts_init_runq_proc(p, ERTS_RUNQ_IX(0), 0);
@@ -12221,7 +11703,6 @@ erts_debug_verify_clean_empty_process(Process* p)
ASSERT(ERTS_P_MONITORS(p) == NULL);
ASSERT(ERTS_P_LT_MONITORS(p) == NULL);
ASSERT(ERTS_P_LINKS(p) == NULL);
- ASSERT(p->suspend_monitors == NULL);
ASSERT(p->sig_qs.first == NULL);
ASSERT(p->sig_qs.len == 0);
ASSERT(p->bif_timers == NULL);
@@ -12235,8 +11716,6 @@ erts_debug_verify_clean_empty_process(Process* p)
ASSERT(p->sig_inq.first == NULL);
ASSERT(p->sig_inq.len == 0);
- ASSERT(p->suspendee == NIL);
- ASSERT(p->pending_suspenders == NULL);
/* Thing that erts_cleanup_empty_process() cleans up */
@@ -12342,8 +11821,6 @@ delete_process(Process* p)
erts_cleanup_messages(p->sig_qs.cont);
p->sig_qs.cont = NULL;
- ASSERT(!p->suspend_monitors);
-
p->fvalue = NIL;
}
@@ -12423,6 +11900,7 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt)
if (erts_monitor_is_target(mon)) {
/* We are being watched... */
switch (mon->type) {
+ case ERTS_MON_TYPE_SUSPEND:
case ERTS_MON_TYPE_PROC:
erts_proc_sig_send_monitor_down(mon, reason);
mon = NULL;
@@ -12494,6 +11972,7 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt)
else { /* Origin monitor */
/* We are watching someone else... */
switch (mon->type) {
+ case ERTS_MON_TYPE_SUSPEND:
case ERTS_MON_TYPE_PROC:
erts_proc_sig_send_demonitor(mon);
mon = NULL;
@@ -12646,21 +12125,6 @@ erts_proc_exit_handle_link(ErtsLink *lnk, void *vctxt)
erts_link_release(lnk);
}
-static void
-resume_suspend_monitor(ErtsMonitor *mon, void *vc_p)
-{
- ErtsMonitorSuspend *smon = erts_monitor_suspend(mon);
- Process *suspendee = erts_pid2proc((Process *) vc_p, ERTS_PROC_LOCK_MAIN,
- smon->mon.other.item, ERTS_PROC_LOCK_STATUS);
- if (suspendee) {
- ASSERT(suspendee != vc_p);
- if (smon->active)
- resume_process(suspendee, ERTS_PROC_LOCK_STATUS);
- erts_proc_unlock(suspendee, ERTS_PROC_LOCK_STATUS);
- }
- erts_monitor_suspend_destroy(smon);
-}
-
/* this function fishishes a process and propagates exit messages - called
by process_main when a process dies */
void
@@ -12692,8 +12156,6 @@ erts_do_exit_process(Process* p, Eterm reason)
set_self_exiting(p, reason, NULL, NULL, NULL);
- cancel_suspend_of_suspendee(p, ERTS_PROC_LOCKS_ALL);
-
if (IS_TRACED(p)) {
if (IS_TRACED_FL(p, F_TRACE_CALLS))
erts_schedule_time_break(p, ERTS_BP_CALL_TIME_SCHEDULE_EXITING);
@@ -12826,11 +12288,6 @@ erts_continue_exit_process(Process *p)
p->flags &= ~F_USING_DDLL;
}
- if (p->suspend_monitors)
- erts_monitor_tree_foreach_delete(&p->suspend_monitors,
- resume_suspend_monitor,
- p);
-
/*
* The registered name *should* be the last "erlang resource" to
* cleanup.
@@ -13038,7 +12495,13 @@ erts_try_lock_sig_free_proc(Eterm pid, ErtsProcLocks locks,
erts_aint32_t *statep)
{
Process *rp = erts_proc_lookup_raw(pid);
+ erts_aint32_t fail_state = ERTS_PSFLG_SIG_IN_Q|ERTS_PSFLG_SIG_Q;
erts_aint32_t state;
+ ErtsProcLocks tmp_locks = ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_MSGQ;
+
+ tmp_locks |= locks;
+ if (statep)
+ fail_state |= *statep;
if (!rp) {
if (statep)
@@ -13055,28 +12518,28 @@ erts_try_lock_sig_free_proc(Eterm pid, ErtsProcLocks locks,
if (state & ERTS_PSFLG_FREE)
return NULL;
- if (state & (ERTS_PSFLG_SIG_IN_Q|ERTS_PSFLG_SIG_Q))
+ if (state & fail_state)
return ERTS_PROC_LOCK_BUSY;
- if (!locks)
- return rp;
-
- if (erts_proc_trylock(rp, locks) == EBUSY)
+ if (erts_proc_trylock(rp, tmp_locks) == EBUSY)
return ERTS_PROC_LOCK_BUSY;
state = erts_atomic32_read_nob(&rp->state);
if (statep)
*statep = state;
- if (state & ERTS_PSFLG_FREE) {
- erts_proc_unlock(rp, locks);
- return NULL;
+ if ((state & fail_state)
+ || rp->sig_inq.first
+ || rp->sig_qs.cont) {
+ erts_proc_unlock(rp, tmp_locks);
+ if (state & ERTS_PSFLG_FREE)
+ return NULL;
+ else
+ return ERTS_PROC_LOCK_BUSY;
}
- if (state & (ERTS_PSFLG_SIG_IN_Q|ERTS_PSFLG_SIG_Q)) {
- erts_proc_unlock(rp, locks);
- return ERTS_PROC_LOCK_BUSY;
- }
+ if (tmp_locks != locks)
+ erts_proc_unlock(rp, tmp_locks & ~locks);
return rp;
}
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index b66272194c..a60e117bab 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -805,14 +805,15 @@ erts_reset_max_len(ErtsRunQueue *rq, ErtsRunQueueInfo *rqi)
#define ERTS_PSD_ETS_OWNED_TABLES 6
#define ERTS_PSD_ETS_FIXED_TABLES 7
#define ERTS_PSD_DIST_ENTRY 8
-#define ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF 9 /* keep last... */
+#define ERTS_PSD_PENDING_SUSPEND 9
+#define ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF 10 /* keep last... */
-#define ERTS_PSD_SIZE 10
+#define ERTS_PSD_SIZE 11
#if !defined(HIPE)
# undef ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF
# undef ERTS_PSD_SIZE
-# define ERTS_PSD_SIZE 9
+# define ERTS_PSD_SIZE 10
#endif
typedef struct {
@@ -849,6 +850,9 @@ typedef struct {
#define ERTS_PSD_DIST_ENTRY_GET_LOCKS ERTS_PROC_LOCK_MAIN
#define ERTS_PSD_DIST_ENTRY_SET_LOCKS ERTS_PROC_LOCK_MAIN
+#define ERTS_PSD_PENDING_SUSPEND_GET_LOCKS ERTS_PROC_LOCK_MAIN
+#define ERTS_PSD_PENDING_SUSPEND_SET_LOCKS ERTS_PROC_LOCK_MAIN
+
typedef struct {
ErtsProcLocks get_locks;
ErtsProcLocks set_locks;
@@ -884,20 +888,6 @@ typedef struct {
typedef struct ErtsProcSysTask_ ErtsProcSysTask;
typedef struct ErtsProcSysTaskQs_ ErtsProcSysTaskQs;
-
-typedef struct ErtsPendingSuspend_ ErtsPendingSuspend;
-struct ErtsPendingSuspend_ {
- ErtsPendingSuspend *next;
- ErtsPendingSuspend *end;
- Eterm pid;
- void (*handle_func)(Process *suspendee,
- ErtsProcLocks suspendee_locks,
- int suspendee_alive,
- Eterm pid);
-};
-
-
-
/* Defines to ease the change of memory architecture */
# define HEAP_START(p) (p)->heap
# define HEAP_TOP(p) (p)->htop
@@ -992,9 +982,6 @@ struct process {
Process *next; /* Pointer to next process in run queue */
- ErtsMonitor *suspend_monitors; /* Processes suspended by this process via
- erlang:suspend_process/1 */
-
ErtsSignalPrivQueues sig_qs; /* Signal queues */
ErtsBifTimers *bif_timers; /* Bif timers aiming at this process */
@@ -1058,12 +1045,7 @@ struct process {
ErlTraceMessageQueue *trace_msg_q;
erts_proc_lock_t lock;
ErtsSchedulerData *scheduler_data;
- Eterm suspendee;
- ErtsPendingSuspend *pending_suspenders;
erts_atomic_t run_queue;
-#ifdef HIPE
- struct hipe_process_state_smp hipe_smp;
-#endif
#ifdef CHECK_FOR_HOLES
Eterm* last_htop; /* No need to scan the heap below this point. */
@@ -1380,7 +1362,7 @@ extern int erts_system_profile_ts_type;
#define F_DISTRIBUTION (1 << 6) /* Process used in distribution */
#define F_USING_DDLL (1 << 7) /* Process has used the DDLL interface */
#define F_HAVE_BLCKD_MSCHED (1 << 8) /* Process has blocked multi-scheduling */
-#define F_P2PNR_RESCHED (1 << 9) /* Process has been rescheduled via erts_pid2proc_not_running() */
+#define F_UNUSED (1 << 9)
#define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */
#define F_DISABLE_GC (1 << 11) /* Disable GC (see below) */
#define F_OFF_HEAP_MSGQ (1 << 12) /* Off heap msg queue */
@@ -1397,10 +1379,12 @@ extern int erts_system_profile_ts_type;
#define F_DIRTY_MAJOR_GC (1 << 23) /* Dirty major GC scheduled */
#define F_DIRTY_MINOR_GC (1 << 24) /* Dirty minor GC scheduled */
#define F_HIBERNATED (1 << 25) /* Hibernated */
-#define F_LOCAL_SIGS_ONLY (1 << 26)
+#define F_LOCAL_SIGS_ONLY (1 << 26) /* Handle privq sigs only */
#define F_TRAP_EXIT (1 << 27) /* Trapping exit */
-#define F_DEFERRED_SAVED_LAST (1 << 28)
-#define F_DELAYED_PSIGQS_LEN (1 << 29)
+#define F_DEFERRED_SAVED_LAST (1 << 28) /* Deferred sig_qs.saved_last */
+#define F_DELAYED_PSIGQS_LEN (1 << 29) /* Delayed update of sig_qs.len */
+#define F_HIPE_RECV_LOCKED (1 << 30) /* HiPE message queue locked */
+#define F_HIPE_RECV_YIELD (1 << 31) /* HiPE receive yield */
/*
* F_DISABLE_GC and F_DELAY_GC are similar. Both will prevent
@@ -2048,6 +2032,11 @@ erts_psd_set(Process *p, int ix, void *data)
#define ERTS_PROC_SET_DIST_ENTRY(P, DE) \
((DistEntry *) erts_psd_set((P), ERTS_PSD_DIST_ENTRY, (void *) (DE)))
+#define ERTS_PROC_GET_PENDING_SUSPEND(P) \
+ ((void *) erts_psd_get((P), ERTS_PSD_PENDING_SUSPEND))
+#define ERTS_PROC_SET_PENDING_SUSPEND(P, PS) \
+ ((void *) erts_psd_set((P), ERTS_PSD_PENDING_SUSPEND, (void *) (PS)))
+
#ifdef HIPE
#define ERTS_PROC_GET_SUSPENDED_SAVED_CALLS_BUF(P) \
((struct saved_calls *) erts_psd_get((P), ERTS_PSD_SUSPENDED_SAVED_CALLS_BUF))
@@ -2612,16 +2601,6 @@ Process *erts_try_lock_sig_free_proc(Eterm pid,
ErtsProcLocks locks,
erts_aint32_t *statep);
-Process *erts_pid2proc_not_running(Process *,
- ErtsProcLocks,
- Eterm,
- ErtsProcLocks);
-Process *erts_pid2proc_nropt(Process *c_p,
- ErtsProcLocks c_p_locks,
- Eterm pid,
- ErtsProcLocks pid_locks);
-extern int erts_disable_proc_not_running_opt;
-
#ifdef DEBUG
#define ERTS_ASSERT_IS_NOT_EXITING(P) \
do { ASSERT(!ERTS_PROC_IS_EXITING((P))); } while (0)
diff --git a/erts/emulator/beam/erl_trace.c b/erts/emulator/beam/erl_trace.c
index 065a560b52..f074dd8bdb 100644
--- a/erts/emulator/beam/erl_trace.c
+++ b/erts/emulator/beam/erl_trace.c
@@ -2629,6 +2629,38 @@ erts_tracer_to_term(Process *p, ErtsTracer tracer)
}
}
+Eterm
+erts_build_tracer_to_term(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, ErtsTracer tracer)
+{
+ Eterm res;
+ Eterm state;
+ Uint sz;
+
+ if (ERTS_TRACER_IS_NIL(tracer))
+ return am_false;
+
+ state = ERTS_TRACER_STATE(tracer);
+ sz = is_immed(state) ? 0 : size_object(state);
+
+ if (szp)
+ *szp += sz;
+
+ if (hpp)
+ res = is_immed(state) ? state : copy_struct(state, sz, hpp, ohp);
+ else
+ res = THE_NON_VALUE;
+
+ if (ERTS_TRACER_MODULE(tracer) != am_erl_tracer) {
+ if (szp)
+ *szp += 3;
+ if (hpp) {
+ res = TUPLE2(*hpp, ERTS_TRACER_MODULE(tracer), res);
+ *hpp += 3;
+ }
+ }
+
+ return res;
+}
static ERTS_INLINE int
send_to_tracer_nif_raw(Process *c_p, Process *tracee,
diff --git a/erts/emulator/beam/erl_trace.h b/erts/emulator/beam/erl_trace.h
index dbf7ebd2a1..3228e19809 100644
--- a/erts/emulator/beam/erl_trace.h
+++ b/erts/emulator/beam/erl_trace.h
@@ -198,6 +198,8 @@ int erts_is_tracer_proc_enabled_send(Process* c_p, ErtsProcLocks c_p_locks,
ErtsPTabElementCommon *t_p);
int erts_is_tracer_enabled(const ErtsTracer tracer, ErtsPTabElementCommon *t_p);
Eterm erts_tracer_to_term(Process *p, ErtsTracer tracer);
+Eterm erts_build_tracer_to_term(Eterm **hpp, ErlOffHeap *ohp, Uint *szp, ErtsTracer tracer);
+
ErtsTracer erts_term_to_tracer(Eterm prefix, Eterm term);
void erts_tracer_replace(ErtsPTabElementCommon *t_p,
const ErtsTracer new_tracer);
diff --git a/erts/emulator/hipe/hipe_mode_switch.c b/erts/emulator/hipe/hipe_mode_switch.c
index bc9a700204..0a65e317ed 100644
--- a/erts/emulator/hipe/hipe_mode_switch.c
+++ b/erts/emulator/hipe/hipe_mode_switch.c
@@ -490,16 +490,21 @@ Process *hipe_mode_switch(Process *p, unsigned cmd, Eterm reg[])
/* same semantics, different debug trace messages */
/* XXX: BEAM has different entries for the locked and unlocked
cases. HiPE doesn't, so we must check dynamically. */
- if (p->hipe_smp.have_receive_locks)
- p->hipe_smp.have_receive_locks = 0;
+ if (p->flags & F_HIPE_RECV_LOCKED)
+ p->flags &= ~F_HIPE_RECV_LOCKED;
else
erts_proc_lock(p, ERTS_PROC_LOCKS_MSG_RECEIVE);
p->i = hipe_beam_pc_resume;
p->arity = 0;
if (erts_atomic32_read_nob(&p->state) & ERTS_PSFLG_EXITING)
ASSERT(erts_atomic32_read_nob(&p->state) & ERTS_PSFLG_ACTIVE);
- else
+ else if (!(p->flags & F_HIPE_RECV_YIELD))
erts_atomic32_read_band_relb(&p->state, ~ERTS_PSFLG_ACTIVE);
+ else {
+ /* Yielded from receive */
+ ERTS_VBUMP_ALL_REDS(p);
+ p->flags &= ~F_HIPE_RECV_YIELD;
+ }
erts_proc_unlock(p, ERTS_PROC_LOCKS_MSG_RECEIVE);
do_schedule:
{
@@ -522,7 +527,7 @@ Process *hipe_mode_switch(Process *p, unsigned cmd, Eterm reg[])
p = erts_schedule(NULL, p, reds_in - p->fcalls);
ERTS_REQ_PROC_MAIN_LOCK(p);
ASSERT(!(p->flags & F_HIPE_MODE));
- p->hipe_smp.have_receive_locks = 0;
+ p->flags &= ~F_HIPE_RECV_LOCKED;
reg = p->scheduler_data->x_reg_array;
}
{
diff --git a/erts/emulator/hipe/hipe_native_bif.c b/erts/emulator/hipe/hipe_native_bif.c
index 24078af046..211ce0492a 100644
--- a/erts/emulator/hipe/hipe_native_bif.c
+++ b/erts/emulator/hipe/hipe_native_bif.c
@@ -144,8 +144,8 @@ BIF_RETTYPE nbif_impl_hipe_set_timeout(NBIF_ALIST_1)
else {
int tres = erts_set_proc_timer_term(p, timeout_value);
if (tres != 0) { /* Wrong time */
- if (p->hipe_smp.have_receive_locks) {
- p->hipe_smp.have_receive_locks = 0;
+ if (p->flags & F_HIPE_RECV_LOCKED) {
+ p->flags &= ~F_HIPE_RECV_LOCKED;
erts_proc_unlock(p, ERTS_PROC_LOCKS_MSG_RECEIVE);
}
BIF_ERROR(p, EXC_TIMEOUT_VALUE);
@@ -549,19 +549,14 @@ Eterm hipe_check_get_msg(Process *c_p)
c_p->i = NULL;
c_p->arity = 0;
c_p->current = NULL;
- (void) erts_proc_sig_receive_helper(c_p, CONTEXT_REDS, 0,
+ (void) erts_proc_sig_receive_helper(c_p, CONTEXT_REDS/4, 0,
&msgp, &get_out);
/* FIXME: Need to bump reductions... */
if (!msgp) {
if (get_out) {
- if (get_out < 0) {
- /*
- * FIXME: We should get out yielding
- * here...
- */
- goto next_message;
- }
- /* Go exit... */
+ if (get_out < 0)
+ c_p->flags |= F_HIPE_RECV_YIELD; /* yield... */
+ /* else: go exit... */
return THE_NON_VALUE;
}
@@ -573,7 +568,7 @@ Eterm hipe_check_get_msg(Process *c_p)
*/
/* XXX: BEAM doesn't need this */
- c_p->hipe_smp.have_receive_locks = 1;
+ c_p->flags |= F_HIPE_RECV_LOCKED;
c_p->flags &= ~F_DELAY_GC;
return THE_NON_VALUE;
}
@@ -618,8 +613,8 @@ void hipe_clear_timeout(Process *c_p)
*/
/* XXX: BEAM has different entries for the locked and unlocked
cases. HiPE doesn't, so we must check dynamically. */
- if (c_p->hipe_smp.have_receive_locks) {
- c_p->hipe_smp.have_receive_locks = 0;
+ if (c_p->flags & F_HIPE_RECV_LOCKED) {
+ c_p->flags &= ~F_HIPE_RECV_LOCKED;
erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
}
if (IS_TRACED_FL(c_p, F_TRACE_RECEIVE)) {
diff --git a/erts/emulator/hipe/hipe_process.h b/erts/emulator/hipe/hipe_process.h
index ef14c75f6c..18354ba0a6 100644
--- a/erts/emulator/hipe/hipe_process.h
+++ b/erts/emulator/hipe/hipe_process.h
@@ -82,13 +82,4 @@ static __inline__ void hipe_delete_process(struct hipe_process_state *p)
erts_free(ERTS_ALC_T_HIPE_STK, (void*)p->nstack);
}
-struct hipe_process_state_smp {
- int have_receive_locks;
-};
-
-static __inline__ void hipe_init_process_smp(struct hipe_process_state_smp *p)
-{
- p->have_receive_locks = 0;
-}
-
#endif /* HIPE_PROCESS_H */
diff --git a/erts/emulator/test/trace_SUITE.erl b/erts/emulator/test/trace_SUITE.erl
index def25dba7d..138aefb29c 100644
--- a/erts/emulator/test/trace_SUITE.erl
+++ b/erts/emulator/test/trace_SUITE.erl
@@ -29,7 +29,7 @@
receive_trace/1, link_receive_call_correlation/1, self_send/1,
timeout_trace/1, send_trace/1,
procs_trace/1, dist_procs_trace/1, procs_new_trace/1,
- suspend/1, mutual_suspend/1, suspend_exit/1, suspender_exit/1,
+ suspend/1, suspend_exit/1, suspender_exit/1,
suspend_system_limit/1, suspend_opts/1, suspend_waiting/1,
new_clear/1, existing_clear/1, tracer_die/1,
set_on_spawn/1, set_on_first_spawn/1, cpu_timestamp/1,
@@ -53,7 +53,7 @@ all() ->
[cpu_timestamp, receive_trace, link_receive_call_correlation,
self_send, timeout_trace,
send_trace, procs_trace, dist_procs_trace, suspend,
- mutual_suspend, suspend_exit, suspender_exit,
+ suspend_exit, suspender_exit,
suspend_system_limit, suspend_opts, suspend_waiting,
new_clear, existing_clear, tracer_die, set_on_spawn,
set_on_first_spawn, set_on_link, set_on_first_link,
@@ -1234,55 +1234,6 @@ do_suspend(Pid, N) ->
erlang:yield(),
do_suspend(Pid, N-1).
-
-
-mutual_suspend(Config) when is_list(Config) ->
- TimeoutSecs = 5*60,
- ct:timetrap({seconds, TimeoutSecs}),
- Parent = self(),
- Fun = fun () ->
- receive
- {go, Pid} ->
- do_mutual_suspend(Pid, 100000)
- end,
- Parent ! {done, self()},
- receive after infinity -> ok end
- end,
- P1 = spawn_link(Fun),
- P2 = spawn_link(Fun),
- T1 = erlang:start_timer((TimeoutSecs - 5)*1000, self(), oops),
- T2 = erlang:start_timer((TimeoutSecs - 5)*1000, self(), oops),
- P1 ! {go, P2},
- P2 ! {go, P1},
- Res1 = receive
- {done, P1} -> done;
- {timeout,T1,_} -> timeout
- end,
- Res2 = receive
- {done, P2} -> done;
- {timeout,T2,_} -> timeout
- end,
- P1S = process_info(P1, status),
- P2S = process_info(P2, status),
- io:format("P1S=~p P2S=~p", [P1S, P2S]),
- false = {status, suspended} == P1S,
- false = {status, suspended} == P2S,
- unlink(P1), exit(P1, bang),
- unlink(P2), exit(P2, bang),
- done = Res1,
- done = Res2,
- ok.
-
-do_mutual_suspend(_Pid, 0) ->
- ok;
-do_mutual_suspend(Pid, N) ->
- %% Suspend a process and test that it is suspended.
- true = erlang:suspend_process(Pid),
- {status, suspended} = process_info(Pid, status),
- %% Unsuspend the process.
- true = erlang:resume_process(Pid),
- do_mutual_suspend(Pid, N-1).
-
suspend_exit(Config) when is_list(Config) ->
ct:timetrap({minutes, 2}),
rand:seed(exsplus, {4711,17,4711}),
@@ -1513,7 +1464,8 @@ suspend_opts(Config) when is_list(Config) ->
dbl_async = AA,
synced = S,
async_once = AO} = Acc) ->
- erlang:suspend_process(Tok, [asynchronous]),
+ Tag = {make_ref(), self()},
+ erlang:suspend_process(Tok, [{asynchronous, Tag}]),
Res = case {suspend_count(Tok), N rem 4} of
{0, 2} ->
erlang:suspend_process(Tok,
@@ -1549,7 +1501,11 @@ suspend_opts(Config) when is_list(Config) ->
_ ->
Acc
end,
- erlang:resume_process(Tok),
+ receive
+ {Tag, Result} ->
+ suspended = Result,
+ erlang:resume_process(Tok)
+ end,
erlang:yield(),
Res
end,
diff --git a/erts/emulator/test/tracer_SUITE.erl b/erts/emulator/test/tracer_SUITE.erl
index e1362ef07a..070462b0f1 100644
--- a/erts/emulator/test/tracer_SUITE.erl
+++ b/erts/emulator/test/tracer_SUITE.erl
@@ -623,7 +623,7 @@ test(Event, TraceFlag, Tc, Expect, _Removes, Dies) ->
Expect(Pid1, State1, Opts),
receive M11 -> ct:fail({unexpected, M11}) after 0 -> ok end,
- if not Dies ->
+ if not Dies andalso Event /= in ->
{flags, [TraceFlag]} = erlang:trace_info(Pid1, flags),
{tracer, {tracer_test, State1}} = erlang:trace_info(Pid1, tracer),
erlang:trace(Pid1, false, [TraceFlag]);
@@ -640,7 +640,7 @@ test(Event, TraceFlag, Tc, Expect, _Removes, Dies) ->
Expect(Pid1T, State1, Opts#{ scheduler_id => number,
timestamp => timestamp}),
receive M11T -> ct:fail({unexpected, M11T}) after 0 -> ok end,
- if not Dies ->
+ if not Dies andalso Event /= in ->
{flags, [scheduler_id, TraceFlag, timestamp]}
= erlang:trace_info(Pid1T, flags),
{tracer, {tracer_test, State1}} = erlang:trace_info(Pid1T, tracer),
@@ -655,7 +655,7 @@ test(Event, TraceFlag, Tc, Expect, _Removes, Dies) ->
Tc(Pid2),
ok = trace_delivered(Pid2),
receive M2 -> ct:fail({unexpected, M2}) after 0 -> ok end,
- if not Dies ->
+ if not Dies andalso Event /= in ->
{flags, [TraceFlag]} = erlang:trace_info(Pid2, flags),
{tracer, {tracer_test, State2}} = erlang:trace_info(Pid2, tracer),
erlang:trace(Pid2, false, [TraceFlag]);