aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
Diffstat (limited to 'erts')
-rw-r--r--erts/emulator/beam/erl_node_tables.c118
-rw-r--r--erts/emulator/beam/erl_port_task.c2
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c2
-rw-r--r--erts/emulator/beam/erl_process.c377
-rw-r--r--erts/emulator/beam/erl_process.h10
-rw-r--r--erts/emulator/beam/io.c2
6 files changed, 391 insertions, 120 deletions
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index 60c3be3223..8fe5c3c690 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -1850,6 +1850,70 @@ insert_sig_ext(ErtsDistExternal *edep, void *arg)
insert_dist_entry(edep->dep, SIGNAL_REF, proc->common.id, 0);
}
+static void
+insert_process(Process *proc)
+{
+ int mli;
+ ErtsMessage *msg_list[] = {proc->msg_frag};
+ ErlHeapFragment *hfp;
+
+ /* Insert Heap */
+ insert_offheap(&(proc->off_heap),
+ HEAP_REF,
+ proc->common.id);
+ /* Insert heap fragments buffers */
+ for(hfp = proc->mbuf; hfp; hfp = hfp->next)
+ insert_offheap(&(hfp->off_heap),
+ HEAP_REF,
+ proc->common.id);
+
+ /* Insert msg buffers */
+ for (mli = 0; mli < sizeof(msg_list)/sizeof(msg_list[0]); mli++) {
+ ErtsMessage *msg;
+ for (msg = msg_list[mli]; msg; msg = msg->next)
+ insert_message(msg, HEAP_REF, proc);
+ }
+
+ /* Insert signal queue */
+ erts_proc_sig_debug_foreach_sig(proc,
+ insert_sig_msg,
+ insert_sig_offheap,
+ insert_sig_monitor,
+ insert_sig_link,
+ insert_sig_ext,
+ (void *) proc);
+
+ /* If the process is FREE, the proc->common field has been
+ re-used by the ptab delete, so we cannot trust it. */
+ if (!(erts_atomic32_read_nob(&proc->state) & ERTS_PSFLG_FREE)) {
+ /* Insert links */
+ insert_p_links(&proc->common);
+
+ /* Insert monitors */
+ insert_p_monitors(&proc->common);
+ }
+
+ {
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc);
+ if (dep)
+ insert_dist_entry(dep,
+ CTRL_REF,
+ proc->common.id,
+ 0);
+ }
+}
+
+static void
+insert_dist_suspended_procs(DistEntry *dep)
+{
+ ErtsProcList *plist = erts_proclist_peek_first(dep->suspended);
+ while (plist) {
+ if (is_not_immed(plist->u.pid))
+ insert_process(plist->u.p);
+ plist = erts_proclist_peek_next(dep->suspended, plist);
+ }
+}
+
#ifdef ERL_NODE_BOOKKEEP
void
erts_node_bookkeep(ErlNode *np, Eterm term, int what)
@@ -1872,7 +1936,6 @@ erts_node_bookkeep(ErlNode *np, Eterm term, int what)
static void
setup_reference_table(void)
{
- ErlHeapFragment *hfp;
DistEntry *dep;
HashInfo hi;
int i, max;
@@ -1922,53 +1985,10 @@ setup_reference_table(void)
/* Insert all processes */
for (i = 0; i < max; i++) {
Process *proc = erts_pix2proc(i);
- if (proc) {
- int mli;
- ErtsMessage *msg_list[] = {proc->msg_frag};
-
- /* Insert Heap */
- insert_offheap(&(proc->off_heap),
- HEAP_REF,
- proc->common.id);
- /* Insert heap fragments buffers */
- for(hfp = proc->mbuf; hfp; hfp = hfp->next)
- insert_offheap(&(hfp->off_heap),
- HEAP_REF,
- proc->common.id);
-
- /* Insert msg buffers */
- for (mli = 0; mli < sizeof(msg_list)/sizeof(msg_list[0]); mli++) {
- ErtsMessage *msg;
- for (msg = msg_list[mli]; msg; msg = msg->next)
- insert_message(msg, HEAP_REF, proc);
- }
-
- /* Insert signal queue */
- erts_proc_sig_debug_foreach_sig(proc,
- insert_sig_msg,
- insert_sig_offheap,
- insert_sig_monitor,
- insert_sig_link,
- insert_sig_ext,
- (void *) proc);
-
- /* Insert links */
- insert_p_links(&proc->common);
-
- /* Insert monitors */
- insert_p_monitors(&proc->common);
-
- {
- DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc);
- if (dep)
- insert_dist_entry(dep,
- CTRL_REF,
- proc->common.id,
- 0);
- }
- }
+ if (proc)
+ insert_process(proc);
}
-
+
erts_foreach_sys_msg_in_q(insert_sys_msg);
/* Insert all ports */
@@ -2042,18 +2062,21 @@ setup_reference_table(void)
insert_dist_links(dep);
insert_dist_monitors(dep);
insert_dist_sequences(dep);
+ insert_dist_suspended_procs(dep);
}
for(dep = erts_hidden_dist_entries; dep; dep = dep->next) {
insert_dist_links(dep);
insert_dist_monitors(dep);
insert_dist_sequences(dep);
+ insert_dist_suspended_procs(dep);
}
for(dep = erts_pending_dist_entries; dep; dep = dep->next) {
insert_dist_links(dep);
insert_dist_monitors(dep);
insert_dist_sequences(dep);
+ insert_dist_suspended_procs(dep);
}
/* Not connected dist entries should not have any links,
@@ -2062,6 +2085,7 @@ setup_reference_table(void)
insert_dist_links(dep);
insert_dist_monitors(dep);
insert_dist_sequences(dep);
+ insert_dist_suspended_procs(dep);
}
/* Insert all ets tables */
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index c8f2e88127..30a7875387 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -2094,7 +2094,7 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", pp->common.id);
while (plp2 != NULL) {
- erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->pid);
+ erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->u.pid);
DTRACE2(process_port_unblocked, pid_str, port_str);
}
}
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index d20be0149c..9c74a2c355 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -3854,7 +3854,7 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp)
break;
case ERTS_SIG_Q_OP_MONITOR: {
- ErtsProcExitContext pectxt = {c_p, am_noproc};
+ ErtsProcExitContext pectxt = {c_p, am_noproc, NULL, NULL, NIL};
erts_proc_exit_handle_monitor((ErtsMonitor *) sig,
(void *) &pectxt, -1);
cnt += 4;
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 422f6a51d9..ddb76a0b00 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -1470,7 +1470,10 @@ proclist_create(Process *p)
{
ErtsProcList *plp = proclist_alloc();
ensure_later_proc_interval(p->common.u.alive.started_interval);
- plp->pid = p->common.id;
+ if (erts_atomic32_read_nob(&p->state) & ERTS_PSFLG_FREE)
+ plp->u.p = p;
+ else
+ plp->u.pid = p->common.id;
plp->started_interval = p->common.u.alive.started_interval;
return plp;
}
@@ -1479,7 +1482,7 @@ static ERTS_INLINE ErtsProcList *
proclist_copy(ErtsProcList *plp0)
{
ErtsProcList *plp1 = proclist_alloc();
- plp1->pid = plp0->pid;
+ plp1->u.pid = plp0->u.pid;
plp1->started_interval = plp0->started_interval;
return plp1;
}
@@ -1514,7 +1517,10 @@ erts_proclist_dump(fmtfn_t to, void *to_arg, ErtsProcList *plp)
ErtsProcList *first = plp;
while (plp) {
- erts_print(to, to_arg, "%T", plp->pid);
+ if (is_pid(plp->u.pid))
+ erts_print(to, to_arg, "%T", plp->u.pid);
+ else
+ erts_print(to, to_arg, "%T", plp->u.p->common.id);
plp = plp->next;
if (plp == first)
break;
@@ -7204,8 +7210,7 @@ schdlr_sspnd_resume_procs(ErtsSchedType sched_type,
while (resume->msb.chngrs) {
ErtsProcList *plp = resume->msb.chngrs;
resume->msb.chngrs = plp->next;
- schdlr_sspnd_resume_proc(sched_type,
- plp->pid);
+ schdlr_sspnd_resume_proc(sched_type, plp->u.pid);
proclist_destroy(plp);
}
}
@@ -7637,7 +7642,7 @@ suspend_scheduler(ErtsSchedulerData *esdp)
else {
schdlr_sspnd.changer = am_true; /* change right in transit */
/* resume process that is queued for next change... */
- resume.onln.nxt = plp->pid;
+ resume.onln.nxt = plp->u.pid;
ASSERT(is_internal_pid(resume.onln.nxt));
}
}
@@ -7857,7 +7862,7 @@ abort_sched_onln_chng_waitq(Process *p)
proclist_destroy(plp);
plp = erts_proclist_peek_first(schdlr_sspnd.chngq);
if (plp)
- resume = plp->pid;
+ resume = plp->u.pid;
else
schdlr_sspnd.changer = am_false;
}
@@ -8357,10 +8362,10 @@ erts_multi_scheduling_blockers(Process *p, int normal)
plp1;
plp1 = erts_proclist_peek_next(msbp->blckrs, plp1)) {
for (plp2 = erts_proclist_peek_first(msbp->blckrs);
- plp2->pid != plp1->pid;
+ plp2->u.pid != plp1->u.pid;
plp2 = erts_proclist_peek_next(msbp->blckrs, plp2));
if (plp2 == plp1) {
- res = CONS(hp, plp1->pid, res);
+ res = CONS(hp, plp1->u.pid, res);
hp += 2;
}
/* else: already in result list */
@@ -9029,8 +9034,13 @@ erts_resume_processes(ErtsProcList *list)
while (plp) {
Process *proc;
ErtsProcList *fplp;
- ASSERT(is_internal_pid(plp->pid));
- proc = erts_pid2proc(NULL, 0, plp->pid, ERTS_PROC_LOCK_STATUS);
+ ASSERT(is_internal_pid(plp->u.pid) || is_CP((Eterm)plp->u.p));
+ if (is_internal_pid(plp->u.pid))
+ proc = erts_pid2proc(NULL, 0, plp->u.pid, ERTS_PROC_LOCK_STATUS);
+ else {
+ proc = plp->u.p;
+ erts_proc_lock(proc, ERTS_PROC_LOCK_STATUS);
+ }
if (proc) {
if (erts_proclist_same(plp, proc)) {
resume_process(proc, ERTS_PROC_LOCK_STATUS);
@@ -12050,12 +12060,91 @@ erts_set_self_exiting(Process *c_p, Eterm reason)
add2runq(enqueue, enq_prio, c_p, state, NULL);
}
+static int
+erts_proc_exit_handle_dist_monitor(ErtsMonitor *mon, void *vctxt, Sint reds)
+{
+ ErtsProcExitContext *ctxt = (ErtsProcExitContext *) vctxt;
+ Process *c_p = ctxt->c_p;
+ Eterm reason = ctxt->reason;
+ int code;
+ ErtsDSigSendContext ctx;
+ ErtsMonLnkDist *dist;
+ DistEntry *dep;
+ Eterm watcher;
+ ErtsMonitorData *mdp = NULL;
+ Eterm watched;
+
+ ASSERT(erts_monitor_is_target(mon) && mon->type == ERTS_MON_TYPE_DIST_PROC);
+
+ mdp = erts_monitor_to_data(mon);
+
+ if (mon->flags & ERTS_ML_FLG_NAME)
+ watched = ((ErtsMonitorDataExtended *) mdp)->u.name;
+ else
+ watched = c_p->common.id;
+ ASSERT(is_internal_pid(watched) || is_atom(watched));
+
+ watcher = mon->other.item;
+ ASSERT(is_external_pid(watcher));
+ dep = external_pid_dist_entry(watcher);
+ ASSERT(dep);
+ dist = ((ErtsMonitorDataExtended *) mdp)->dist;
+ ASSERT(dist);
+
+ code = erts_dsig_prepare(&ctx, dep, c_p, ERTS_PROC_LOCK_MAIN,
+ ERTS_DSP_NO_LOCK, 0, 0, 1);
+
+ ctx.reds = (Sint) (reds * TERM_TO_BINARY_LOOP_FACTOR);
+
+ switch (code) {
+ case ERTS_DSIG_PREP_NOT_ALIVE:
+ case ERTS_DSIG_PREP_NOT_CONNECTED:
+ break;
+ case ERTS_DSIG_PREP_PENDING:
+ case ERTS_DSIG_PREP_CONNECTED:
+ if (dist->connection_id != ctx.connection_id)
+ break;
+ code = erts_dsig_send_m_exit(&ctx,
+ watcher,
+ watched,
+ mdp->ref,
+ reason);
+ switch (code) {
+ case ERTS_DSIG_SEND_CONTINUE:
+ erts_set_gc_state(c_p, 0);
+ ctxt->dist_state = erts_dsend_export_trap_context(c_p, &ctx);
+ /* fall-through */
+ case ERTS_DSIG_SEND_YIELD:
+ break;
+ case ERTS_DSIG_SEND_OK:
+ break;
+ case ERTS_DSIG_SEND_TOO_LRG:
+ erts_set_gc_state(c_p, 1);
+ break;
+ default:
+ ASSERT(! "Invalid dsig send exit monitor result");
+ break;
+ }
+ break;
+ default:
+ ASSERT(! "Invalid dsig prep exit monitor result");
+ break;
+ }
+ if (!erts_monitor_dist_delete(&mdp->origin))
+ erts_monitor_release(mon);
+ else
+ erts_monitor_release_both(mdp);
+ return reds - (ctx.reds / TERM_TO_BINARY_LOOP_FACTOR);
+}
+
int
erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds)
{
- Process *c_p = ((ErtsProcExitContext *) vctxt)->c_p;
- Eterm reason = ((ErtsProcExitContext *) vctxt)->reason;
+ ErtsProcExitContext *ctxt = (ErtsProcExitContext *) vctxt;
+ Process *c_p = ctxt->c_p;
+ Eterm reason = ctxt->reason;
ErtsMonitorData *mdp = NULL;
+ int res = 1;
if (erts_monitor_is_target(mon)) {
/* We are being watched... */
@@ -12090,38 +12179,43 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds)
Eterm watcher;
Eterm watched;
- mdp = erts_monitor_to_data(mon);
+ if (is_immed(reason)) {
+ mdp = erts_monitor_to_data(mon);
- if (mon->flags & ERTS_ML_FLG_NAME)
- watched = ((ErtsMonitorDataExtended *) mdp)->u.name;
- else
- watched = c_p->common.id;
- ASSERT(is_internal_pid(watched) || is_atom(watched));
-
- watcher = mon->other.item;
- ASSERT(is_external_pid(watcher));
- dep = external_pid_dist_entry(watcher);
- ASSERT(dep);
- dist = ((ErtsMonitorDataExtended *) mdp)->dist;
- ASSERT(dist);
- code = erts_dsig_prepare(&ctx, dep, NULL, 0,
- ERTS_DSP_NO_LOCK, 1, 1, 0);
- switch (code) {
- case ERTS_DSIG_PREP_CONNECTED:
- case ERTS_DSIG_PREP_PENDING:
- if (dist->connection_id == ctx.connection_id) {
- code = erts_dsig_send_m_exit(&ctx,
- watcher,
- watched,
- mdp->ref,
- reason);
- ASSERT(code == ERTS_DSIG_SEND_OK);
+ if (mon->flags & ERTS_ML_FLG_NAME)
+ watched = ((ErtsMonitorDataExtended *) mdp)->u.name;
+ else
+ watched = c_p->common.id;
+ ASSERT(is_internal_pid(watched) || is_atom(watched));
+
+ watcher = mon->other.item;
+ ASSERT(is_external_pid(watcher));
+ dep = external_pid_dist_entry(watcher);
+ ASSERT(dep);
+ dist = ((ErtsMonitorDataExtended *) mdp)->dist;
+ ASSERT(dist);
+ code = erts_dsig_prepare(&ctx, dep, NULL, 0,
+ ERTS_DSP_NO_LOCK, 1, 1, 0);
+ switch (code) {
+ case ERTS_DSIG_PREP_CONNECTED:
+ case ERTS_DSIG_PREP_PENDING:
+ if (dist->connection_id == ctx.connection_id) {
+ code = erts_dsig_send_m_exit(&ctx,
+ watcher,
+ watched,
+ mdp->ref,
+ reason);
+ ASSERT(code == ERTS_DSIG_SEND_OK);
+ }
+ default:
+ break;
}
- default:
- break;
+ if (!erts_monitor_dist_delete(&mdp->origin))
+ mdp = NULL;
+ } else {
+ erts_monitor_tree_insert(&ctxt->dist_monitors, mon);
+ return 1;
}
- if (!erts_monitor_dist_delete(&mdp->origin))
- mdp = NULL;
break;
}
default:
@@ -12197,6 +12291,7 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds)
}
if (!erts_monitor_dist_delete(&mdp->target))
mdp = NULL;
+ res = 100;
break;
}
default:
@@ -12209,12 +12304,84 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds)
erts_monitor_release_both(mdp);
else if (mon)
erts_monitor_release(mon);
- return 1;
+ return res;
+}
+
+static int
+erts_proc_exit_handle_dist_link(ErtsLink *lnk, void *vctxt, Sint reds)
+{
+ ErtsProcExitContext *ctxt = (ErtsProcExitContext *) vctxt;
+ Process *c_p = ctxt->c_p;
+ Eterm reason = ctxt->reason;
+ int code;
+ ErtsDSigSendContext ctx;
+ ErtsMonLnkDist *dist;
+ DistEntry *dep;
+ ErtsLink *dlnk;
+ ErtsLinkData *ldp = NULL;
+
+ ASSERT(lnk->type == ERTS_LNK_TYPE_DIST_PROC);
+ dlnk = erts_link_to_other(lnk, &ldp);
+ dist = ((ErtsLinkDataExtended *) ldp)->dist;
+
+ ASSERT(is_external_pid(lnk->other.item));
+ dep = external_pid_dist_entry(lnk->other.item);
+
+ ASSERT(dep != erts_this_dist_entry);
+
+ if (!erts_link_dist_delete(dlnk))
+ ldp = NULL;
+
+ code = erts_dsig_prepare(&ctx, dep, c_p, ERTS_PROC_LOCK_MAIN,
+ ERTS_DSP_NO_LOCK, 0, 0, 0);
+
+ ctx.reds = (Sint) (reds * TERM_TO_BINARY_LOOP_FACTOR);
+
+ switch (code) {
+ case ERTS_DSIG_PREP_NOT_ALIVE:
+ case ERTS_DSIG_PREP_NOT_CONNECTED:
+ break;
+ case ERTS_DSIG_PREP_PENDING:
+ case ERTS_DSIG_PREP_CONNECTED:
+ if (dist->connection_id != ctx.connection_id)
+ break;
+ code = erts_dsig_send_exit_tt(&ctx,
+ c_p->common.id,
+ lnk->other.item,
+ reason,
+ SEQ_TRACE_TOKEN(c_p));
+ switch (code) {
+ case ERTS_DSIG_SEND_CONTINUE:
+ erts_set_gc_state(c_p, 0);
+ ctxt->dist_state = erts_dsend_export_trap_context(c_p, &ctx);
+ /* fall-through */
+ case ERTS_DSIG_SEND_YIELD:
+ break;
+ case ERTS_DSIG_SEND_OK:
+ break;
+ case ERTS_DSIG_SEND_TOO_LRG:
+ erts_set_gc_state(c_p, 1);
+ break;
+ default:
+ ASSERT(! "Invalid dsig send exit monitor result");
+ break;
+ }
+ break;
+ default:
+ ASSERT(! "Invalid dsig prep exit monitor result");
+ break;
+ }
+ if (ldp)
+ erts_link_release_both(ldp);
+ else if (lnk)
+ erts_link_release(lnk);
+ return reds - (ctx.reds / TERM_TO_BINARY_LOOP_FACTOR);
}
int
erts_proc_exit_handle_link(ErtsLink *lnk, void *vctxt, Sint reds)
{
+ ErtsProcExitContext *ctxt = (ErtsProcExitContext *) vctxt;
Process *c_p = ((ErtsProcExitContext *) vctxt)->c_p;
Eterm reason = ((ErtsProcExitContext *) vctxt)->reason;
ErtsLinkData *ldp = NULL;
@@ -12248,29 +12415,37 @@ erts_proc_exit_handle_link(ErtsLink *lnk, void *vctxt, Sint reds)
ErtsDSigSendContext ctx;
int code;
- dlnk = erts_link_to_other(lnk, &ldp);
- dist = ((ErtsLinkDataExtended *) ldp)->dist;
+ if (is_immed(reason)) {
+ dlnk = erts_link_to_other(lnk, &ldp);
+ dist = ((ErtsLinkDataExtended *) ldp)->dist;
- ASSERT(is_external_pid(lnk->other.item));
- dep = external_pid_dist_entry(lnk->other.item);
+ ASSERT(is_external_pid(lnk->other.item));
+ dep = external_pid_dist_entry(lnk->other.item);
- ASSERT(dep != erts_this_dist_entry);
+ ASSERT(dep != erts_this_dist_entry);
- if (!erts_link_dist_delete(dlnk))
- ldp = NULL;
+ if (!erts_link_dist_delete(dlnk))
+ ldp = NULL;
- code = erts_dsig_prepare(&ctx, dep, c_p, 0, ERTS_DSP_NO_LOCK, 1, 1, 0);
- switch (code) {
- case ERTS_DSIG_PREP_CONNECTED:
- case ERTS_DSIG_PREP_PENDING:
- if (dist->connection_id == ctx.connection_id) {
- code = erts_dsig_send_exit_tt(&ctx,
- c_p->common.id,
- lnk->other.item,
- reason,
- SEQ_TRACE_TOKEN(c_p));
- ASSERT(code == ERTS_DSIG_SEND_OK);
+ code = erts_dsig_prepare(&ctx, dep, c_p, 0, ERTS_DSP_NO_LOCK, 1, 1, 0);
+ switch (code) {
+ case ERTS_DSIG_PREP_CONNECTED:
+ case ERTS_DSIG_PREP_PENDING:
+ if (dist->connection_id == ctx.connection_id) {
+ code = erts_dsig_send_exit_tt(&ctx,
+ c_p->common.id,
+ lnk->other.item,
+ reason,
+ SEQ_TRACE_TOKEN(c_p));
+ ASSERT(code == ERTS_DSIG_SEND_OK);
+ }
+ break;
+ default:
+ break;
}
+ } else {
+ erts_link_tree_insert(&ctxt->dist_links, lnk);
+ return 1;
}
break;
}
@@ -12359,6 +12534,9 @@ enum continue_exit_phase {
ERTS_CONTINUE_EXIT_MONITORS,
ERTS_CONTINUE_EXIT_LT_MONITORS,
ERTS_CONTINUE_EXIT_HANDLE_PROC_SIG,
+ ERTS_CONTINUE_EXIT_DIST_LINKS,
+ ERTS_CONTINUE_EXIT_DIST_MONITORS,
+ ERTS_CONTINUE_EXIT_DONE,
};
struct continue_exit_state {
@@ -12398,7 +12576,7 @@ erts_continue_exit_process(Process *p)
ASSERT(ERTS_PROC_IS_EXITING(p));
ASSERT(erts_proc_read_refc(p) > 0);
-
+restart:
switch (trap_state->phase) {
case ERTS_CONTINUE_EXIT_TIMERS:
if (p->bif_timers) {
@@ -12558,6 +12736,7 @@ erts_continue_exit_process(Process *p)
? ERTS_PROC_SET_DIST_ENTRY(p, NULL)
: NULL);
+ reds -= 50;
erts_proc_unlock(p, ERTS_PROC_LOCKS_ALL_MINOR);
curr_locks = ERTS_PROC_LOCK_MAIN;
@@ -12591,6 +12770,9 @@ erts_continue_exit_process(Process *p)
trap_state->pectxt.c_p = p;
trap_state->pectxt.reason = trap_state->reason;
+ trap_state->pectxt.dist_links = NULL;
+ trap_state->pectxt.dist_monitors = NULL;
+ trap_state->pectxt.dist_state = NIL;
erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
@@ -12598,8 +12780,6 @@ erts_continue_exit_process(Process *p)
erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
- reds -= 1000;
-
trap_state->yield_state = NULL;
trap_state->phase = ERTS_CONTINUE_EXIT_LINKS;
if (reds <= 0) goto yield;
@@ -12646,18 +12826,82 @@ erts_continue_exit_process(Process *p)
trap_state->phase = ERTS_CONTINUE_EXIT_HANDLE_PROC_SIG;
case ERTS_CONTINUE_EXIT_HANDLE_PROC_SIG: {
Sint r = reds;
- erts_aint_t state;
if (!erts_proc_sig_handle_exit(p, &r))
goto yield;
reds -= r;
+ trap_state->phase = ERTS_CONTINUE_EXIT_DIST_LINKS;
+ }
+ case ERTS_CONTINUE_EXIT_DIST_LINKS: {
+
+ continue_dist_send:
+ if (is_not_nil(trap_state->pectxt.dist_state)) {
+ Binary* bin = erts_magic_ref2bin(trap_state->pectxt.dist_state);
+ ErtsDSigSendContext* ctx = (ErtsDSigSendContext*) ERTS_MAGIC_BIN_DATA(bin);
+ Sint initial_reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR);
+ int result;
+
+ ctx->reds = initial_reds;
+ result = erts_dsig_send(ctx);
+
+ /* erts_dsig_send bumps reductions on the process in the ctx */
+ reds = ERTS_BIF_REDS_LEFT(p);
+
+ switch (result) {
+ case ERTS_DSIG_SEND_OK:
+ case ERTS_DSIG_SEND_TOO_LRG: /*SEND_SYSTEM_LIMIT*/
+ case ERTS_DSIG_SEND_YIELD: /*SEND_YIELD_RETURN*/
+ break;
+ case ERTS_DSIG_SEND_CONTINUE: { /*SEND_YIELD_CONTINUE*/
+ goto yield;
+ }
+ }
+ erts_set_gc_state(p, 1);
+ trap_state->pectxt.dist_state = NIL;
+ if (reds <= 0)
+ goto yield;
+ goto restart;
+ }
+
+ reds = erts_link_tree_foreach_delete_yielding(
+ &trap_state->pectxt.dist_links,
+ erts_proc_exit_handle_dist_link,
+ (void *) &trap_state->pectxt,
+ &trap_state->yield_state,
+ reds);
+ if (reds <= 0 || is_not_nil(trap_state->pectxt.dist_state))
+ goto yield;
+ trap_state->phase = ERTS_CONTINUE_EXIT_DIST_MONITORS;
+ }
+ case ERTS_CONTINUE_EXIT_DIST_MONITORS: {
+
+ if (is_not_nil(trap_state->pectxt.dist_state))
+ goto continue_dist_send;
+
+ reds = erts_monitor_tree_foreach_delete_yielding(
+ &trap_state->pectxt.dist_monitors,
+ erts_proc_exit_handle_dist_monitor,
+ (void *) &trap_state->pectxt,
+ &trap_state->yield_state,
+ reds);
+ if (reds <= 0 || is_not_nil(trap_state->pectxt.dist_state))
+ goto yield;
+
+ trap_state->phase = ERTS_CONTINUE_EXIT_DONE;
+ }
+ case ERTS_CONTINUE_EXIT_DONE: {
+ erts_aint_t state;
/*
* From this point on we are no longer allowed to yield
* this process.
*/
+#ifdef DEBUG
+ yield_allowed = 0;
+#endif
+
/* Set state to not active as we don't want this process
to be scheduled in again after this. */
state = erts_atomic32_read_band_relb(&p->state,
@@ -12688,9 +12932,6 @@ erts_continue_exit_process(Process *p)
&p->common.u.release,
sizeof(Process));
-#ifdef DEBUG
- yield_allowed = 0;
-#endif
break;
}
}
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index cdc03cf940..3b593bce02 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -381,7 +381,10 @@ struct ErtsSchedulerSleepInfo_ {
typedef struct ErtsProcList_ ErtsProcList;
struct ErtsProcList_ {
- Eterm pid;
+ union {
+ Eterm pid;
+ Process *p;
+ } u;
Uint64 started_interval;
ErtsProcList* next;
ErtsProcList* prev;
@@ -1580,7 +1583,7 @@ ERTS_GLB_INLINE int erts_proclist_is_last(ErtsProcList *, ErtsProcList *);
ERTS_GLB_INLINE int
erts_proclist_same(ErtsProcList *plp, Process *p)
{
- return (plp->pid == p->common.id
+ return ((plp->u.pid == p->common.id || plp->u.p == p)
&& (plp->started_interval
== p->common.u.alive.started_interval));
}
@@ -1819,6 +1822,9 @@ Eterm erts_process_info(Process *c_p, ErtsHeapFactory *hfact,
typedef struct {
Process *c_p;
Eterm reason;
+ ErtsLink *dist_links;
+ ErtsMonitor *dist_monitors;
+ Eterm dist_state;
} ErtsProcExitContext;
int erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds);
int erts_proc_exit_handle_link(ErtsLink *lnk, void *vctxt, Sint reds);
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 36824fe62c..b961c639f5 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -5104,7 +5104,7 @@ erts_port_resume_procs(Port *prt)
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id);
while (plp2 != NULL) {
- erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->pid);
+ erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->u.pid);
DTRACE2(process_port_unblocked, pid_str, port_str);
}
}