diff options
Diffstat (limited to 'erts/emulator/beam/erl_node_tables.c')
-rw-r--r-- | erts/emulator/beam/erl_node_tables.c | 240 |
1 files changed, 169 insertions, 71 deletions
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 18ed782ae3..afafaf48dc 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -201,6 +201,7 @@ dist_table_alloc(void *dep_tmpl) dep->send = NULL; dep->cache = NULL; dep->transcode_ctx = NULL; + dep->sequences = NULL; /* Link in */ @@ -801,8 +802,9 @@ node_table_hash(void *venp) static int node_table_cmp(void *venp1, void *venp2) { - return ((((ErlNode *) venp1)->sysname == ((ErlNode *) venp2)->sysname - && ((ErlNode *) venp1)->creation == ((ErlNode *) venp2)->creation) + return ((((ErlNode *) venp1)->sysname == ((ErlNode *) venp2)->sysname) && + ((((ErlNode *) venp1)->creation == ((ErlNode *) venp2)->creation) || + (((ErlNode *) venp1)->creation == 0 || ((ErlNode *) venp2)->creation == 0)) ? 0 : 1); } @@ -816,11 +818,16 @@ node_table_alloc(void *venp_tmpl) node_entries++; - erts_refc_init(&enp->refc, -1); + erts_init_node_entry(enp, -1); enp->creation = ((ErlNode *) venp_tmpl)->creation; enp->sysname = ((ErlNode *) venp_tmpl)->sysname; enp->dist_entry = erts_find_or_insert_dist_entry(((ErlNode *) venp_tmpl)->sysname); +#ifdef ERL_NODE_BOOKKEEP + erts_atomic_init_nob(&enp->slot, 0); + sys_memzero(enp->books, sizeof(struct erl_node_bookkeeping) * 1024); +#endif + return (void *) enp; } @@ -873,7 +880,7 @@ erts_node_table_info(fmtfn_t to, void *to_arg) } -ErlNode *erts_find_or_insert_node(Eterm sysname, Uint32 creation) +ErlNode *erts_find_or_insert_node(Eterm sysname, Uint32 creation, Eterm book) { ErlNode *res; ErlNode ne; @@ -883,9 +890,9 @@ ErlNode *erts_find_or_insert_node(Eterm sysname, Uint32 creation) erts_rwmtx_rlock(&erts_node_table_rwmtx); res = hash_get(&erts_node_table, (void *) &ne); if (res && res != erts_this_node) { - erts_aint_t refc = erts_refc_inctest(&res->refc, 0); + erts_aint_t refc = erts_ref_node_entry(res, 0, book); if (refc < 2) /* New or pending delete */ - erts_refc_inc(&res->refc, 1); + erts_ref_node_entry(res, 1, THE_NON_VALUE); } erts_rwmtx_runlock(&erts_node_table_rwmtx); if (res) @@ -895,9 +902,9 @@ ErlNode *erts_find_or_insert_node(Eterm sysname, Uint32 creation) res = hash_put(&erts_node_table, (void *) &ne); ASSERT(res); if (res != erts_this_node) { - erts_aint_t refc = erts_refc_inctest(&res->refc, 0); + erts_aint_t refc = erts_ref_node_entry(res, 0, book); if (refc < 2) /* New or pending delete */ - erts_refc_inc(&res->refc, 1); + erts_ref_node_entry(res, 1, THE_NON_VALUE); } erts_rwmtx_rwunlock(&erts_node_table_rwmtx); return res; @@ -924,6 +931,7 @@ static void try_delete_node(void *venp) * * If refc > 0, the entry is in use. Keep the entry. */ + erts_node_bookkeep(enp, THE_NON_VALUE, ERL_NODE_DEC); refc = erts_refc_dectest(&enp->refc, -1); if (refc == -1) (void) hash_erase(&erts_node_table, (void *) enp); @@ -1021,7 +1029,7 @@ erts_set_this_node(Eterm sysname, Uint creation) erts_deref_dist_entry(erts_this_dist_entry); erts_this_node = NULL; /* to make sure refc is bumped for this node */ - erts_this_node = erts_find_or_insert_node(sysname, creation); + erts_this_node = erts_find_or_insert_node(sysname, creation, THE_NON_VALUE); erts_this_dist_entry = erts_this_node->dist_entry; erts_ref_dist_entry(erts_this_dist_entry); @@ -1090,7 +1098,7 @@ void erts_init_node_tables(int dd_sec) node_tmpl.creation = 0; erts_this_node = hash_put(&erts_node_table, &node_tmpl); /* +1 for erts_this_node */ - erts_refc_init(&erts_this_node->refc, 1); + erts_init_node_entry(erts_this_node, 1); ASSERT(erts_this_node->dist_entry != NULL); erts_this_dist_entry = erts_this_node->dist_entry; @@ -1177,6 +1185,7 @@ static Eterm AM_system; static Eterm AM_timer; static Eterm AM_delayed_delete_timer; static Eterm AM_thread_progress_delete_timer; +static Eterm AM_sequence; static Eterm AM_signal; static void setup_reference_table(void); @@ -1218,6 +1227,7 @@ typedef struct dist_referrer_ { int ctrl_ref; int system_ref; int signal_ref; + int sequence_ref; Eterm id; Uint creation; Uint id_heap[ID_HEAP_SIZE]; @@ -1272,6 +1282,7 @@ erts_get_node_and_dist_references(struct process *proc) INIT_AM(delayed_delete_timer); INIT_AM(thread_progress_delete_timer); INIT_AM(signal); + INIT_AM(sequence); references_atoms_need_init = 0; } @@ -1309,8 +1320,9 @@ erts_get_node_and_dist_references(struct process *proc) #define TIMER_REF 8 #define SYSTEM_REF 9 #define SIGNAL_REF 10 +#define SEQUENCE_REF 11 -#define INC_TAB_SZ 10 +#define INC_TAB_SZ 11 static void insert_dist_referrer(ReferredDist *referred_dist, @@ -1344,6 +1356,7 @@ insert_dist_referrer(ReferredDist *referred_dist, drp->ctrl_ref = 0; drp->system_ref = 0; drp->signal_ref = 0; + drp->sequence_ref = 0; } switch (type) { @@ -1353,6 +1366,7 @@ insert_dist_referrer(ReferredDist *referred_dist, case ETS_REF: drp->ets_ref++; break; case SYSTEM_REF: drp->system_ref++; break; case SIGNAL_REF: drp->signal_ref++; break; + case SEQUENCE_REF: drp->sequence_ref++; break; default: ASSERT(0); } } @@ -1509,7 +1523,7 @@ insert_offheap(ErlOffHeap *oh, int type, Eterm id) } } else if (IsSendCtxBinary(u.mref->mb)) { - ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(u.mref->mb); + ErtsDSigSendContext* ctx = ERTS_MAGIC_BIN_DATA(u.mref->mb); if (ctx->deref_dep) insert_dist_entry(ctx->dep, type, id, 0); } @@ -1544,16 +1558,18 @@ static void insert_monitor_data(ErtsMonitor *mon, int type, Eterm id) mdp->origin.flags |= ERTS_ML_FLG_DBG_VISITED; } -static void insert_monitor(ErtsMonitor *mon, void *idp) +static int insert_monitor(ErtsMonitor *mon, void *idp, Sint reds) { Eterm id = *((Eterm *) idp); insert_monitor_data(mon, MONITOR_REF, id); + return 1; } -static void clear_visited_monitor(ErtsMonitor *mon, void *p) +static int clear_visited_monitor(ErtsMonitor *mon, void *p, Sint reds) { ErtsMonitorData *mdp = erts_monitor_to_data(mon); mdp->origin.flags &= ~ERTS_ML_FLG_DBG_VISITED; + return 1; } static void @@ -1581,6 +1597,20 @@ insert_dist_monitors(DistEntry *dep) } } + +static int +insert_sequence(ErtsDistExternal *edep, void *arg, Sint reds) +{ + insert_dist_entry(edep->dep, SEQUENCE_REF, *(Eterm*)arg, 0); + return 1; +} + +static void +insert_dist_sequences(DistEntry *dep) +{ + erts_dist_seq_tree_foreach(dep, insert_sequence, (void *) &dep->sysname); +} + static void clear_visited_p_monitors(ErtsPTabElementCommon *p) { @@ -1621,16 +1651,18 @@ static void insert_link_data(ErtsLink *lnk, int type, Eterm id) ldp->a.flags |= ERTS_ML_FLG_DBG_VISITED; } -static void insert_link(ErtsLink *lnk, void *idp) +static int insert_link(ErtsLink *lnk, void *idp, Sint reds) { Eterm id = *((Eterm *) idp); insert_link_data(lnk, LINK_REF, id); + return 1; } -static void clear_visited_link(ErtsLink *lnk, void *p) +static int clear_visited_link(ErtsLink *lnk, void *p, Sint reds) { ErtsLinkData *ldp = erts_link_to_data(lnk); ldp->a.flags &= ~ERTS_ML_FLG_DBG_VISITED; + return 1; } static void @@ -1770,11 +1802,9 @@ insert_message(ErtsMessage *msg, int type, Process *proc) else if (ERTS_SIG_IS_INTERNAL_MSG(msg)) heap_frag = msg->data.heap_frag; else { - if (msg->data.dist_ext->dep) - insert_dist_entry(msg->data.dist_ext->dep, - type, proc->common.id, 0); - if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) - heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); + heap_frag = msg->data.heap_frag; + insert_dist_entry(erts_get_dist_ext(heap_frag)->dep, + type, proc->common.id, 0); } } while (heap_frag) { @@ -1798,24 +1828,115 @@ insert_sig_offheap(ErlOffHeap *ohp, void *arg) insert_offheap(ohp, SIGNAL_REF, proc->common.id); } -static void -insert_sig_monitor(ErtsMonitor *mon, void *arg) +static int +insert_sig_monitor(ErtsMonitor *mon, void *arg, Sint reds) { Process *proc = arg; insert_monitor_data(mon, SIGNAL_REF, proc->common.id); + return 1; } -static void -insert_sig_link(ErtsLink *lnk, void *arg) +static int +insert_sig_link(ErtsLink *lnk, void *arg, Sint reds) { Process *proc = arg; insert_link_data(lnk, SIGNAL_REF, proc->common.id); + return 1; } static void -setup_reference_table(void) +insert_sig_ext(ErtsDistExternal *edep, void *arg) { + Process *proc = arg; + insert_dist_entry(edep->dep, SIGNAL_REF, proc->common.id, 0); +} + +static void +insert_process(Process *proc) +{ + int mli; + ErtsMessage *msg_list[] = {proc->msg_frag}; ErlHeapFragment *hfp; + + /* Insert Heap */ + insert_offheap(&(proc->off_heap), + HEAP_REF, + proc->common.id); + /* Insert heap fragments buffers */ + for(hfp = proc->mbuf; hfp; hfp = hfp->next) + insert_offheap(&(hfp->off_heap), + HEAP_REF, + proc->common.id); + + /* Insert msg buffers */ + for (mli = 0; mli < sizeof(msg_list)/sizeof(msg_list[0]); mli++) { + ErtsMessage *msg; + for (msg = msg_list[mli]; msg; msg = msg->next) + insert_message(msg, HEAP_REF, proc); + } + + /* Insert signal queue */ + erts_proc_sig_debug_foreach_sig(proc, + insert_sig_msg, + insert_sig_offheap, + insert_sig_monitor, + insert_sig_link, + insert_sig_ext, + (void *) proc); + + /* If the process is FREE, the proc->common field has been + re-used by the ptab delete, so we cannot trust it. */ + if (!(erts_atomic32_read_nob(&proc->state) & ERTS_PSFLG_FREE)) { + /* Insert links */ + insert_p_links(&proc->common); + + /* Insert monitors */ + insert_p_monitors(&proc->common); + } + + { + DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc); + if (dep) + insert_dist_entry(dep, + CTRL_REF, + proc->common.id, + 0); + } +} + +static void +insert_dist_suspended_procs(DistEntry *dep) +{ + ErtsProcList *plist = erts_proclist_peek_first(dep->suspended); + while (plist) { + if (is_not_immed(plist->u.pid)) + insert_process(plist->u.p); + plist = erts_proclist_peek_next(dep->suspended, plist); + } +} + +#ifdef ERL_NODE_BOOKKEEP +void +erts_node_bookkeep(ErlNode *np, Eterm term, int what) +{ + erts_aint_t slot = (erts_atomic_inc_read_nob(&np->slot) - 1) % 1024; + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + Eterm who = THE_NON_VALUE; + ASSERT(np); + np->books[slot].what = what; + np->books[slot].term = term; + if (esdp->current_process) { + who = esdp->current_process->common.id; + } else if (esdp->current_port) { + who = esdp->current_port->common.id; + } + np->books[slot].who = who; +} +#endif + +static void +setup_reference_table(void) +{ DistEntry *dep; HashInfo hi; int i, max; @@ -1865,52 +1986,10 @@ setup_reference_table(void) /* Insert all processes */ for (i = 0; i < max; i++) { Process *proc = erts_pix2proc(i); - if (proc) { - int mli; - ErtsMessage *msg_list[] = {proc->msg_frag}; - - /* Insert Heap */ - insert_offheap(&(proc->off_heap), - HEAP_REF, - proc->common.id); - /* Insert heap fragments buffers */ - for(hfp = proc->mbuf; hfp; hfp = hfp->next) - insert_offheap(&(hfp->off_heap), - HEAP_REF, - proc->common.id); - - /* Insert msg buffers */ - for (mli = 0; mli < sizeof(msg_list)/sizeof(msg_list[0]); mli++) { - ErtsMessage *msg; - for (msg = msg_list[mli]; msg; msg = msg->next) - insert_message(msg, HEAP_REF, proc); - } - - /* Insert signal queue */ - erts_proc_sig_debug_foreach_sig(proc, - insert_sig_msg, - insert_sig_offheap, - insert_sig_monitor, - insert_sig_link, - (void *) proc); - - /* Insert links */ - insert_p_links(&proc->common); - - /* Insert monitors */ - insert_p_monitors(&proc->common); - - { - DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc); - if (dep) - insert_dist_entry(dep, - CTRL_REF, - proc->common.id, - 0); - } - } + if (proc) + insert_process(proc); } - + erts_foreach_sys_msg_in_q(insert_sys_msg); /* Insert all ports */ @@ -1983,16 +2062,22 @@ setup_reference_table(void) for(dep = erts_visible_dist_entries; dep; dep = dep->next) { insert_dist_links(dep); insert_dist_monitors(dep); + insert_dist_sequences(dep); + insert_dist_suspended_procs(dep); } for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { insert_dist_links(dep); insert_dist_monitors(dep); + insert_dist_sequences(dep); + insert_dist_suspended_procs(dep); } for(dep = erts_pending_dist_entries; dep; dep = dep->next) { insert_dist_links(dep); insert_dist_monitors(dep); + insert_dist_sequences(dep); + insert_dist_suspended_procs(dep); } /* Not connected dist entries should not have any links, @@ -2000,6 +2085,8 @@ setup_reference_table(void) for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { insert_dist_links(dep); insert_dist_monitors(dep); + insert_dist_sequences(dep); + insert_dist_suspended_procs(dep); } /* Insert all ets tables */ @@ -2173,6 +2260,10 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) tup = MK_2TUP(AM_system, MK_UINT(drp->system_ref)); drl = MK_CONS(tup, drl); } + if(drp->sequence_ref) { + tup = MK_2TUP(AM_sequence, MK_UINT(drp->sequence_ref)); + drl = MK_CONS(tup, drl); + } if(drp->signal_ref) { tup = MK_2TUP(AM_signal, MK_UINT(drp->signal_ref)); drl = MK_CONS(tup, drl); @@ -2247,6 +2338,12 @@ static void noop_sig_offheap(ErlOffHeap *oh, void *arg) } +static void noop_sig_ext(ErtsDistExternal *ext, void *arg) +{ + +} + + static void delete_reference_table(void) { @@ -2297,6 +2394,7 @@ delete_reference_table(void) noop_sig_offheap, clear_visited_monitor, clear_visited_link, + noop_sig_ext, (void *) proc); } } |