aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_node_tables.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2017-08-28 15:00:48 +0200
committerRickard Green <[email protected]>2017-08-28 15:00:48 +0200
commit3aa559f584559af3a76ecd8c2367f59e062dbe0e (patch)
tree8cdd89cabdb2b6592f03627826a7b7041441d178 /erts/emulator/beam/erl_node_tables.c
parent7d8beec59350c1b3e92cee50c222f8e2ccdd30d5 (diff)
parentffd59fbd9ac262b7aba4b86e7da4992a3e668e24 (diff)
downloadotp-3aa559f584559af3a76ecd8c2367f59e062dbe0e.tar.gz
otp-3aa559f584559af3a76ecd8c2367f59e062dbe0e.tar.bz2
otp-3aa559f584559af3a76ecd8c2367f59e062dbe0e.zip
Merge branch 'rickard/dist/OTP-14459' into rickard/dist/master/OTP-14459
Conflicts: erts/emulator/beam/bif.c erts/emulator/beam/dist.c erts/emulator/beam/dist.h erts/emulator/beam/erl_bif_info.c erts/emulator/beam/erl_node_tables.c erts/emulator/beam/erl_node_tables.h erts/emulator/beam/external.c
Diffstat (limited to 'erts/emulator/beam/erl_node_tables.c')
-rw-r--r--erts/emulator/beam/erl_node_tables.c346
1 files changed, 260 insertions, 86 deletions
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index f8e9fec27a..0f3dfa797c 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -29,6 +29,8 @@
#include "error.h"
#include "erl_thr_progress.h"
#include "dtrace-wrapper.h"
+#include "erl_binary.h"
+#include "erl_bif_unique.h"
Hash erts_dist_table;
Hash erts_node_table;
@@ -57,6 +59,58 @@ static ErtsMonotonicTime node_tab_delete_delay;
/* -- The distribution table ---------------------------------------------- */
+#define ErtsBin2DistEntry(B) \
+ ((DistEntry *) ERTS_MAGIC_BIN_DATA((B)))
+#define ErtsDistEntry2Bin(DEP) \
+ ((Binary *) ERTS_MAGIC_BIN_FROM_DATA((DEP)))
+
+static ERTS_INLINE erts_aint_t
+de_refc_read(DistEntry *dep, erts_aint_t min)
+{
+ return erts_refc_read(&ErtsDistEntry2Bin(dep)->intern.refc, min);
+}
+
+static ERTS_INLINE erts_aint_t
+de_refc_inc_read(DistEntry *dep, erts_aint_t min)
+{
+ return erts_refc_inctest(&ErtsDistEntry2Bin(dep)->intern.refc, min);
+}
+
+static ERTS_INLINE void
+de_refc_inc(DistEntry *dep, erts_aint_t min)
+{
+ erts_refc_inc(&ErtsDistEntry2Bin(dep)->intern.refc, min);
+}
+
+static ERTS_INLINE void
+de_refc_dec(DistEntry *dep, erts_aint_t min)
+{
+#ifdef DEBUG
+ (void) erts_refc_read(&ErtsDistEntry2Bin(dep)->intern.refc, min+1);
+#endif
+ erts_bin_release(ErtsDistEntry2Bin(dep));
+}
+
+static ERTS_INLINE erts_aint_t
+de_refc_dec_read(DistEntry *dep, erts_aint_t min)
+{
+ return erts_refc_dectest(&ErtsDistEntry2Bin(dep)->intern.refc, min);
+}
+
+void
+erts_ref_dist_entry(DistEntry *dep)
+{
+ ASSERT(dep);
+ de_refc_inc(dep, 1);
+}
+
+void
+erts_deref_dist_entry(DistEntry *dep)
+{
+ ASSERT(dep);
+ de_refc_dec(dep, 0);
+}
+
#ifdef DEBUG
static int
is_in_de_list(DistEntry *dep, DistEntry *dep_list)
@@ -85,22 +139,39 @@ dist_table_cmp(void *dep1, void *dep2)
static void*
dist_table_alloc(void *dep_tmpl)
{
+#ifdef DEBUG
+ erts_aint_t refc;
+#endif
Eterm sysname;
+ Binary *bin;
DistEntry *dep;
erts_rwmtx_opt_t rwmtx_opt = ERTS_RWMTX_OPT_DEFAULT_INITER;
rwmtx_opt.type = ERTS_RWMTX_TYPE_FREQUENT_READ;
sysname = ((DistEntry *) dep_tmpl)->sysname;
- dep = (DistEntry *) erts_alloc(ERTS_ALC_T_DIST_ENTRY, sizeof(DistEntry));
+
+ bin = erts_create_magic_binary_x(sizeof(DistEntry),
+ erts_dist_entry_destructor,
+ ERTS_ALC_T_DIST_ENTRY,
+ 0);
+ dep = ErtsBin2DistEntry(bin);
dist_entries++;
+#ifdef DEBUG
+ refc =
+#else
+ (void)
+#endif
+ de_refc_dec_read(dep, -1);
+ ASSERT(refc == -1);
+
dep->prev = NULL;
- erts_refc_init(&dep->refc, -1);
erts_rwmtx_init_opt(&dep->rwmtx, &rwmtx_opt, "dist_entry", sysname,
ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION);
dep->sysname = sysname;
dep->cid = NIL;
+ erts_atomic_init_nob(&dep->input_handler, (erts_aint_t) NIL);
dep->connection_id = 0;
dep->status = 0;
dep->flags = 0;
@@ -114,12 +185,16 @@ dist_table_alloc(void *dep_tmpl)
erts_mtx_init(&dep->qlock, "dist_entry_out_queue", sysname,
ERTS_LOCK_FLAGS_CATEGORY_DISTRIBUTION);
- dep->qflgs = 0;
- dep->qsize = 0;
+ erts_atomic32_init_nob(&dep->qflgs, 0);
+ erts_atomic_init_nob(&dep->qsize, 0);
+ erts_atomic64_init_nob(&dep->in, 0);
+ erts_atomic64_init_nob(&dep->out, 0);
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
dep->suspended = NULL;
+ dep->tmp_out_queue.first = NULL;
+ dep->tmp_out_queue.last = NULL;
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
@@ -181,7 +256,7 @@ dist_table_free(void *vdep)
#ifdef DEBUG
sys_memset(vdep, 0x77, sizeof(DistEntry));
#endif
- erts_free(ERTS_ALC_T_DIST_ENTRY, (void *) dep);
+ erts_bin_free(ErtsDistEntry2Bin(dep));
ASSERT(dist_entries > 0);
dist_entries--;
@@ -199,19 +274,52 @@ erts_dist_table_info(fmtfn_t to, void *to_arg)
erts_rwmtx_runlock(&erts_dist_table_rwmtx);
}
+static ERTS_INLINE DistEntry *find_dist_entry(Eterm sysname,
+ int inc_refc,
+ int connected_only)
+{
+ DistEntry *res;
+ DistEntry de;
+ de.sysname = sysname;
+ erts_rwmtx_rlock(&erts_dist_table_rwmtx);
+ res = hash_get(&erts_dist_table, (void *) &de);
+ if (res) {
+ if (connected_only && is_nil(res->cid))
+ res = NULL;
+ else {
+ int pend_delete;
+ erts_aint_t refc;
+ if (inc_refc) {
+ refc = de_refc_inc_read(res, 1);
+ pend_delete = refc < 2;
+ }
+ else {
+ refc = de_refc_read(res, 0);
+ pend_delete = refc < 1;
+ }
+ if (pend_delete) /* Pending delete */
+ de_refc_inc(res, 1);
+ }
+ }
+ erts_rwmtx_runlock(&erts_dist_table_rwmtx);
+ return res;
+}
+
DistEntry *
erts_channel_no_to_dist_entry(Uint cno)
{
+ /*
+ * Does NOT increase reference count!
+ */
+
/*
* For this node (and previous incarnations of this node),
* ERST_INTERNAL_CHANNEL_NO (will always be 0 I guess) is used as
* channel no. For other nodes, the atom index of the atom corresponding
* to the node name is used as channel no.
*/
- if(cno == ERST_INTERNAL_CHANNEL_NO) {
- erts_refc_inc(&erts_this_dist_entry->refc, 2);
+ if (cno == ERST_INTERNAL_CHANNEL_NO)
return erts_this_dist_entry;
- }
if((cno > MAX_ATOM_INDEX)
|| (cno >= atom_table_size())
@@ -220,80 +328,97 @@ erts_channel_no_to_dist_entry(Uint cno)
/* cno is a valid atom index; find corresponding dist entry (if there
is one) */
- return erts_find_dist_entry(make_atom(cno));
+ return find_dist_entry(make_atom(cno), 0, 0);
}
-
DistEntry *
erts_sysname_to_connected_dist_entry(Eterm sysname)
{
- DistEntry de;
- DistEntry *res_dep;
- de.sysname = sysname;
-
- if(erts_this_dist_entry->sysname == sysname) {
- erts_refc_inc(&erts_this_dist_entry->refc, 2);
+ /*
+ * Does NOT increase reference count!
+ */
+ if(erts_this_dist_entry->sysname == sysname)
return erts_this_dist_entry;
- }
-
- erts_rwmtx_rlock(&erts_dist_table_rwmtx);
- res_dep = (DistEntry *) hash_get(&erts_dist_table, (void *) &de);
- if (res_dep) {
- erts_aint_t refc = erts_refc_inctest(&res_dep->refc, 1);
- if (refc < 2) /* Pending delete */
- erts_refc_inc(&res_dep->refc, 1);
- }
- erts_rwmtx_runlock(&erts_dist_table_rwmtx);
- if (res_dep) {
- int deref;
- erts_rwmtx_rlock(&res_dep->rwmtx);
- deref = is_nil(res_dep->cid);
- erts_rwmtx_runlock(&res_dep->rwmtx);
- if (deref) {
- erts_deref_dist_entry(res_dep);
- res_dep = NULL;
- }
- }
- return res_dep;
+ return find_dist_entry(sysname, 0, 1);
}
DistEntry *erts_find_or_insert_dist_entry(Eterm sysname)
{
+ /*
+ * This function DOES increase reference count!
+ */
DistEntry *res;
DistEntry de;
erts_aint_t refc;
- res = erts_find_dist_entry(sysname);
+ res = find_dist_entry(sysname, 1, 0);
if (res)
return res;
de.sysname = sysname;
erts_rwmtx_rwlock(&erts_dist_table_rwmtx);
res = hash_put(&erts_dist_table, (void *) &de);
- refc = erts_refc_inctest(&res->refc, 0);
+ refc = de_refc_inc_read(res, 0);
if (refc < 2) /* New or pending delete */
- erts_refc_inc(&res->refc, 1);
+ de_refc_inc(res, 1);
erts_rwmtx_rwunlock(&erts_dist_table_rwmtx);
return res;
}
DistEntry *erts_find_dist_entry(Eterm sysname)
{
- DistEntry *res;
- DistEntry de;
- de.sysname = sysname;
- erts_rwmtx_rlock(&erts_dist_table_rwmtx);
- res = hash_get(&erts_dist_table, (void *) &de);
- if (res) {
- erts_aint_t refc = erts_refc_inctest(&res->refc, 1);
- if (refc < 2) /* Pending delete */
- erts_refc_inc(&res->refc, 1);
- }
- erts_rwmtx_runlock(&erts_dist_table_rwmtx);
- return res;
+ /*
+ * Does NOT increase reference count!
+ */
+ return find_dist_entry(sysname, 0, 0);
}
-static void try_delete_dist_entry(void *vdep)
+DistEntry *
+erts_dhandle_to_dist_entry(Eterm dhandle)
{
- DistEntry *dep = (DistEntry *) vdep;
+ Binary *bin;
+ if (!is_internal_magic_ref(dhandle))
+ return NULL;
+ bin = erts_magic_ref2bin(dhandle);
+ if (ERTS_MAGIC_BIN_DESTRUCTOR(bin) != erts_dist_entry_destructor)
+ return NULL;
+ return ErtsBin2DistEntry(bin);
+}
+
+Eterm
+erts_make_dhandle(Process *c_p, DistEntry *dep)
+{
+ Binary *bin;
+ Eterm *hp;
+
+ bin = ErtsDistEntry2Bin(dep);
+ ASSERT(bin);
+ ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == erts_dist_entry_destructor);
+ hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE);
+ return erts_mk_magic_ref(&hp, &c_p->off_heap, bin);
+}
+
+static void try_delete_dist_entry(void *vbin);
+
+static void
+prepare_try_delete_dist_entry(void *vbin)
+{
+ Binary *bin = (Binary *) vbin;
+ DistEntry *dep = ErtsBin2DistEntry(bin);
+ Uint size;
+ erts_aint_t refc;
+
+ refc = de_refc_read(dep, 0);
+ if (refc > 0)
+ return;
+
+ size = ERTS_MAGIC_BIN_SIZE(sizeof(DistEntry));
+ erts_schedule_thr_prgr_later_cleanup_op(try_delete_dist_entry,
+ vbin, &dep->later_op, size);
+}
+
+static void try_delete_dist_entry(void *vbin)
+{
+ Binary *bin = (Binary *) vbin;
+ DistEntry *dep = ErtsBin2DistEntry(bin);
erts_aint_t refc;
erts_rwmtx_rwlock(&erts_dist_table_rwmtx);
@@ -312,26 +437,39 @@ static void try_delete_dist_entry(void *vdep)
*
* If refc > 0, the entry is in use. Keep the entry.
*/
- refc = erts_refc_dectest(&dep->refc, -1);
+ refc = de_refc_dec_read(dep, -1);
if (refc == -1)
(void) hash_erase(&erts_dist_table, (void *) dep);
erts_rwmtx_rwunlock(&erts_dist_table_rwmtx);
- if (refc == 0)
- erts_schedule_delete_dist_entry(dep);
+ if (refc == 0) {
+ if (node_tab_delete_delay == 0)
+ prepare_try_delete_dist_entry(vbin);
+ else if (node_tab_delete_delay > 0)
+ erts_start_timer_callback(node_tab_delete_delay,
+ prepare_try_delete_dist_entry,
+ vbin);
+ }
}
-void erts_schedule_delete_dist_entry(DistEntry *dep)
+int erts_dist_entry_destructor(Binary *bin)
{
- ASSERT(dep != erts_this_dist_entry);
- if (dep != erts_this_dist_entry) {
- if (node_tab_delete_delay == 0)
- try_delete_dist_entry((void *) dep);
- else if (node_tab_delete_delay > 0)
- erts_start_timer_callback(node_tab_delete_delay,
- try_delete_dist_entry,
- (void *) dep);
- }
+ DistEntry *dep = ErtsBin2DistEntry(bin);
+ erts_aint_t refc;
+
+ refc = de_refc_read(dep, -1);
+
+ if (refc == -1)
+ return 1; /* Allow deallocation of structure... */
+
+ if (node_tab_delete_delay == 0)
+ prepare_try_delete_dist_entry((void *) bin);
+ else if (node_tab_delete_delay > 0)
+ erts_start_timer_callback(node_tab_delete_delay,
+ prepare_try_delete_dist_entry,
+ (void *) bin);
+
+ return 0;
}
Uint
@@ -384,7 +522,7 @@ erts_set_dist_entry_not_connected(DistEntry *dep)
erts_rwmtx_rwlock(&erts_dist_table_rwmtx);
ASSERT(dep != erts_this_dist_entry);
- ASSERT(is_internal_port(dep->cid));
+ ASSERT(is_internal_port(dep->cid) || is_internal_pid(dep->cid));
if(dep->flags & DFLAG_PUBLISHED) {
if(dep->prev) {
@@ -439,7 +577,7 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags)
ASSERT(dep != erts_this_dist_entry);
ASSERT(is_nil(dep->cid));
- ASSERT(is_internal_port(cid));
+ ASSERT(is_internal_port(cid) || is_internal_pid(cid));
if(dep->prev) {
ASSERT(is_in_de_list(dep, erts_not_connected_dist_entries));
@@ -459,10 +597,19 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags)
dep->status |= ERTS_DE_SFLG_CONNECTED;
dep->flags = flags;
dep->cid = cid;
+ erts_atomic_set_nob(&dep->input_handler,
+ (erts_aint_t) cid);
+
dep->connection_id++;
dep->connection_id &= ERTS_DIST_EXT_CON_ID_MASK;
dep->prev = NULL;
+ erts_atomic64_set_nob(&dep->in, 0);
+ erts_atomic64_set_nob(&dep->out, 0);
+ erts_atomic32_set_nob(&dep->qflgs,
+ (is_internal_port(cid)
+ ? ERTS_DE_QFLG_PORT_CTRL
+ : ERTS_DE_QFLG_PROC_CTRL));
if(flags & DFLAG_PUBLISHED) {
dep->next = erts_visible_dist_entries;
if(erts_visible_dist_entries) {
@@ -716,19 +863,18 @@ void
erts_set_this_node(Eterm sysname, Uint creation)
{
ERTS_LC_ASSERT(erts_thr_progress_is_blocking());
- ASSERT(erts_refc_read(&erts_this_dist_entry->refc, 2));
+ ASSERT(2 <= de_refc_read(erts_this_dist_entry, 2));
if (erts_refc_dectest(&erts_this_node->refc, 0) == 0)
try_delete_node(erts_this_node);
- if (erts_refc_dectest(&erts_this_dist_entry->refc, 0) == 0)
- try_delete_dist_entry(erts_this_dist_entry);
+ erts_deref_dist_entry(erts_this_dist_entry);
erts_this_node = NULL; /* to make sure refc is bumped for this node */
erts_this_node = erts_find_or_insert_node(sysname, creation);
erts_this_dist_entry = erts_this_node->dist_entry;
- erts_refc_inc(&erts_this_dist_entry->refc, 2);
+ erts_ref_dist_entry(erts_this_dist_entry);
erts_this_node_sysname = erts_this_node_sysname_BUFFER;
erts_snprintf(erts_this_node_sysname, sizeof(erts_this_node_sysname_BUFFER),
@@ -797,9 +943,9 @@ void erts_init_node_tables(int dd_sec)
ASSERT(erts_this_node->dist_entry != NULL);
erts_this_dist_entry = erts_this_node->dist_entry;
/* +1 for erts_this_dist_entry */
- /* +1 for erts_this_node->dist_entry */
- erts_refc_init(&erts_this_dist_entry->refc, 2);
+ erts_ref_dist_entry(erts_this_dist_entry);
+ ASSERT(2 == de_refc_read(erts_this_dist_entry, 2));
erts_this_node_sysname = erts_this_node_sysname_BUFFER;
erts_snprintf(erts_this_node_sysname, sizeof(erts_this_node_sysname_BUFFER),
@@ -876,6 +1022,7 @@ static Eterm AM_node_references;
static Eterm AM_system;
static Eterm AM_timer;
static Eterm AM_delayed_delete_timer;
+static Eterm AM_thread_progress_delete_timer;
static void setup_reference_table(void);
static Eterm reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp);
@@ -965,6 +1112,7 @@ erts_get_node_and_dist_references(struct process *proc)
INIT_AM(timer);
INIT_AM(system);
INIT_AM(delayed_delete_timer);
+ INIT_AM(thread_progress_delete_timer);
references_atoms_need_init = 0;
}
@@ -1148,6 +1296,10 @@ insert_offheap2(ErlOffHeap *oh, void *arg)
insert_offheap(oh, a->type, a->id);
}
+#define ErtsIsDistEntryBinary(Bin) \
+ (((Bin)->intern.flags & BIN_FLAG_MAGIC) \
+ && ERTS_MAGIC_BIN_DESTRUCTOR((Bin)) == erts_dist_entry_destructor)
+
static void
insert_offheap(ErlOffHeap *oh, int type, Eterm id)
{
@@ -1158,7 +1310,10 @@ insert_offheap(ErlOffHeap *oh, int type, Eterm id)
for (u.hdr = oh->first; u.hdr; u.hdr = u.hdr->next) {
switch (thing_subtag(u.hdr->thing_word)) {
case REF_SUBTAG:
- if(IsMatchProgBinary(u.mref->mb)) {
+ if (ErtsIsDistEntryBinary(u.mref->mb))
+ insert_dist_entry(ErtsBin2DistEntry(u.mref->mb),
+ type, id, 0);
+ else if(IsMatchProgBinary(u.mref->mb)) {
InsertedBin *ib;
int insert_bin = 1;
for (ib = inserted_bins; ib; ib = ib->next)
@@ -1301,26 +1456,34 @@ insert_delayed_delete_node(void *state,
ErtsMonotonicTime timeout_pos,
void *vnp)
{
- DeclareTmpHeapNoproc(heap,3);
- UseTmpHeapNoproc(3);
+ Eterm heap[3];
insert_node((ErlNode *) vnp,
SYSTEM_REF,
TUPLE2(&heap[0], AM_system, AM_delayed_delete_timer));
- UnUseTmpHeapNoproc(3);
+}
+
+static void
+insert_thr_prgr_delete_dist_entry(void *arg, ErtsThrPrgrVal thr_prgr, void *vbin)
+{
+ DistEntry *dep = ErtsBin2DistEntry(vbin);
+ Eterm heap[3];
+ insert_dist_entry(dep,
+ SYSTEM_REF,
+ TUPLE2(&heap[0], AM_system, AM_thread_progress_delete_timer),
+ 0);
}
static void
insert_delayed_delete_dist_entry(void *state,
ErtsMonotonicTime timeout_pos,
- void *vdep)
+ void *vbin)
{
- DeclareTmpHeapNoproc(heap,3);
- UseTmpHeapNoproc(3);
- insert_dist_entry((DistEntry *) vdep,
+ DistEntry *dep = ErtsBin2DistEntry(vbin);
+ Eterm heap[3];
+ insert_dist_entry(dep,
SYSTEM_REF,
TUPLE2(&heap[0], AM_system, AM_delayed_delete_timer),
0);
- UnUseTmpHeapNoproc(3);
}
static void
@@ -1354,9 +1517,12 @@ setup_reference_table(void)
erts_debug_callback_timer_foreach(try_delete_node,
insert_delayed_delete_node,
NULL);
- erts_debug_callback_timer_foreach(try_delete_dist_entry,
+ erts_debug_callback_timer_foreach(prepare_try_delete_dist_entry,
insert_delayed_delete_dist_entry,
NULL);
+ erts_debug_later_op_foreach(try_delete_dist_entry,
+ insert_thr_prgr_delete_dist_entry,
+ NULL);
UseTmpHeapNoproc(3);
insert_node(erts_this_node,
@@ -1421,6 +1587,14 @@ setup_reference_table(void)
insert_links(ERTS_P_LINKS(proc), proc->common.id);
if (ERTS_P_MONITORS(proc))
insert_monitors(ERTS_P_MONITORS(proc), proc->common.id);
+ {
+ DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(proc);
+ if (dep)
+ insert_dist_entry(dep,
+ CTRL_REF,
+ proc->common.id,
+ 0);
+ }
}
}
@@ -1719,7 +1893,7 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp)
/* DistList = [{Dist, Refc, ReferenceIdList}] */
tup = MK_3TUP(referred_dists[i].dist->sysname,
- MK_UINT(erts_refc_read(&referred_dists[i].dist->refc, 0)),
+ MK_UINT(de_refc_read(referred_dists[i].dist, 0)),
dril);
dl = MK_CONS(tup, dl);
}