aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_node_tables.c
diff options
context:
space:
mode:
authorLukas Larsson <[email protected]>2019-01-31 10:52:35 +0100
committerLukas Larsson <[email protected]>2019-02-22 11:12:54 +0100
commitfc0967392625626289f03e6955c24e73ea1fd617 (patch)
tree8a315eb719e3da00d44bb193724f8f3eaac04f88 /erts/emulator/beam/erl_node_tables.c
parent2bf27ec51e331371412576a9a9a67353109109ad (diff)
downloadotp-fc0967392625626289f03e6955c24e73ea1fd617.tar.gz
otp-fc0967392625626289f03e6955c24e73ea1fd617.tar.bz2
otp-fc0967392625626289f03e6955c24e73ea1fd617.zip
erts: Implement trapping while sending distr exit/down
The reason in EXIT and DOWN may be arbitrarily large, so we yield and allow other processes to execute while encoding and sending the signals over the distribution.
Diffstat (limited to 'erts/emulator/beam/erl_node_tables.c')
-rw-r--r--erts/emulator/beam/erl_node_tables.c118
1 files changed, 71 insertions, 47 deletions
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index 60c3be3223..8fe5c3c690 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -1850,6 +1850,70 @@ insert_sig_ext(ErtsDistExternal *edep, void *arg)
insert_dist_entry(edep->dep, SIGNAL_REF, proc->common.id, 0);
}
+static void
+insert_process(Process *proc)
+{
+ int mli;
+ ErtsMessage *msg_list[] = {proc->msg_frag};
+ ErlHeapFragment *hfp;
+
+ /* Insert Heap */
+ insert_offheap(&(proc->off_heap),
+ HEAP_REF,
+ proc->common.id);
+ /* Insert heap fragments buffers */
+ for(hfp = proc->mbuf; hfp; hfp = hfp->next)
+ insert_offheap(&(hfp->off_heap),
+ HEAP_REF,
+ proc->common.id);
+
+ /* Insert msg buffers */
+ for (mli = 0; mli < sizeof(msg_list)/sizeof(msg_list[0]); mli++) {
+ ErtsMessage *msg;
+ for (msg = msg_list[mli]; msg; msg = msg->next)
+ insert_message(msg, HEAP_REF, proc);
+ }
+
+ /* Insert signal queue */
+ erts_proc_sig_debug_foreach_sig(proc,
+ insert_sig_msg,
+ insert_sig_offheap,
+ insert_sig_monitor,
+ insert_sig_link,
+ insert_sig_ext,
+ (void *) proc);
+
+ /* If the process is FREE, the proc->common field has been
+ re-used by the ptab delete, so we cannot trust it. */
+ if (!(erts_atomic32_read_nob(&proc->state) & ERTS_PSFLG_FREE)) {
+ /* Insert links */
+ insert_p_links(&proc->common);
+
+ /* Insert monitors */
+ insert_p_monitors(&proc->common);
+ }
+
+ {
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc);
+ if (dep)
+ insert_dist_entry(dep,
+ CTRL_REF,
+ proc->common.id,
+ 0);
+ }
+}
+
+static void
+insert_dist_suspended_procs(DistEntry *dep)
+{
+ ErtsProcList *plist = erts_proclist_peek_first(dep->suspended);
+ while (plist) {
+ if (is_not_immed(plist->u.pid))
+ insert_process(plist->u.p);
+ plist = erts_proclist_peek_next(dep->suspended, plist);
+ }
+}
+
#ifdef ERL_NODE_BOOKKEEP
void
erts_node_bookkeep(ErlNode *np, Eterm term, int what)
@@ -1872,7 +1936,6 @@ erts_node_bookkeep(ErlNode *np, Eterm term, int what)
static void
setup_reference_table(void)
{
- ErlHeapFragment *hfp;
DistEntry *dep;
HashInfo hi;
int i, max;
@@ -1922,53 +1985,10 @@ setup_reference_table(void)
/* Insert all processes */
for (i = 0; i < max; i++) {
Process *proc = erts_pix2proc(i);
- if (proc) {
- int mli;
- ErtsMessage *msg_list[] = {proc->msg_frag};
-
- /* Insert Heap */
- insert_offheap(&(proc->off_heap),
- HEAP_REF,
- proc->common.id);
- /* Insert heap fragments buffers */
- for(hfp = proc->mbuf; hfp; hfp = hfp->next)
- insert_offheap(&(hfp->off_heap),
- HEAP_REF,
- proc->common.id);
-
- /* Insert msg buffers */
- for (mli = 0; mli < sizeof(msg_list)/sizeof(msg_list[0]); mli++) {
- ErtsMessage *msg;
- for (msg = msg_list[mli]; msg; msg = msg->next)
- insert_message(msg, HEAP_REF, proc);
- }
-
- /* Insert signal queue */
- erts_proc_sig_debug_foreach_sig(proc,
- insert_sig_msg,
- insert_sig_offheap,
- insert_sig_monitor,
- insert_sig_link,
- insert_sig_ext,
- (void *) proc);
-
- /* Insert links */
- insert_p_links(&proc->common);
-
- /* Insert monitors */
- insert_p_monitors(&proc->common);
-
- {
- DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc);
- if (dep)
- insert_dist_entry(dep,
- CTRL_REF,
- proc->common.id,
- 0);
- }
- }
+ if (proc)
+ insert_process(proc);
}
-
+
erts_foreach_sys_msg_in_q(insert_sys_msg);
/* Insert all ports */
@@ -2042,18 +2062,21 @@ setup_reference_table(void)
insert_dist_links(dep);
insert_dist_monitors(dep);
insert_dist_sequences(dep);
+ insert_dist_suspended_procs(dep);
}
for(dep = erts_hidden_dist_entries; dep; dep = dep->next) {
insert_dist_links(dep);
insert_dist_monitors(dep);
insert_dist_sequences(dep);
+ insert_dist_suspended_procs(dep);
}
for(dep = erts_pending_dist_entries; dep; dep = dep->next) {
insert_dist_links(dep);
insert_dist_monitors(dep);
insert_dist_sequences(dep);
+ insert_dist_suspended_procs(dep);
}
/* Not connected dist entries should not have any links,
@@ -2062,6 +2085,7 @@ setup_reference_table(void)
insert_dist_links(dep);
insert_dist_monitors(dep);
insert_dist_sequences(dep);
+ insert_dist_suspended_procs(dep);
}
/* Insert all ets tables */