diff options
Diffstat (limited to 'erts/emulator/beam/erl_node_tables.c')
-rw-r--r-- | erts/emulator/beam/erl_node_tables.c | 408 |
1 files changed, 307 insertions, 101 deletions
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index e8901a652f..ca83e70046 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 2001-2017. All Rights Reserved. + * Copyright Ericsson AB 2001-2018. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -31,6 +31,7 @@ #include "dtrace-wrapper.h" #include "erl_binary.h" #include "erl_bif_unique.h" +#include "erl_proc_sig_queue.h" Hash erts_dist_table; Hash erts_node_table; @@ -174,11 +175,7 @@ dist_table_alloc(void *dep_tmpl) dep->flags = 0; dep->version = 0; - erts_mtx_init(&dep->lnk_mtx, "dist_entry_links", sysname, - ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION); - dep->node_links = NULL; - dep->nlinks = NULL; - dep->monitors = NULL; + dep->mld = NULL; erts_mtx_init(&dep->qlock, "dist_entry_out_queue", sysname, ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION); @@ -225,9 +222,7 @@ dist_table_free(void *vdep) ASSERT(de_refc_read(dep, -1) == -1); ASSERT(dep->state == ERTS_DE_STATE_IDLE); ASSERT(is_nil(dep->cid)); - ASSERT(dep->nlinks == NULL); - ASSERT(dep->node_links == NULL); - ASSERT(dep->monitors == NULL); + ASSERT(dep->mld == NULL); /* Link out */ @@ -250,7 +245,6 @@ dist_table_free(void *vdep) ASSERT(!dep->cache); erts_rwmtx_destroy(&dep->rwmtx); - erts_mtx_destroy(&dep->lnk_mtx); erts_mtx_destroy(&dep->qlock); #ifdef DEBUG @@ -606,7 +600,9 @@ erts_set_dist_entry_not_connected(DistEntry *dep) void erts_set_dist_entry_pending(DistEntry *dep) { + ErtsMonLnkDist *mld = erts_mon_link_dist_create(dep->sysname); ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); + erts_rwmtx_rwlock(&erts_dist_table_rwmtx); ASSERT(dep != erts_this_dist_entry); @@ -631,6 +627,10 @@ erts_set_dist_entry_pending(DistEntry *dep) dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY | DFLAG_NO_MAGIC); dep->connection_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + ASSERT(!dep->mld); + mld->connection_id = dep->connection_id; + dep->mld = mld; + dep->prev = NULL; dep->next = erts_pending_dist_entries; if(erts_pending_dist_entries) { @@ -647,6 +647,8 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) { erts_aint32_t set_qflgs; + ASSERT(dep->mld); + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); erts_rwmtx_rwlock(&erts_dist_table_rwmtx); @@ -1099,6 +1101,7 @@ static Eterm AM_system; static Eterm AM_timer; static Eterm AM_delayed_delete_timer; static Eterm AM_thread_progress_delete_timer; +static Eterm AM_signal; static void setup_reference_table(void); static Eterm reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp); @@ -1120,6 +1123,7 @@ typedef struct node_referrer_ { int bin_ref; int timer_ref; int system_ref; + int signal_ref; Eterm id; Uint id_heap[ID_HEAP_SIZE]; ErlOffHeap off_heap; @@ -1137,6 +1141,7 @@ typedef struct dist_referrer_ { int node_ref; int ctrl_ref; int system_ref; + int signal_ref; Eterm id; Uint creation; Uint id_heap[ID_HEAP_SIZE]; @@ -1190,6 +1195,7 @@ erts_get_node_and_dist_references(struct process *proc) INIT_AM(system); INIT_AM(delayed_delete_timer); INIT_AM(thread_progress_delete_timer); + INIT_AM(signal); references_atoms_need_init = 0; } @@ -1226,6 +1232,7 @@ erts_get_node_and_dist_references(struct process *proc) #define MONITOR_REF 7 #define TIMER_REF 8 #define SYSTEM_REF 9 +#define SIGNAL_REF 10 #define INC_TAB_SZ 10 @@ -1260,6 +1267,7 @@ insert_dist_referrer(ReferredDist *referred_dist, drp->node_ref = 0; drp->ctrl_ref = 0; drp->system_ref = 0; + drp->signal_ref = 0; } switch (type) { @@ -1268,6 +1276,7 @@ insert_dist_referrer(ReferredDist *referred_dist, case HEAP_REF: drp->heap_ref++; break; case ETS_REF: drp->ets_ref++; break; case SYSTEM_REF: drp->system_ref++; break; + case SIGNAL_REF: drp->signal_ref++; break; default: ASSERT(0); } } @@ -1321,6 +1330,7 @@ insert_node_referrer(ReferredNode *referred_node, int type, Eterm id) nrp->bin_ref = 0; nrp->timer_ref = 0; nrp->system_ref = 0; + nrp->signal_ref = 0; } switch (type) { @@ -1331,6 +1341,7 @@ insert_node_referrer(ReferredNode *referred_node, int type, Eterm id) case MONITOR_REF: nrp->monitor_ref++; break; case TIMER_REF: nrp->timer_ref++; break; case SYSTEM_REF: nrp->system_ref++; break; + case SIGNAL_REF: nrp->signal_ref++; break; default: ASSERT(0); } } @@ -1438,49 +1449,145 @@ insert_offheap(ErlOffHeap *oh, int type, Eterm id) } } -static void doit_insert_monitor(ErtsMonitor *monitor, void *p) +static void insert_monitor_data(ErtsMonitor *mon, int type, Eterm id) +{ + ErtsMonitorData *mdp = erts_monitor_to_data(mon); + if ((mdp->origin.flags & (ERTS_ML_FLG_DBG_VISITED + | ERTS_ML_FLG_EXTENDED)) == ERTS_ML_FLG_EXTENDED) { + if (mon->type != ERTS_MON_TYPE_NODE) { + ErtsMonitorDataExtended *mdep = (ErtsMonitorDataExtended *) mdp; + ASSERT(mon->flags & ERTS_ML_FLG_EXTENDED); + if (mdep->uptr.ohhp) { + ErlOffHeap oh; + ERTS_INIT_OFF_HEAP(&oh); + oh.first = mdep->uptr.ohhp; + insert_offheap(&oh, type, id); + } + } + } + mdp->origin.flags |= ERTS_ML_FLG_DBG_VISITED; +} + +static void insert_monitor(ErtsMonitor *mon, void *idp) +{ + Eterm id = *((Eterm *) idp); + insert_monitor_data(mon, MONITOR_REF, id); +} + +static void clear_visited_monitor(ErtsMonitor *mon, void *p) +{ + ErtsMonitorData *mdp = erts_monitor_to_data(mon); + mdp->origin.flags &= ~ERTS_ML_FLG_DBG_VISITED; +} + +static void +insert_p_monitors(ErtsPTabElementCommon *p) +{ + Eterm id = p->id; + erts_monitor_tree_foreach(p->u.alive.monitors, + insert_monitor, + (void *) &id); + erts_monitor_list_foreach(p->u.alive.lt_monitors, + insert_monitor, + (void *) &id); +} + +static void +insert_dist_monitors(DistEntry *dep) +{ + if (dep->mld) { + erts_monitor_list_foreach(dep->mld->monitors, + insert_monitor, + (void *) &dep->sysname); + erts_monitor_tree_foreach(dep->mld->orig_name_monitors, + insert_monitor, + (void *) &dep->sysname); + } +} + +static void +clear_visited_p_monitors(ErtsPTabElementCommon *p) +{ + erts_monitor_tree_foreach(p->u.alive.monitors, + clear_visited_monitor, + NULL); + erts_monitor_list_foreach(p->u.alive.lt_monitors, + clear_visited_monitor, + NULL); +} + +static void +clear_visited_dist_monitors(DistEntry *dep) +{ + if (dep->mld) { + erts_monitor_list_foreach(dep->mld->monitors, + clear_visited_monitor, + NULL); + erts_monitor_tree_foreach(dep->mld->orig_name_monitors, + clear_visited_monitor, + NULL); + } +} + +static void insert_link_data(ErtsLink *lnk, int type, Eterm id) { - Eterm *idp = p; - if(monitor->type != MON_NIF_TARGET && is_external(monitor->u.pid)) - insert_node(external_thing_ptr(monitor->u.pid)->node, MONITOR_REF, *idp); - if(is_external(monitor->ref)) - insert_node(external_thing_ptr(monitor->ref)->node, MONITOR_REF, *idp); + ErtsLinkData *ldp = erts_link_to_data(lnk); + if ((ldp->a.flags & (ERTS_ML_FLG_DBG_VISITED + | ERTS_ML_FLG_EXTENDED)) == ERTS_ML_FLG_EXTENDED) { + ErtsLinkDataExtended *ldep = (ErtsLinkDataExtended *) ldp; + if (ldep->ohhp) { + ErlOffHeap oh; + ERTS_INIT_OFF_HEAP(&oh); + oh.first = ldep->ohhp; + insert_offheap(&oh, type, id); + } + } + ldp->a.flags |= ERTS_ML_FLG_DBG_VISITED; } -static void doit_insert_link(ErtsLink *lnk, void *p) +static void insert_link(ErtsLink *lnk, void *idp) { - Eterm *idp = p; - if(is_external(lnk->pid)) - insert_node(external_thing_ptr(lnk->pid)->node, LINK_REF, - *idp); + Eterm id = *((Eterm *) idp); + insert_link_data(lnk, LINK_REF, id); } +static void clear_visited_link(ErtsLink *lnk, void *p) +{ + ErtsLinkData *ldp = erts_link_to_data(lnk); + ldp->a.flags &= ~ERTS_ML_FLG_DBG_VISITED; +} static void -insert_monitors(ErtsMonitor *monitors, Eterm id) +insert_p_links(ErtsPTabElementCommon *p) { - erts_doforall_monitors(monitors,&doit_insert_monitor,&id); + Eterm id = p->id; + erts_link_tree_foreach(p->u.alive.links, insert_link, (void *) &id); } static void -insert_links(ErtsLink *lnk, Eterm id) +insert_dist_links(DistEntry *dep) { - erts_doforall_links(lnk,&doit_insert_link,&id); + if (dep->mld) + erts_link_list_foreach(dep->mld->links, + insert_link, + (void *) &dep->sysname); } -static void doit_insert_link2(ErtsLink *lnk, void *p) +static void +clear_visited_p_links(ErtsPTabElementCommon *p) { - Eterm *idp = p; - if(is_external(lnk->pid)) - insert_node(external_thing_ptr(lnk->pid)->node, LINK_REF, - *idp); - insert_links(ERTS_LINK_ROOT(lnk), *idp); + erts_link_tree_foreach(p->u.alive.links, + clear_visited_link, + NULL); } static void -insert_links2(ErtsLink *lnk, Eterm id) +clear_visited_dist_links(DistEntry *dep) { - erts_doforall_links(lnk,&doit_insert_link2,&id); + if (dep->mld) + erts_link_list_foreach(dep->mld->links, + clear_visited_link, + NULL); } static void @@ -1576,6 +1683,60 @@ insert_delayed_delete_dist_entry(void *state, } static void +insert_message(ErtsMessage *msg, int type, Process *proc) +{ + ErlHeapFragment *heap_frag = NULL; + + ASSERT(ERTS_SIG_IS_MSG(msg)); + if (msg->data.attached) { + if (msg->data.attached == ERTS_MSG_COMBINED_HFRAG) + heap_frag = &msg->hfrag; + else if (ERTS_SIG_IS_INTERNAL_MSG(msg)) + heap_frag = msg->data.heap_frag; + else { + if (msg->data.dist_ext->dep) + insert_dist_entry(msg->data.dist_ext->dep, + type, proc->common.id, 0); + if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) + heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); + } + } + while (heap_frag) { + insert_offheap(&(heap_frag->off_heap), + type, + proc->common.id); + heap_frag = heap_frag->next; + } +} + +static void +insert_sig_msg(ErtsMessage *msg, void *arg) +{ + insert_message(msg, SIGNAL_REF, (Process *) arg); +} + +static void +insert_sig_offheap(ErlOffHeap *ohp, void *arg) +{ + Process *proc = arg; + insert_offheap(ohp, SIGNAL_REF, proc->common.id); +} + +static void +insert_sig_monitor(ErtsMonitor *mon, void *arg) +{ + Process *proc = arg; + insert_monitor_data(mon, SIGNAL_REF, proc->common.id); +} + +static void +insert_sig_link(ErtsLink *lnk, void *arg) +{ + Process *proc = arg; + insert_link_data(lnk, SIGNAL_REF, proc->common.id); +} + +static void setup_reference_table(void) { ErlHeapFragment *hfp; @@ -1630,10 +1791,7 @@ setup_reference_table(void) Process *proc = erts_pix2proc(i); if (proc) { int mli; - ErtsMessage *msg_list[] = { - proc->msg.first, - proc->msg_inq.first, - proc->msg_frag}; + ErtsMessage *msg_list[] = {proc->msg_frag}; /* Insert Heap */ insert_offheap(&(proc->off_heap), @@ -1648,34 +1806,24 @@ setup_reference_table(void) /* Insert msg buffers */ for (mli = 0; mli < sizeof(msg_list)/sizeof(msg_list[0]); mli++) { ErtsMessage *msg; - for (msg = msg_list[mli]; msg; msg = msg->next) { - ErlHeapFragment *heap_frag = NULL; - if (msg->data.attached) { - if (msg->data.attached == ERTS_MSG_COMBINED_HFRAG) - heap_frag = &msg->hfrag; - else if (is_value(ERL_MESSAGE_TERM(msg))) - heap_frag = msg->data.heap_frag; - else { - if (msg->data.dist_ext->dep) - insert_dist_entry(msg->data.dist_ext->dep, - HEAP_REF, proc->common.id, 0); - if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) - heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); - } - } - while (heap_frag) { - insert_offheap(&(heap_frag->off_heap), - HEAP_REF, - proc->common.id); - heap_frag = heap_frag->next; - } - } + for (msg = msg_list[mli]; msg; msg = msg->next) + insert_message(msg, HEAP_REF, proc); } + + /* Insert signal queue */ + erts_proc_sig_debug_foreach_sig(proc, + insert_sig_msg, + insert_sig_offheap, + insert_sig_monitor, + insert_sig_link, + (void *) proc); + /* Insert links */ - if (ERTS_P_LINKS(proc)) - insert_links(ERTS_P_LINKS(proc), proc->common.id); - if (ERTS_P_MONITORS(proc)) - insert_monitors(ERTS_P_MONITORS(proc), proc->common.id); + insert_p_links(&proc->common); + + /* Insert monitors */ + insert_p_monitors(&proc->common); + { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc); if (dep) @@ -1705,11 +1853,9 @@ setup_reference_table(void) continue; /* Insert links */ - if (ERTS_P_LINKS(prt)) - insert_links(ERTS_P_LINKS(prt), prt->common.id); + insert_p_links(&prt->common); /* Insert monitors */ - if (ERTS_P_MONITORS(prt)) - insert_monitors(ERTS_P_MONITORS(prt), prt->common.id); + insert_p_monitors(&prt->common); /* Insert port data */ ohp = erts_port_data_offheap(prt); if (ohp) @@ -1758,41 +1904,25 @@ setup_reference_table(void) /* Insert all dist links */ for(dep = erts_visible_dist_entries; dep; dep = dep->next) { - if(dep->nlinks) - insert_links2(dep->nlinks, dep->sysname); - if(dep->node_links) - insert_links(dep->node_links, dep->sysname); - if(dep->monitors) - insert_monitors(dep->monitors, dep->sysname); + insert_dist_links(dep); + insert_dist_monitors(dep); } for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { - if(dep->nlinks) - insert_links2(dep->nlinks, dep->sysname); - if(dep->node_links) - insert_links(dep->node_links, dep->sysname); - if(dep->monitors) - insert_monitors(dep->monitors, dep->sysname); + insert_dist_links(dep); + insert_dist_monitors(dep); } for(dep = erts_pending_dist_entries; dep; dep = dep->next) { - if(dep->nlinks) - insert_links2(dep->nlinks, dep->sysname); - if(dep->node_links) - insert_links(dep->node_links, dep->sysname); - if(dep->monitors) - insert_monitors(dep->monitors, dep->sysname); + insert_dist_links(dep); + insert_dist_monitors(dep); } /* Not connected dist entries should not have any links, but inspect them anyway */ for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { - if(dep->nlinks) - insert_links2(dep->nlinks, dep->sysname); - if(dep->node_links) - insert_links(dep->node_links, dep->sysname); - if(dep->monitors) - insert_monitors(dep->monitors, dep->sysname); + insert_dist_links(dep); + insert_dist_monitors(dep); } /* Insert all ets tables */ @@ -1877,6 +2007,10 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) tup = MK_2TUP(AM_system, MK_UINT(nrp->system_ref)); nrl = MK_CONS(tup, nrl); } + if(nrp->signal_ref) { + tup = MK_2TUP(AM_signal, MK_UINT(nrp->signal_ref)); + nrl = MK_CONS(tup, nrl); + } nrid = nrp->id; if (!IS_CONST(nrp->id)) { @@ -1900,24 +2034,25 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) } else if(is_internal_port(nrid)) { ASSERT(!nrp->heap_ref && !nrp->ets_ref && !nrp->bin_ref - && !nrp->timer_ref && !nrp->system_ref); + && !nrp->timer_ref && !nrp->system_ref && !nrp->signal_ref); tup = MK_2TUP(AM_port, nrid); } else if(nrp->ets_ref) { ASSERT(!nrp->heap_ref && !nrp->link_ref && !nrp->monitor_ref && !nrp->bin_ref - && !nrp->timer_ref && !nrp->system_ref); + && !nrp->timer_ref && !nrp->system_ref && !nrp->signal_ref); tup = MK_2TUP(AM_ets, nrid); } else if(nrp->bin_ref) { ASSERT(is_small(nrid) || is_big(nrid)); ASSERT(!nrp->heap_ref && !nrp->ets_ref && !nrp->link_ref && !nrp->monitor_ref && !nrp->timer_ref - && !nrp->system_ref); + && !nrp->system_ref && !nrp->signal_ref); tup = MK_2TUP(AM_match_spec, nrid); } else { - ASSERT(!nrp->heap_ref && !nrp->ets_ref && !nrp->bin_ref); + ASSERT(!nrp->heap_ref && !nrp->ets_ref && !nrp->bin_ref + && !nrp->signal_ref); ASSERT(is_atom(nrid)); tup = MK_2TUP(AM_dist, nrid); } @@ -1961,30 +2096,36 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) tup = MK_2TUP(AM_system, MK_UINT(drp->system_ref)); drl = MK_CONS(tup, drl); } + if(drp->signal_ref) { + tup = MK_2TUP(AM_signal, MK_UINT(drp->signal_ref)); + drl = MK_CONS(tup, drl); + } if (is_internal_pid(drp->id)) { ASSERT(!drp->node_ref); tup = MK_2TUP(AM_process, drp->id); } else if(is_internal_port(drp->id)) { - ASSERT(drp->ctrl_ref && !drp->node_ref); + ASSERT(drp->ctrl_ref && !drp->node_ref && !drp->signal_ref); tup = MK_2TUP(AM_port, drp->id); } else if (is_tuple(drp->id)) { Eterm *t; ASSERT(drp->system_ref && !drp->node_ref - && !drp->ctrl_ref && !drp->heap_ref && !drp->ets_ref); + && !drp->ctrl_ref && !drp->heap_ref && !drp->ets_ref + && !drp->signal_ref); t = tuple_val(drp->id); ASSERT(2 == arityval(t[0])); tup = MK_2TUP(t[1], t[2]); } else if (drp->ets_ref) { ASSERT(!drp->heap_ref && !drp->node_ref && - !drp->ctrl_ref && !drp->system_ref); + !drp->ctrl_ref && !drp->system_ref + && !drp->signal_ref); tup = MK_2TUP(AM_ets, drp->id); } - else { - ASSERT(!drp->ctrl_ref && drp->node_ref); + else { + ASSERT(!drp->ctrl_ref && drp->node_ref && !drp->signal_ref); ASSERT(is_atom(drp->id)); tup = MK_2TUP(drp->id, MK_UINT(drp->creation)); tup = MK_2TUP(AM_node, tup); @@ -2019,10 +2160,21 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) } +static void noop_sig_msg(ErtsMessage *msg, void *arg) +{ + +} + +static void noop_sig_offheap(ErlOffHeap *oh, void *arg) +{ + +} + static void delete_reference_table(void) { - Uint i; + DistEntry *dep; + int i, max; for(i = 0; i < no_referred_nodes; i++) { NodeReferrer *nrp; NodeReferrer *tnrp; @@ -2054,6 +2206,60 @@ delete_reference_table(void) inserted_bins = inserted_bins->next; erts_free(ERTS_ALC_T_NC_TMP, (void *)ib); } + + /* Cleanup... */ + + max = erts_ptab_max(&erts_proc); + for (i = 0; i < max; i++) { + Process *proc = erts_pix2proc(i); + if (proc) { + clear_visited_p_links(&proc->common); + clear_visited_p_monitors(&proc->common); + erts_proc_sig_debug_foreach_sig(proc, + noop_sig_msg, + noop_sig_offheap, + clear_visited_monitor, + clear_visited_link, + (void *) proc); + } + } + + max = erts_ptab_max(&erts_port); + for (i = 0; i < max; i++) { + erts_aint32_t state; + Port *prt; + + prt = erts_pix2port(i); + if (!prt) + continue; + + state = erts_atomic32_read_nob(&prt->state); + if (state & ERTS_PORT_SFLGS_DEAD) + continue; + + clear_visited_p_links(&prt->common); + clear_visited_p_monitors(&prt->common); + } + + for(dep = erts_visible_dist_entries; dep; dep = dep->next) { + clear_visited_dist_links(dep); + clear_visited_dist_monitors(dep); + } + + for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { + clear_visited_dist_links(dep); + clear_visited_dist_monitors(dep); + } + + for(dep = erts_pending_dist_entries; dep; dep = dep->next) { + clear_visited_dist_links(dep); + clear_visited_dist_monitors(dep); + } + + for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { + clear_visited_dist_links(dep); + clear_visited_dist_monitors(dep); + } } void |