aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
authorLukas Larsson <[email protected]>2019-01-31 10:52:35 +0100
committerLukas Larsson <[email protected]>2019-02-22 11:12:54 +0100
commitfc0967392625626289f03e6955c24e73ea1fd617 (patch)
tree8a315eb719e3da00d44bb193724f8f3eaac04f88 /erts
parent2bf27ec51e331371412576a9a9a67353109109ad (diff)
downloadotp-fc0967392625626289f03e6955c24e73ea1fd617.tar.gz
otp-fc0967392625626289f03e6955c24e73ea1fd617.tar.bz2
otp-fc0967392625626289f03e6955c24e73ea1fd617.zip
erts: Implement trapping while sending distr exit/down
The reason in EXIT and DOWN may be arbitrarily large, so we yield and allow other processes to execute while encoding and sending the signals over the distribution.
Diffstat (limited to 'erts')
-rw-r--r--erts/emulator/beam/erl_node_tables.c118
-rw-r--r--erts/emulator/beam/erl_port_task.c2
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c2
-rw-r--r--erts/emulator/beam/erl_process.c377
-rw-r--r--erts/emulator/beam/erl_process.h10
-rw-r--r--erts/emulator/beam/io.c2
6 files changed, 391 insertions, 120 deletions
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index 60c3be3223..8fe5c3c690 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -1850,6 +1850,70 @@ insert_sig_ext(ErtsDistExternal *edep, void *arg)
insert_dist_entry(edep->dep, SIGNAL_REF, proc->common.id, 0);
}
+static void
+insert_process(Process *proc)
+{
+ int mli;
+ ErtsMessage *msg_list[] = {proc->msg_frag};
+ ErlHeapFragment *hfp;
+
+ /* Insert Heap */
+ insert_offheap(&(proc->off_heap),
+ HEAP_REF,
+ proc->common.id);
+ /* Insert heap fragments buffers */
+ for(hfp = proc->mbuf; hfp; hfp = hfp->next)
+ insert_offheap(&(hfp->off_heap),
+ HEAP_REF,
+ proc->common.id);
+
+ /* Insert msg buffers */
+ for (mli = 0; mli < sizeof(msg_list)/sizeof(msg_list[0]); mli++) {
+ ErtsMessage *msg;
+ for (msg = msg_list[mli]; msg; msg = msg->next)
+ insert_message(msg, HEAP_REF, proc);
+ }
+
+ /* Insert signal queue */
+ erts_proc_sig_debug_foreach_sig(proc,
+ insert_sig_msg,
+ insert_sig_offheap,
+ insert_sig_monitor,
+ insert_sig_link,
+ insert_sig_ext,
+ (void *) proc);
+
+ /* If the process is FREE, the proc->common field has been
+ re-used by the ptab delete, so we cannot trust it. */
+ if (!(erts_atomic32_read_nob(&proc->state) & ERTS_PSFLG_FREE)) {
+ /* Insert links */
+ insert_p_links(&proc->common);
+
+ /* Insert monitors */
+ insert_p_monitors(&proc->common);
+ }
+
+ {
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc);
+ if (dep)
+ insert_dist_entry(dep,
+ CTRL_REF,
+ proc->common.id,
+ 0);
+ }
+}
+
+static void
+insert_dist_suspended_procs(DistEntry *dep)
+{
+ ErtsProcList *plist = erts_proclist_peek_first(dep->suspended);
+ while (plist) {
+ if (is_not_immed(plist->u.pid))
+ insert_process(plist->u.p);
+ plist = erts_proclist_peek_next(dep->suspended, plist);
+ }
+}
+
#ifdef ERL_NODE_BOOKKEEP
void
erts_node_bookkeep(ErlNode *np, Eterm term, int what)
@@ -1872,7 +1936,6 @@ erts_node_bookkeep(ErlNode *np, Eterm term, int what)
static void
setup_reference_table(void)
{
- ErlHeapFragment *hfp;
DistEntry *dep;
HashInfo hi;
int i, max;
@@ -1922,53 +1985,10 @@ setup_reference_table(void)
/* Insert all processes */
for (i = 0; i < max; i++) {
Process *proc = erts_pix2proc(i);
- if (proc) {
- int mli;
- ErtsMessage *msg_list[] = {proc->msg_frag};
-
- /* Insert Heap */
- insert_offheap(&(proc->off_heap),
- HEAP_REF,
- proc->common.id);
- /* Insert heap fragments buffers */
- for(hfp = proc->mbuf; hfp; hfp = hfp->next)
- insert_offheap(&(hfp->off_heap),
- HEAP_REF,
- proc->common.id);
-
- /* Insert msg buffers */
- for (mli = 0; mli < sizeof(msg_list)/sizeof(msg_list[0]); mli++) {
- ErtsMessage *msg;
- for (msg = msg_list[mli]; msg; msg = msg->next)
- insert_message(msg, HEAP_REF, proc);
- }
-
- /* Insert signal queue */
- erts_proc_sig_debug_foreach_sig(proc,
- insert_sig_msg,
- insert_sig_offheap,
- insert_sig_monitor,
- insert_sig_link,
- insert_sig_ext,
- (void *) proc);
-
- /* Insert links */
- insert_p_links(&proc->common);
-
- /* Insert monitors */
- insert_p_monitors(&proc->common);
-
- {
- DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc);
- if (dep)
- insert_dist_entry(dep,
- CTRL_REF,
- proc->common.id,
- 0);
- }
- }
+ if (proc)
+ insert_process(proc);
}
-
+
erts_foreach_sys_msg_in_q(insert_sys_msg);
/* Insert all ports */
@@ -2042,18 +2062,21 @@ setup_reference_table(void)
insert_dist_links(dep);
insert_dist_monitors(dep);
insert_dist_sequences(dep);
+ insert_dist_suspended_procs(dep);
}
for(dep = erts_hidden_dist_entries; dep; dep = dep->next) {
insert_dist_links(dep);
insert_dist_monitors(dep);
insert_dist_sequences(dep);
+ insert_dist_suspended_procs(dep);
}
for(dep = erts_pending_dist_entries; dep; dep = dep->next) {
insert_dist_links(dep);
insert_dist_monitors(dep);
insert_dist_sequences(dep);
+ insert_dist_suspended_procs(dep);
}
/* Not connected dist entries should not have any links,
@@ -2062,6 +2085,7 @@ setup_reference_table(void)
insert_dist_links(dep);
insert_dist_monitors(dep);
insert_dist_sequences(dep);
+ insert_dist_suspended_procs(dep);
}
/* Insert all ets tables */
diff --git a/erts/emulator/beam/erl_port_task.c b/erts/emulator/beam/erl_port_task.c
index c8f2e88127..30a7875387 100644
--- a/erts/emulator/beam/erl_port_task.c
+++ b/erts/emulator/beam/erl_port_task.c
@@ -2094,7 +2094,7 @@ begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", pp->common.id);
while (plp2 != NULL) {
- erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->pid);
+ erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->u.pid);
DTRACE2(process_port_unblocked, pid_str, port_str);
}
}
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index d20be0149c..9c74a2c355 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -3854,7 +3854,7 @@ erts_proc_sig_handle_exit(Process *c_p, Sint *redsp)
break;
case ERTS_SIG_Q_OP_MONITOR: {
- ErtsProcExitContext pectxt = {c_p, am_noproc};
+ ErtsProcExitContext pectxt = {c_p, am_noproc, NULL, NULL, NIL};
erts_proc_exit_handle_monitor((ErtsMonitor *) sig,
(void *) &pectxt, -1);
cnt += 4;
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 422f6a51d9..ddb76a0b00 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -1470,7 +1470,10 @@ proclist_create(Process *p)
{
ErtsProcList *plp = proclist_alloc();
ensure_later_proc_interval(p->common.u.alive.started_interval);
- plp->pid = p->common.id;
+ if (erts_atomic32_read_nob(&p->state) & ERTS_PSFLG_FREE)
+ plp->u.p = p;
+ else
+ plp->u.pid = p->common.id;
plp->started_interval = p->common.u.alive.started_interval;
return plp;
}
@@ -1479,7 +1482,7 @@ static ERTS_INLINE ErtsProcList *
proclist_copy(ErtsProcList *plp0)
{
ErtsProcList *plp1 = proclist_alloc();
- plp1->pid = plp0->pid;
+ plp1->u.pid = plp0->u.pid;
plp1->started_interval = plp0->started_interval;
return plp1;
}
@@ -1514,7 +1517,10 @@ erts_proclist_dump(fmtfn_t to, void *to_arg, ErtsProcList *plp)
ErtsProcList *first = plp;
while (plp) {
- erts_print(to, to_arg, "%T", plp->pid);
+ if (is_pid(plp->u.pid))
+ erts_print(to, to_arg, "%T", plp->u.pid);
+ else
+ erts_print(to, to_arg, "%T", plp->u.p->common.id);
plp = plp->next;
if (plp == first)
break;
@@ -7204,8 +7210,7 @@ schdlr_sspnd_resume_procs(ErtsSchedType sched_type,
while (resume->msb.chngrs) {
ErtsProcList *plp = resume->msb.chngrs;
resume->msb.chngrs = plp->next;
- schdlr_sspnd_resume_proc(sched_type,
- plp->pid);
+ schdlr_sspnd_resume_proc(sched_type, plp->u.pid);
proclist_destroy(plp);
}
}
@@ -7637,7 +7642,7 @@ suspend_scheduler(ErtsSchedulerData *esdp)
else {
schdlr_sspnd.changer = am_true; /* change right in transit */
/* resume process that is queued for next change... */
- resume.onln.nxt = plp->pid;
+ resume.onln.nxt = plp->u.pid;
ASSERT(is_internal_pid(resume.onln.nxt));
}
}
@@ -7857,7 +7862,7 @@ abort_sched_onln_chng_waitq(Process *p)
proclist_destroy(plp);
plp = erts_proclist_peek_first(schdlr_sspnd.chngq);
if (plp)
- resume = plp->pid;
+ resume = plp->u.pid;
else
schdlr_sspnd.changer = am_false;
}
@@ -8357,10 +8362,10 @@ erts_multi_scheduling_blockers(Process *p, int normal)
plp1;
plp1 = erts_proclist_peek_next(msbp->blckrs, plp1)) {
for (plp2 = erts_proclist_peek_first(msbp->blckrs);
- plp2->pid != plp1->pid;
+ plp2->u.pid != plp1->u.pid;
plp2 = erts_proclist_peek_next(msbp->blckrs, plp2));
if (plp2 == plp1) {
- res = CONS(hp, plp1->pid, res);
+ res = CONS(hp, plp1->u.pid, res);
hp += 2;
}
/* else: already in result list */
@@ -9029,8 +9034,13 @@ erts_resume_processes(ErtsProcList *list)
while (plp) {
Process *proc;
ErtsProcList *fplp;
- ASSERT(is_internal_pid(plp->pid));
- proc = erts_pid2proc(NULL, 0, plp->pid, ERTS_PROC_LOCK_STATUS);
+ ASSERT(is_internal_pid(plp->u.pid) || is_CP((Eterm)plp->u.p));
+ if (is_internal_pid(plp->u.pid))
+ proc = erts_pid2proc(NULL, 0, plp->u.pid, ERTS_PROC_LOCK_STATUS);
+ else {
+ proc = plp->u.p;
+ erts_proc_lock(proc, ERTS_PROC_LOCK_STATUS);
+ }
if (proc) {
if (erts_proclist_same(plp, proc)) {
resume_process(proc, ERTS_PROC_LOCK_STATUS);
@@ -12050,12 +12060,91 @@ erts_set_self_exiting(Process *c_p, Eterm reason)
add2runq(enqueue, enq_prio, c_p, state, NULL);
}
+static int
+erts_proc_exit_handle_dist_monitor(ErtsMonitor *mon, void *vctxt, Sint reds)
+{
+ ErtsProcExitContext *ctxt = (ErtsProcExitContext *) vctxt;
+ Process *c_p = ctxt->c_p;
+ Eterm reason = ctxt->reason;
+ int code;
+ ErtsDSigSendContext ctx;
+ ErtsMonLnkDist *dist;
+ DistEntry *dep;
+ Eterm watcher;
+ ErtsMonitorData *mdp = NULL;
+ Eterm watched;
+
+ ASSERT(erts_monitor_is_target(mon) && mon->type == ERTS_MON_TYPE_DIST_PROC);
+
+ mdp = erts_monitor_to_data(mon);
+
+ if (mon->flags & ERTS_ML_FLG_NAME)
+ watched = ((ErtsMonitorDataExtended *) mdp)->u.name;
+ else
+ watched = c_p->common.id;
+ ASSERT(is_internal_pid(watched) || is_atom(watched));
+
+ watcher = mon->other.item;
+ ASSERT(is_external_pid(watcher));
+ dep = external_pid_dist_entry(watcher);
+ ASSERT(dep);
+ dist = ((ErtsMonitorDataExtended *) mdp)->dist;
+ ASSERT(dist);
+
+ code = erts_dsig_prepare(&ctx, dep, c_p, ERTS_PROC_LOCK_MAIN,
+ ERTS_DSP_NO_LOCK, 0, 0, 1);
+
+ ctx.reds = (Sint) (reds * TERM_TO_BINARY_LOOP_FACTOR);
+
+ switch (code) {
+ case ERTS_DSIG_PREP_NOT_ALIVE:
+ case ERTS_DSIG_PREP_NOT_CONNECTED:
+ break;
+ case ERTS_DSIG_PREP_PENDING:
+ case ERTS_DSIG_PREP_CONNECTED:
+ if (dist->connection_id != ctx.connection_id)
+ break;
+ code = erts_dsig_send_m_exit(&ctx,
+ watcher,
+ watched,
+ mdp->ref,
+ reason);
+ switch (code) {
+ case ERTS_DSIG_SEND_CONTINUE:
+ erts_set_gc_state(c_p, 0);
+ ctxt->dist_state = erts_dsend_export_trap_context(c_p, &ctx);
+ /* fall-through */
+ case ERTS_DSIG_SEND_YIELD:
+ break;
+ case ERTS_DSIG_SEND_OK:
+ break;
+ case ERTS_DSIG_SEND_TOO_LRG:
+ erts_set_gc_state(c_p, 1);
+ break;
+ default:
+ ASSERT(! "Invalid dsig send exit monitor result");
+ break;
+ }
+ break;
+ default:
+ ASSERT(! "Invalid dsig prep exit monitor result");
+ break;
+ }
+ if (!erts_monitor_dist_delete(&mdp->origin))
+ erts_monitor_release(mon);
+ else
+ erts_monitor_release_both(mdp);
+ return reds - (ctx.reds / TERM_TO_BINARY_LOOP_FACTOR);
+}
+
int
erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds)
{
- Process *c_p = ((ErtsProcExitContext *) vctxt)->c_p;
- Eterm reason = ((ErtsProcExitContext *) vctxt)->reason;
+ ErtsProcExitContext *ctxt = (ErtsProcExitContext *) vctxt;
+ Process *c_p = ctxt->c_p;
+ Eterm reason = ctxt->reason;
ErtsMonitorData *mdp = NULL;
+ int res = 1;
if (erts_monitor_is_target(mon)) {
/* We are being watched... */
@@ -12090,38 +12179,43 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds)
Eterm watcher;
Eterm watched;
- mdp = erts_monitor_to_data(mon);
+ if (is_immed(reason)) {
+ mdp = erts_monitor_to_data(mon);
- if (mon->flags & ERTS_ML_FLG_NAME)
- watched = ((ErtsMonitorDataExtended *) mdp)->u.name;
- else
- watched = c_p->common.id;
- ASSERT(is_internal_pid(watched) || is_atom(watched));
-
- watcher = mon->other.item;
- ASSERT(is_external_pid(watcher));
- dep = external_pid_dist_entry(watcher);
- ASSERT(dep);
- dist = ((ErtsMonitorDataExtended *) mdp)->dist;
- ASSERT(dist);
- code = erts_dsig_prepare(&ctx, dep, NULL, 0,
- ERTS_DSP_NO_LOCK, 1, 1, 0);
- switch (code) {
- case ERTS_DSIG_PREP_CONNECTED:
- case ERTS_DSIG_PREP_PENDING:
- if (dist->connection_id == ctx.connection_id) {
- code = erts_dsig_send_m_exit(&ctx,
- watcher,
- watched,
- mdp->ref,
- reason);
- ASSERT(code == ERTS_DSIG_SEND_OK);
+ if (mon->flags & ERTS_ML_FLG_NAME)
+ watched = ((ErtsMonitorDataExtended *) mdp)->u.name;
+ else
+ watched = c_p->common.id;
+ ASSERT(is_internal_pid(watched) || is_atom(watched));
+
+ watcher = mon->other.item;
+ ASSERT(is_external_pid(watcher));
+ dep = external_pid_dist_entry(watcher);
+ ASSERT(dep);
+ dist = ((ErtsMonitorDataExtended *) mdp)->dist;
+ ASSERT(dist);
+ code = erts_dsig_prepare(&ctx, dep, NULL, 0,
+ ERTS_DSP_NO_LOCK, 1, 1, 0);
+ switch (code) {
+ case ERTS_DSIG_PREP_CONNECTED:
+ case ERTS_DSIG_PREP_PENDING:
+ if (dist->connection_id == ctx.connection_id) {
+ code = erts_dsig_send_m_exit(&ctx,
+ watcher,
+ watched,
+ mdp->ref,
+ reason);
+ ASSERT(code == ERTS_DSIG_SEND_OK);
+ }
+ default:
+ break;
}
- default:
- break;
+ if (!erts_monitor_dist_delete(&mdp->origin))
+ mdp = NULL;
+ } else {
+ erts_monitor_tree_insert(&ctxt->dist_monitors, mon);
+ return 1;
}
- if (!erts_monitor_dist_delete(&mdp->origin))
- mdp = NULL;
break;
}
default:
@@ -12197,6 +12291,7 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds)
}
if (!erts_monitor_dist_delete(&mdp->target))
mdp = NULL;
+ res = 100;
break;
}
default:
@@ -12209,12 +12304,84 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds)
erts_monitor_release_both(mdp);
else if (mon)
erts_monitor_release(mon);
- return 1;
+ return res;
+}
+
+static int
+erts_proc_exit_handle_dist_link(ErtsLink *lnk, void *vctxt, Sint reds)
+{
+ ErtsProcExitContext *ctxt = (ErtsProcExitContext *) vctxt;
+ Process *c_p = ctxt->c_p;
+ Eterm reason = ctxt->reason;
+ int code;
+ ErtsDSigSendContext ctx;
+ ErtsMonLnkDist *dist;
+ DistEntry *dep;
+ ErtsLink *dlnk;
+ ErtsLinkData *ldp = NULL;
+
+ ASSERT(lnk->type == ERTS_LNK_TYPE_DIST_PROC);
+ dlnk = erts_link_to_other(lnk, &ldp);
+ dist = ((ErtsLinkDataExtended *) ldp)->dist;
+
+ ASSERT(is_external_pid(lnk->other.item));
+ dep = external_pid_dist_entry(lnk->other.item);
+
+ ASSERT(dep != erts_this_dist_entry);
+
+ if (!erts_link_dist_delete(dlnk))
+ ldp = NULL;
+
+ code = erts_dsig_prepare(&ctx, dep, c_p, ERTS_PROC_LOCK_MAIN,
+ ERTS_DSP_NO_LOCK, 0, 0, 0);
+
+ ctx.reds = (Sint) (reds * TERM_TO_BINARY_LOOP_FACTOR);
+
+ switch (code) {
+ case ERTS_DSIG_PREP_NOT_ALIVE:
+ case ERTS_DSIG_PREP_NOT_CONNECTED:
+ break;
+ case ERTS_DSIG_PREP_PENDING:
+ case ERTS_DSIG_PREP_CONNECTED:
+ if (dist->connection_id != ctx.connection_id)
+ break;
+ code = erts_dsig_send_exit_tt(&ctx,
+ c_p->common.id,
+ lnk->other.item,
+ reason,
+ SEQ_TRACE_TOKEN(c_p));
+ switch (code) {
+ case ERTS_DSIG_SEND_CONTINUE:
+ erts_set_gc_state(c_p, 0);
+ ctxt->dist_state = erts_dsend_export_trap_context(c_p, &ctx);
+ /* fall-through */
+ case ERTS_DSIG_SEND_YIELD:
+ break;
+ case ERTS_DSIG_SEND_OK:
+ break;
+ case ERTS_DSIG_SEND_TOO_LRG:
+ erts_set_gc_state(c_p, 1);
+ break;
+ default:
+ ASSERT(! "Invalid dsig send exit monitor result");
+ break;
+ }
+ break;
+ default:
+ ASSERT(! "Invalid dsig prep exit monitor result");
+ break;
+ }
+ if (ldp)
+ erts_link_release_both(ldp);
+ else if (lnk)
+ erts_link_release(lnk);
+ return reds - (ctx.reds / TERM_TO_BINARY_LOOP_FACTOR);
}
int
erts_proc_exit_handle_link(ErtsLink *lnk, void *vctxt, Sint reds)
{
+ ErtsProcExitContext *ctxt = (ErtsProcExitContext *) vctxt;
Process *c_p = ((ErtsProcExitContext *) vctxt)->c_p;
Eterm reason = ((ErtsProcExitContext *) vctxt)->reason;
ErtsLinkData *ldp = NULL;
@@ -12248,29 +12415,37 @@ erts_proc_exit_handle_link(ErtsLink *lnk, void *vctxt, Sint reds)
ErtsDSigSendContext ctx;
int code;
- dlnk = erts_link_to_other(lnk, &ldp);
- dist = ((ErtsLinkDataExtended *) ldp)->dist;
+ if (is_immed(reason)) {
+ dlnk = erts_link_to_other(lnk, &ldp);
+ dist = ((ErtsLinkDataExtended *) ldp)->dist;
- ASSERT(is_external_pid(lnk->other.item));
- dep = external_pid_dist_entry(lnk->other.item);
+ ASSERT(is_external_pid(lnk->other.item));
+ dep = external_pid_dist_entry(lnk->other.item);
- ASSERT(dep != erts_this_dist_entry);
+ ASSERT(dep != erts_this_dist_entry);
- if (!erts_link_dist_delete(dlnk))
- ldp = NULL;
+ if (!erts_link_dist_delete(dlnk))
+ ldp = NULL;
- code = erts_dsig_prepare(&ctx, dep, c_p, 0, ERTS_DSP_NO_LOCK, 1, 1, 0);
- switch (code) {
- case ERTS_DSIG_PREP_CONNECTED:
- case ERTS_DSIG_PREP_PENDING:
- if (dist->connection_id == ctx.connection_id) {
- code = erts_dsig_send_exit_tt(&ctx,
- c_p->common.id,
- lnk->other.item,
- reason,
- SEQ_TRACE_TOKEN(c_p));
- ASSERT(code == ERTS_DSIG_SEND_OK);
+ code = erts_dsig_prepare(&ctx, dep, c_p, 0, ERTS_DSP_NO_LOCK, 1, 1, 0);
+ switch (code) {
+ case ERTS_DSIG_PREP_CONNECTED:
+ case ERTS_DSIG_PREP_PENDING:
+ if (dist->connection_id == ctx.connection_id) {
+ code = erts_dsig_send_exit_tt(&ctx,
+ c_p->common.id,
+ lnk->other.item,
+ reason,
+ SEQ_TRACE_TOKEN(c_p));
+ ASSERT(code == ERTS_DSIG_SEND_OK);
+ }
+ break;
+ default:
+ break;
}
+ } else {
+ erts_link_tree_insert(&ctxt->dist_links, lnk);
+ return 1;
}
break;
}
@@ -12359,6 +12534,9 @@ enum continue_exit_phase {
ERTS_CONTINUE_EXIT_MONITORS,
ERTS_CONTINUE_EXIT_LT_MONITORS,
ERTS_CONTINUE_EXIT_HANDLE_PROC_SIG,
+ ERTS_CONTINUE_EXIT_DIST_LINKS,
+ ERTS_CONTINUE_EXIT_DIST_MONITORS,
+ ERTS_CONTINUE_EXIT_DONE,
};
struct continue_exit_state {
@@ -12398,7 +12576,7 @@ erts_continue_exit_process(Process *p)
ASSERT(ERTS_PROC_IS_EXITING(p));
ASSERT(erts_proc_read_refc(p) > 0);
-
+restart:
switch (trap_state->phase) {
case ERTS_CONTINUE_EXIT_TIMERS:
if (p->bif_timers) {
@@ -12558,6 +12736,7 @@ erts_continue_exit_process(Process *p)
? ERTS_PROC_SET_DIST_ENTRY(p, NULL)
: NULL);
+ reds -= 50;
erts_proc_unlock(p, ERTS_PROC_LOCKS_ALL_MINOR);
curr_locks = ERTS_PROC_LOCK_MAIN;
@@ -12591,6 +12770,9 @@ erts_continue_exit_process(Process *p)
trap_state->pectxt.c_p = p;
trap_state->pectxt.reason = trap_state->reason;
+ trap_state->pectxt.dist_links = NULL;
+ trap_state->pectxt.dist_monitors = NULL;
+ trap_state->pectxt.dist_state = NIL;
erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
@@ -12598,8 +12780,6 @@ erts_continue_exit_process(Process *p)
erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
- reds -= 1000;
-
trap_state->yield_state = NULL;
trap_state->phase = ERTS_CONTINUE_EXIT_LINKS;
if (reds <= 0) goto yield;
@@ -12646,18 +12826,82 @@ erts_continue_exit_process(Process *p)
trap_state->phase = ERTS_CONTINUE_EXIT_HANDLE_PROC_SIG;
case ERTS_CONTINUE_EXIT_HANDLE_PROC_SIG: {
Sint r = reds;
- erts_aint_t state;
if (!erts_proc_sig_handle_exit(p, &r))
goto yield;
reds -= r;
+ trap_state->phase = ERTS_CONTINUE_EXIT_DIST_LINKS;
+ }
+ case ERTS_CONTINUE_EXIT_DIST_LINKS: {
+
+ continue_dist_send:
+ if (is_not_nil(trap_state->pectxt.dist_state)) {
+ Binary* bin = erts_magic_ref2bin(trap_state->pectxt.dist_state);
+ ErtsDSigSendContext* ctx = (ErtsDSigSendContext*) ERTS_MAGIC_BIN_DATA(bin);
+ Sint initial_reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR);
+ int result;
+
+ ctx->reds = initial_reds;
+ result = erts_dsig_send(ctx);
+
+ /* erts_dsig_send bumps reductions on the process in the ctx */
+ reds = ERTS_BIF_REDS_LEFT(p);
+
+ switch (result) {
+ case ERTS_DSIG_SEND_OK:
+ case ERTS_DSIG_SEND_TOO_LRG: /*SEND_SYSTEM_LIMIT*/
+ case ERTS_DSIG_SEND_YIELD: /*SEND_YIELD_RETURN*/
+ break;
+ case ERTS_DSIG_SEND_CONTINUE: { /*SEND_YIELD_CONTINUE*/
+ goto yield;
+ }
+ }
+ erts_set_gc_state(p, 1);
+ trap_state->pectxt.dist_state = NIL;
+ if (reds <= 0)
+ goto yield;
+ goto restart;
+ }
+
+ reds = erts_link_tree_foreach_delete_yielding(
+ &trap_state->pectxt.dist_links,
+ erts_proc_exit_handle_dist_link,
+ (void *) &trap_state->pectxt,
+ &trap_state->yield_state,
+ reds);
+ if (reds <= 0 || is_not_nil(trap_state->pectxt.dist_state))
+ goto yield;
+ trap_state->phase = ERTS_CONTINUE_EXIT_DIST_MONITORS;
+ }
+ case ERTS_CONTINUE_EXIT_DIST_MONITORS: {
+
+ if (is_not_nil(trap_state->pectxt.dist_state))
+ goto continue_dist_send;
+
+ reds = erts_monitor_tree_foreach_delete_yielding(
+ &trap_state->pectxt.dist_monitors,
+ erts_proc_exit_handle_dist_monitor,
+ (void *) &trap_state->pectxt,
+ &trap_state->yield_state,
+ reds);
+ if (reds <= 0 || is_not_nil(trap_state->pectxt.dist_state))
+ goto yield;
+
+ trap_state->phase = ERTS_CONTINUE_EXIT_DONE;
+ }
+ case ERTS_CONTINUE_EXIT_DONE: {
+ erts_aint_t state;
/*
* From this point on we are no longer allowed to yield
* this process.
*/
+#ifdef DEBUG
+ yield_allowed = 0;
+#endif
+
/* Set state to not active as we don't want this process
to be scheduled in again after this. */
state = erts_atomic32_read_band_relb(&p->state,
@@ -12688,9 +12932,6 @@ erts_continue_exit_process(Process *p)
&p->common.u.release,
sizeof(Process));
-#ifdef DEBUG
- yield_allowed = 0;
-#endif
break;
}
}
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index cdc03cf940..3b593bce02 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -381,7 +381,10 @@ struct ErtsSchedulerSleepInfo_ {
typedef struct ErtsProcList_ ErtsProcList;
struct ErtsProcList_ {
- Eterm pid;
+ union {
+ Eterm pid;
+ Process *p;
+ } u;
Uint64 started_interval;
ErtsProcList* next;
ErtsProcList* prev;
@@ -1580,7 +1583,7 @@ ERTS_GLB_INLINE int erts_proclist_is_last(ErtsProcList *, ErtsProcList *);
ERTS_GLB_INLINE int
erts_proclist_same(ErtsProcList *plp, Process *p)
{
- return (plp->pid == p->common.id
+ return ((plp->u.pid == p->common.id || plp->u.p == p)
&& (plp->started_interval
== p->common.u.alive.started_interval));
}
@@ -1819,6 +1822,9 @@ Eterm erts_process_info(Process *c_p, ErtsHeapFactory *hfact,
typedef struct {
Process *c_p;
Eterm reason;
+ ErtsLink *dist_links;
+ ErtsMonitor *dist_monitors;
+ Eterm dist_state;
} ErtsProcExitContext;
int erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds);
int erts_proc_exit_handle_link(ErtsLink *lnk, void *vctxt, Sint reds);
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 36824fe62c..b961c639f5 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -5104,7 +5104,7 @@ erts_port_resume_procs(Port *prt)
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", prt->common.id);
while (plp2 != NULL) {
- erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->pid);
+ erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->u.pid);
DTRACE2(process_port_unblocked, pid_str, port_str);
}
}