From b17db5ae892a8b9c48f8f94f7219c60fe122f629 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 21 Sep 2017 19:21:18 +0200 Subject: erts: Fix bug in DistEntry refc dance to handle "lookup without refc++" correctly which was introduced in 4dcb2ae7810a507b701a30072b2f514cab7ebbdb. When decrementing refc to zero (in try_delete or prepare_try_delete) we must always wait thread progress to make sure no thread has done lookup without refc++ and is just about to do refc++ and thereby revive the DistEntry. That is, we wait for a potential other thread to either do refc++ or drop its pointer to the DistEntry. And if that other thread does refc++ (in erts_ref_dist_entry) it must also do the extra refc++ for the scheduled pending delete. --- erts/emulator/beam/erl_node_tables.c | 103 +++++++++++++++++----------- erts/emulator/test/node_container_SUITE.erl | 2 + 2 files changed, 66 insertions(+), 39 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 0f3dfa797c..1ab096ab69 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -101,7 +101,9 @@ void erts_ref_dist_entry(DistEntry *dep) { ASSERT(dep); - de_refc_inc(dep, 1); + if (de_refc_inc_read(dep, 1) == 1) { + de_refc_inc(dep, 2); /* Pending delete */ + } } void @@ -396,44 +398,77 @@ erts_make_dhandle(Process *c_p, DistEntry *dep) return erts_mk_magic_ref(&hp, &c_p->off_heap, bin); } -static void try_delete_dist_entry(void *vbin); +static void start_timer_delete_dist_entry(void *vdep); +static void prepare_try_delete_dist_entry(void *vdep); +static void try_delete_dist_entry(DistEntry*); + +static void schedule_delete_dist_entry(DistEntry* dep) +{ + /* + * Here we need thread progress to wait for other threads, that may have + * done lookup without refc++, to do either refc++ or drop their refs. + * + * Note that timeouts do not guarantee thread progress. + */ + erts_schedule_thr_prgr_later_op(start_timer_delete_dist_entry, + dep, &dep->later_op); +} static void -prepare_try_delete_dist_entry(void *vbin) +start_timer_delete_dist_entry(void *vdep) { - Binary *bin = (Binary *) vbin; - DistEntry *dep = ErtsBin2DistEntry(bin); - Uint size; + if (node_tab_delete_delay == 0) { + prepare_try_delete_dist_entry(vdep); + } + else { + ASSERT(node_tab_delete_delay > 0); + erts_start_timer_callback(node_tab_delete_delay, + prepare_try_delete_dist_entry, + vdep); + } +} + +static void +prepare_try_delete_dist_entry(void *vdep) +{ + DistEntry *dep = vdep; erts_aint_t refc; + /* + * Time has passed since we decremented refc to zero and DistEntry may + * have been revived. Do a fast check without table lock first. + */ refc = de_refc_read(dep, 0); - if (refc > 0) - return; - - size = ERTS_MAGIC_BIN_SIZE(sizeof(DistEntry)); - erts_schedule_thr_prgr_later_cleanup_op(try_delete_dist_entry, - vbin, &dep->later_op, size); + if (refc == 0) { + try_delete_dist_entry(dep); + } + else { + /* + * Someone has done lookup and done refc++ for us. + */ + refc = de_refc_dec_read(dep, 0); + if (refc == 0) + schedule_delete_dist_entry(dep); + } } -static void try_delete_dist_entry(void *vbin) +static void try_delete_dist_entry(DistEntry* dep) { - Binary *bin = (Binary *) vbin; - DistEntry *dep = ErtsBin2DistEntry(bin); erts_aint_t refc; erts_rwmtx_rwlock(&erts_dist_table_rwmtx); /* * Another thread might have looked up this dist entry after * we decided to delete it (refc became zero). If so, the other - * thread incremented refc twice. Once for the new reference - * and once for this thread. + * thread incremented refc one extra step for this thread. * - * If refc reach -1, no one has used the entry since we - * set up the timer. Delete the entry. + * If refc reach -1, no one has done lookup and no one can do lookup + * as we have table lock. Delete the entry. * - * If refc reach 0, the entry is currently not in use - * but has been used since we set up the timer. Set up a - * new timer. + * If refc reach 0, someone raced us and either + * (1) did lookup with own refc++ and already released it again + * (2) did lookup without own refc++ + * Schedule new delete operation. * * If refc > 0, the entry is in use. Keep the entry. */ @@ -443,12 +478,7 @@ static void try_delete_dist_entry(void *vbin) erts_rwmtx_rwunlock(&erts_dist_table_rwmtx); if (refc == 0) { - if (node_tab_delete_delay == 0) - prepare_try_delete_dist_entry(vbin); - else if (node_tab_delete_delay > 0) - erts_start_timer_callback(node_tab_delete_delay, - prepare_try_delete_dist_entry, - vbin); + schedule_delete_dist_entry(dep); } } @@ -462,12 +492,7 @@ int erts_dist_entry_destructor(Binary *bin) if (refc == -1) return 1; /* Allow deallocation of structure... */ - if (node_tab_delete_delay == 0) - prepare_try_delete_dist_entry((void *) bin); - else if (node_tab_delete_delay > 0) - erts_start_timer_callback(node_tab_delete_delay, - prepare_try_delete_dist_entry, - (void *) bin); + schedule_delete_dist_entry(dep); return 0; } @@ -1463,9 +1488,9 @@ insert_delayed_delete_node(void *state, } static void -insert_thr_prgr_delete_dist_entry(void *arg, ErtsThrPrgrVal thr_prgr, void *vbin) +insert_thr_prgr_delete_dist_entry(void *arg, ErtsThrPrgrVal thr_prgr, void *vdep) { - DistEntry *dep = ErtsBin2DistEntry(vbin); + DistEntry *dep = vdep; Eterm heap[3]; insert_dist_entry(dep, SYSTEM_REF, @@ -1476,9 +1501,9 @@ insert_thr_prgr_delete_dist_entry(void *arg, ErtsThrPrgrVal thr_prgr, void *vbin static void insert_delayed_delete_dist_entry(void *state, ErtsMonotonicTime timeout_pos, - void *vbin) + void *vdep) { - DistEntry *dep = ErtsBin2DistEntry(vbin); + DistEntry *dep = vdep; Eterm heap[3]; insert_dist_entry(dep, SYSTEM_REF, @@ -1520,7 +1545,7 @@ setup_reference_table(void) erts_debug_callback_timer_foreach(prepare_try_delete_dist_entry, insert_delayed_delete_dist_entry, NULL); - erts_debug_later_op_foreach(try_delete_dist_entry, + erts_debug_later_op_foreach(start_timer_delete_dist_entry, insert_thr_prgr_delete_dist_entry, NULL); diff --git a/erts/emulator/test/node_container_SUITE.erl b/erts/emulator/test/node_container_SUITE.erl index be90f929df..7df001fec5 100644 --- a/erts/emulator/test/node_container_SUITE.erl +++ b/erts/emulator/test/node_container_SUITE.erl @@ -966,6 +966,8 @@ check_refc(ThisNodeName,ThisCreation,Table,EntryList) when is_list(EntryList) -> {case Referrer of {system,delayed_delete_timer} -> true; + {system,thread_progress_delete_timer} -> + true; _ -> DDT end, -- cgit v1.2.3 From 50f08826065ad29040a6c218addcf32bf12945b1 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 22 Sep 2017 16:24:45 +0200 Subject: erts TEST: Add missing ref to DistEntry from send context --- erts/emulator/beam/erl_node_tables.c | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 1ab096ab69..9931686cbe 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -1325,6 +1325,11 @@ insert_offheap2(ErlOffHeap *oh, void *arg) (((Bin)->intern.flags & BIN_FLAG_MAGIC) \ && ERTS_MAGIC_BIN_DESTRUCTOR((Bin)) == erts_dist_entry_destructor) +#define IsSendCtxBinary(Bin) \ + (((Bin)->intern.flags & BIN_FLAG_MAGIC) \ + && ERTS_MAGIC_BIN_DESTRUCTOR((Bin)) == erts_dsend_context_dtor) + + static void insert_offheap(ErlOffHeap *oh, int type, Eterm id) { @@ -1361,7 +1366,12 @@ insert_offheap(ErlOffHeap *oh, int type, Eterm id) inserted_bins = nib; UnUseTmpHeapNoproc(BIG_UINT_HEAP_SIZE); } - } + } + else if (IsSendCtxBinary(u.mref->mb)) { + ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(u.mref->mb); + if (ctx->deref_dep) + insert_dist_entry(ctx->dep, type, id, 0); + } break; case REFC_BINARY_SUBTAG: case FUN_SUBTAG: -- cgit v1.2.3 From adc71259e87805990ea2a91bc0b5c17861984a47 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 14 Jul 2017 15:08:03 +0200 Subject: erts: Remove obsolete code for latin1 in atom cache Distribution flag DFLAG_UTF8_ATOMS is supported since R16 and mandatory since 20.0. --- erts/emulator/beam/external.c | 49 +++++++++++-------------------------------- 1 file changed, 12 insertions(+), 37 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 6666c42778..9761a69ad1 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -220,23 +220,10 @@ erts_destroy_atom_cache_map(ErtsAtomCacheMap *acmp) static ERTS_INLINE void insert_acache_map(ErtsAtomCacheMap *acmp, Eterm atom, Uint32 dflags) { - /* - * If the receiver do not understand utf8 atoms - * and this atom cannot be represented in latin1, - * we are not allowed to cache it. - * - * In this case all atoms are assumed to have - * latin1 encoding in the cache. By refusing it - * in the cache we will instead encode it using - * ATOM_UTF8_EXT/SMALL_ATOM_UTF8_EXT which the - * receiver do not recognize and tear down the - * connection. - */ - if (acmp && acmp->sz < ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES - && ((dflags & DFLAG_UTF8_ATOMS) - || atom_tab(atom_val(atom))->latin1_chars >= 0)) { + if (acmp && acmp->sz < ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES) { int ix; ASSERT(acmp->hdr_sz < 0); + ASSERT(dflags & DFLAG_UTF8_ATOMS); ix = atom2cix(atom); if (acmp->cache[ix].iix < 0) { acmp->cache[ix].iix = acmp->sz; @@ -256,9 +243,7 @@ get_iix_acache_map(ErtsAtomCacheMap *acmp, Eterm atom, Uint32 dflags) ASSERT(is_atom(atom)); ix = atom2cix(atom); if (acmp->cache[ix].iix < 0) { - ASSERT(acmp->sz == ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES - || (!(dflags & DFLAG_UTF8_ATOMS) - && atom_tab(atom_val(atom))->latin1_chars < 0)); + ASSERT(acmp->sz == ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES); return -1; } else { @@ -272,7 +257,6 @@ void erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) { if (acmp) { - int utf8_atoms = (int) (dflags & DFLAG_UTF8_ATOMS); int long_atoms = 0; /* !0 if one or more atoms are longer than 255. */ int i; int sz; @@ -283,6 +267,7 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) + 1 /* number of internal cache entries */ ; int min_sz; + ASSERT(dflags & DFLAG_UTF8_ATOMS); ASSERT(acmp->hdr_sz < 0); /* Make sure cache update instructions fit */ min_sz = fix_sz+(2+4)*acmp->sz; @@ -294,7 +279,7 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) atom = acmp->cache[acmp->cix[i]].atom; ASSERT(is_atom(atom)); a = atom_tab(atom_val(atom)); - len = (int) (utf8_atoms ? a->len : a->latin1_chars); + len = (int) a->len; ASSERT(len >= 0); if (!long_atoms && len > 255) long_atoms = 1; @@ -371,8 +356,8 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint int ci, sz; byte dist_hdr_flags; int long_atoms; - int utf8_atoms = (int) (dflags & DFLAG_UTF8_ATOMS); register byte *ep = ext; + ASSERT(dflags & DFLAG_UTF8_ATOMS); ASSERT(ep[0] == VERSION_MAGIC); if (ep[1] != DIST_HEADER) return ext; @@ -445,17 +430,9 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint Atom *a; cache->out_arr[cix] = atom; a = atom_tab(atom_val(atom)); - if (utf8_atoms) { - sz = a->len; - ep -= sz; - sys_memcpy((void *) ep, (void *) a->name, sz); - } - else { - ASSERT(0 <= a->latin1_chars && a->latin1_chars <= MAX_ATOM_CHARACTERS); - ep -= a->latin1_chars; - sz = erts_utf8_to_latin1(ep, a->name, a->len); - ASSERT(a->latin1_chars == sz); - } + sz = a->len; + ep -= sz; + sys_memcpy((void *) ep, (void *) a->name, sz); if (long_atoms) { ep -= 2; put_int16(sz, ep); @@ -641,7 +618,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, #endif register byte *ep = ext; - int utf8_atoms = (int) (dep->flags & DFLAG_UTF8_ATOMS); + ASSERT(dep->flags & DFLAG_UTF8_ATOMS); edep->heap_size = -1; edep->ext_endp = ext+size; @@ -804,9 +781,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, CHKSIZE(len); atom = erts_atom_put((byte *) ep, len, - (utf8_atoms - ? ERTS_ATOM_ENC_UTF8 - : ERTS_ATOM_ENC_LATIN1), + ERTS_ATOM_ENC_UTF8, 0); if (is_non_value(atom)) ERTS_EXT_HDR_FAIL; @@ -2135,7 +2110,7 @@ enc_atom(ErtsAtomCacheMap *acmp, Eterm atom, byte *ep, Uint32 dflags) { int iix; int len; - int utf8_atoms = (int) (dflags & DFLAG_UTF8_ATOMS); + const int utf8_atoms = (int) (dflags & DFLAG_UTF8_ATOMS); ASSERT(is_atom(atom)); -- cgit v1.2.3 From 8f3cb95bb60db55728bfea339b94ebc164b4dbdb Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 14 Jul 2017 19:35:26 +0200 Subject: erts: Remove some dead code --- erts/emulator/beam/erl_node_tables.h | 3 --- erts/emulator/beam/external.h | 22 ---------------------- 2 files changed, 25 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index ee8277b5ea..2cfd18f22f 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -60,9 +60,6 @@ #define ERTS_DE_SFLG_CONNECTED (((Uint32) 1) << 0) #define ERTS_DE_SFLG_EXITING (((Uint32) 1) << 1) -#define ERTS_DE_SFLGS_ALL (ERTS_DE_SFLG_CONNECTED \ - | ERTS_DE_SFLG_EXITING) - #define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0) #define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1) #define ERTS_DE_QFLG_REQ_INFO (((erts_aint32_t) 1) << 2) diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index 3c61d013da..fb7eec1a31 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -133,11 +133,6 @@ typedef struct { ErtsAtomTranslationTable attab; } ErtsDistExternal; -typedef struct { - int have_header; - int cache_entries; -} ErtsDistHeaderPeek; - #define ERTS_DIST_EXT_SIZE(EDEP) \ (sizeof(ErtsDistExternal) \ - (((EDEP)->flags & ERTS_DIST_EXT_ATOM_TRANS_TAB) \ @@ -177,9 +172,6 @@ Uint erts_encode_ext_size_ets(Eterm); void erts_encode_ext(Eterm, byte **); byte* erts_encode_ext_ets(Eterm, byte *, struct erl_off_heap_header** ext_off_heap); -#ifdef ERTS_WANT_EXTERNAL_TAGS -ERTS_GLB_INLINE void erts_peek_dist_header(ErtsDistHeaderPeek *, byte *, Uint); -#endif ERTS_GLB_INLINE void erts_free_dist_ext_copy(ErtsDistExternal *); ERTS_GLB_INLINE void *erts_dist_ext_trailer(ErtsDistExternal *); ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Uint); @@ -210,20 +202,6 @@ int erts_debug_atom_to_out_cache_index(Eterm); #if ERTS_GLB_INLINE_INCL_FUNC_DEF -#ifdef ERTS_WANT_EXTERNAL_TAGS -ERTS_GLB_INLINE void -erts_peek_dist_header(ErtsDistHeaderPeek *dhpp, byte *ext, Uint sz) -{ - if (ext[0] == VERSION_MAGIC - || ext[1] != DIST_HEADER - || sz < (1+1+1)) - dhpp->have_header = 0; - else { - dhpp->have_header = 1; - dhpp->cache_entries = (int) get_int8(&ext[2]); - } -} -#endif ERTS_GLB_INLINE void erts_free_dist_ext_copy(ErtsDistExternal *edep) -- cgit v1.2.3 From 1b8515868ee4cadd7e93b281cea8e59078c27be8 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 14 Aug 2017 19:38:21 +0200 Subject: erts: Change to #ifndef from #if !defined --- erts/emulator/beam/erl_thr_progress.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/erl_thr_progress.h b/erts/emulator/beam/erl_thr_progress.h index fa936b5707..8c029bcf99 100644 --- a/erts/emulator/beam/erl_thr_progress.h +++ b/erts/emulator/beam/erl_thr_progress.h @@ -28,7 +28,7 @@ * Author: Rickard Green */ -#if !defined(ERL_THR_PROGRESS_H__TSD_TYPE__) +#ifndef ERL_THR_PROGRESS_H__TSD_TYPE__ #define ERL_THR_PROGRESS_H__TSD_TYPE__ #include "sys.h" -- cgit v1.2.3 From 8fa8ec018cf7d6ff6f4e1711fdb71f34fcea6db1 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 16 Aug 2017 20:14:26 +0200 Subject: erts: Refactoring in distribution_SUITE for set/get_internal_state calls. --- erts/emulator/test/distribution_SUITE.erl | 44 +++++++++++++++++++------------ 1 file changed, 27 insertions(+), 17 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index e2914cbc92..c7d63f9feb 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -1158,8 +1158,6 @@ contended_atom_cache_entry_test(Config, Type) -> spawn_link( SNode, fun () -> - erts_debug:set_internal_state(available_internal_state, - true), Master = self(), CIX = get_cix(), TestAtoms = case Type of @@ -1244,7 +1242,7 @@ get_cix(CIX) when is_integer(CIX), CIX < 0 -> get_cix(CIX) when is_integer(CIX) -> get_cix(CIX, unwanted_cixs(), - erts_debug:get_internal_state(max_atom_out_cache_index)). + get_internal_state(max_atom_out_cache_index)). get_cix(CIX, Unwanted, MaxCIX) when CIX > MaxCIX -> get_cix(0, Unwanted, MaxCIX); @@ -1256,8 +1254,8 @@ get_cix(CIX, Unwanted, MaxCIX) -> unwanted_cixs() -> lists:map(fun (Node) -> - erts_debug:get_internal_state({atom_out_cache_index, - Node}) + get_internal_state({atom_out_cache_index, + Node}) end, nodes()). @@ -1266,7 +1264,7 @@ get_conflicting_atoms(_CIX, 0) -> []; get_conflicting_atoms(CIX, N) -> Atom = list_to_atom("atom" ++ integer_to_list(erlang:unique_integer([positive]))), - case erts_debug:get_internal_state({atom_out_cache_index, Atom}) of + case get_internal_state({atom_out_cache_index, Atom}) of CIX -> [Atom|get_conflicting_atoms(CIX, N-1)]; _ -> @@ -1277,7 +1275,7 @@ get_conflicting_unicode_atoms(_CIX, 0) -> []; get_conflicting_unicode_atoms(CIX, N) -> Atom = string_to_atom([16#1f608] ++ "atom" ++ integer_to_list(erlang:unique_integer([positive]))), - case erts_debug:get_internal_state({atom_out_cache_index, Atom}) of + case get_internal_state({atom_out_cache_index, Atom}) of CIX -> [Atom|get_conflicting_unicode_atoms(CIX, N-1)]; _ -> @@ -1885,11 +1883,26 @@ send_bad_dhdr(BadNode, ToNode) when is_atom(BadNode), is_atom(ToNode) -> receive Done -> ok end. dctrl(Node) when is_atom(Node) -> - case catch erts_debug:get_internal_state(available_internal_state) of - true -> true; - _ -> erts_debug:set_internal_state(available_internal_state, true) - end, - erts_debug:get_internal_state({dist_ctrl, Node}). + get_internal_state({dist_ctrl, Node}). + +get_internal_state(Op) -> + try erts_debug:get_internal_state(Op) of + R -> R + catch + error:undef -> + erts_debug:set_internal_state(available_internal_state, true), + erts_debug:get_internal_state(Op) + end. + +set_internal_state(Op, Val) -> + try erts_debug:set_internal_state(Op, Val) of + R -> R + catch + error:undef -> + erts_debug:set_internal_state(available_internal_state, true), + erts_debug:set_internal_state(Op, Val) + end. + dmsg_hdr() -> [131, % Version Magic @@ -2032,11 +2045,9 @@ freeze_node(Node, MS) -> Freezer = self(), spawn_link(Node, fun () -> - erts_debug:set_internal_state(available_internal_state, - true), dctrl_dop_send(Freezer, DoingIt), receive after Own -> ok end, - erts_debug:set_internal_state(block, MS+Own) + set_internal_state(block, MS+Own) end), receive DoingIt -> ok end, receive after Own -> ok end. @@ -2240,8 +2251,7 @@ forever(Fun) -> forever(Fun). abort(Why) -> - erts_debug:set_internal_state(available_internal_state, true), - erts_debug:set_internal_state(abort, Why). + set_internal_state(abort, Why). start_busy_dist_port_tracer() -> -- cgit v1.2.3 From 92d383cac5ddcb3856432e653ef84b0a3f84cc0a Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 22 Aug 2017 16:48:33 +0200 Subject: erts: Make DFLAGS_NEW_FUN_TAGS mandatory and remove ugly encoding fallback as {fun, ...} DFLAGS_NEW_FUN_TAGS has been supported by vm/erl_interface/jinterface since R13 or even older. Renamed test case obsolete_funs to term2bin_tuple_fallbacks and removed test for {fun,...} fallback and added missing test for bitstring fallback {Binary, Bits}. --- erts/emulator/beam/erl_bif_info.c | 8 ++-- erts/emulator/beam/external.c | 91 ++++++++++--------------------------- erts/emulator/test/binary_SUITE.erl | 51 +++++++++------------ 3 files changed, 49 insertions(+), 101 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 27bbf70c0b..9d05680723 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -3830,10 +3830,10 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) BIF_RET(res); } } - else if (ERTS_IS_ATOM_STR("term_to_binary_no_funs", tp[1])) { - Uint dflags = (DFLAG_EXTENDED_REFERENCES | - DFLAG_EXTENDED_PIDS_PORTS | - DFLAG_BIT_BINARIES); + else if (ERTS_IS_ATOM_STR("term_to_binary_tuple_fallbacks", tp[1])) { + Uint dflags = (TERM_TO_BINARY_DFLAGS + & ~DFLAG_EXPORT_PTR_TAG + & ~DFLAG_BIT_BINARIES); BIF_RET(erts_term_to_binary(BIF_P, tp[2], 0, dflags)); } else if (ERTS_IS_ATOM_STR("dist_ctrl", tp[1])) { diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 9761a69ad1..9230e1d8ab 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -2878,55 +2878,24 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, ErlFunThing* funp = (ErlFunThing *) fun_val(obj); int ei; - if ((dflags & DFLAG_NEW_FUN_TAGS) != 0) { - *ep++ = NEW_FUN_EXT; - WSTACK_PUSH2(s, ENC_PATCH_FUN_SIZE, - (UWord) ep); /* Position for patching in size */ - ep += 4; - *ep = funp->arity; - ep += 1; - sys_memcpy(ep, funp->fe->uniq, 16); - ep += 16; - put_int32(funp->fe->index, ep); - ep += 4; - put_int32(funp->num_free, ep); - ep += 4; - ep = enc_atom(acmp, funp->fe->module, ep, dflags); - ep = enc_term(acmp, make_small(funp->fe->old_index), ep, dflags, off_heap); - ep = enc_term(acmp, make_small(funp->fe->old_uniq), ep, dflags, off_heap); - ep = enc_pid(acmp, funp->creator, ep, dflags); - } else { - /* - * Communicating with an obsolete erl_interface or - * jinterface node. Convert the fun to a tuple to - * avoid crasching. - */ - - /* Tag, arity */ - *ep++ = SMALL_TUPLE_EXT; - put_int8(5, ep); - ep += 1; - - /* 'fun' */ - ep = enc_atom(acmp, am_fun, ep, dflags); - - /* Module name */ - ep = enc_atom(acmp, funp->fe->module, ep, dflags); - - /* Index, Uniq */ - *ep++ = INTEGER_EXT; - put_int32(funp->fe->old_index, ep); - ep += 4; - *ep++ = INTEGER_EXT; - put_int32(funp->fe->old_uniq, ep); - ep += 4; - - /* Environment sub-tuple arity */ - ASSERT(funp->num_free < MAX_ARG); - *ep++ = SMALL_TUPLE_EXT; - put_int8(funp->num_free, ep); - ep += 1; - } + ASSERT(dflags & DFLAG_NEW_FUN_TAGS); + *ep++ = NEW_FUN_EXT; + WSTACK_PUSH2(s, ENC_PATCH_FUN_SIZE, + (UWord) ep); /* Position for patching in size */ + ep += 4; + *ep = funp->arity; + ep += 1; + sys_memcpy(ep, funp->fe->uniq, 16); + ep += 16; + put_int32(funp->fe->index, ep); + ep += 4; + put_int32(funp->num_free, ep); + ep += 4; + ep = enc_atom(acmp, funp->fe->module, ep, dflags); + ep = enc_term(acmp, make_small(funp->fe->old_index), ep, dflags, off_heap); + ep = enc_term(acmp, make_small(funp->fe->old_uniq), ep, dflags, off_heap); + ep = enc_pid(acmp, funp->creator, ep, dflags); + for (ei = funp->num_free-1; ei > 0; ei--) { WSTACK_PUSH2(s, ENC_TERM, (UWord) funp->env[ei]); } @@ -4281,24 +4250,12 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, { ErlFunThing* funp = (ErlFunThing *) fun_val(obj); - if ((dflags & DFLAG_NEW_FUN_TAGS) != 0) { - result += 20+1+1+4; /* New ID + Tag */ - result += 4; /* Length field (number of free variables */ - result += encode_size_struct2(acmp, funp->creator, dflags); - result += encode_size_struct2(acmp, funp->fe->module, dflags); - result += 2 * (1+4); /* Index, Uniq */ - } else { - /* - * Size when fun is mapped to a tuple. - */ - result += 1 + 1; /* Tuple tag, arity */ - result += 1 + 1 + 2 + - atom_tab(atom_val(am_fun))->len; /* 'fun' */ - result += 1 + 1 + 2 + - atom_tab(atom_val(funp->fe->module))->len; /* Module name */ - result += 2 * (1 + 4); /* Index + Uniq */ - result += 1 + (funp->num_free < 0x100 ? 1 : 4); - } + ASSERT(dflags & DFLAG_NEW_FUN_TAGS); + result += 20+1+1+4; /* New ID + Tag */ + result += 4; /* Length field (number of free variables */ + result += encode_size_struct2(acmp, funp->creator, dflags); + result += encode_size_struct2(acmp, funp->fe->module, dflags); + result += 2 * (1+4); /* Index, Uniq */ if (funp->num_free > 1) { WSTACK_PUSH2(s, (UWord) (funp->env + 1), (UWord) TERM_ARRAY_OP(funp->num_free-1)); diff --git a/erts/emulator/test/binary_SUITE.erl b/erts/emulator/test/binary_SUITE.erl index cbc2d8fae5..b59c22f125 100644 --- a/erts/emulator/test/binary_SUITE.erl +++ b/erts/emulator/test/binary_SUITE.erl @@ -58,7 +58,9 @@ otp_5484/1,otp_5933/1, ordering/1,unaligned_order/1,gc_test/1, bit_sized_binary_sizes/1, - otp_6817/1,deep/1,obsolete_funs/1,robustness/1,otp_8117/1, + otp_6817/1,deep/1, + term2bin_tuple_fallbacks/1, + robustness/1,otp_8117/1, otp_8180/1, trapping/1, large/1, error_after_yield/1, cmp_old_impl/1]). @@ -79,7 +81,8 @@ all() -> bad_term_to_binary, more_bad_terms, otp_5484, otp_5933, ordering, unaligned_order, gc_test, bit_sized_binary_sizes, otp_6817, otp_8117, deep, - obsolete_funs, robustness, otp_8180, trapping, large, + term2bin_tuple_fallbacks, + robustness, otp_8180, trapping, large, error_after_yield, cmp_old_impl]. groups() -> @@ -1300,40 +1303,28 @@ deep_roundtrip(T) -> B = term_to_binary(T), T = binary_to_term(B). -obsolete_funs(Config) when is_list(Config) -> +term2bin_tuple_fallbacks(Config) when is_list(Config) -> erts_debug:set_internal_state(available_internal_state, true), - X = id({1,2,3}), - Y = id([a,b,c,d]), - Z = id({x,y,z}), - obsolete_fun(fun() -> ok end), - obsolete_fun(fun() -> X end), - obsolete_fun(fun(A) -> {A,X} end), - obsolete_fun(fun() -> {X,Y} end), - obsolete_fun(fun() -> {X,Y,Z} end), - - obsolete_fun(fun ?MODULE:all/1), + term2bin_tf(fun ?MODULE:all/1), + term2bin_tf(<<1:1>>), + term2bin_tf(<<90,80:7>>), erts_debug:set_internal_state(available_internal_state, false), ok. -obsolete_fun(Fun) -> - Tuple = case erlang:fun_info(Fun, type) of - {type,external} -> - {module,M} = erlang:fun_info(Fun, module), - {name,F} = erlang:fun_info(Fun, name), - {M,F}; - {type,local} -> - {module,M} = erlang:fun_info(Fun, module), - {index,I} = erlang:fun_info(Fun, index), - {uniq,U} = erlang:fun_info(Fun, uniq), - {env,E} = erlang:fun_info(Fun, env), - {'fun',M,I,U,list_to_tuple(E)} - end, - Tuple = no_fun_roundtrip(Fun). - -no_fun_roundtrip(Term) -> - binary_to_term_stress(erts_debug:get_internal_state({term_to_binary_no_funs,Term})). +term2bin_tf(Term) -> + Tuple = case Term of + Fun when is_function(Fun) -> + {type, external} = erlang:fun_info(Fun, type), + {module,M} = erlang:fun_info(Fun, module), + {name,F} = erlang:fun_info(Fun, name), + {M,F}; + BS when bit_size(BS) rem 8 =/= 0 -> + Bits = bit_size(BS) rem 8, + {<>, Bits} + end, + Tuple = binary_to_term_stress(erts_debug:get_internal_state({term_to_binary_tuple_fallbacks,Term})). %% Test non-standard encodings never generated by term_to_binary/1 %% but recognized by binary_to_term/1. -- cgit v1.2.3 From fe720f6b2051c9bf8ff303f857c3db0a84b1c050 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 18 Sep 2017 17:15:19 +0200 Subject: erts: Refactor connection_id in ErtsDistExternal Break out from 'flags' into new dedicated 'connection_id' just for simplicity. Also changed flags to low bits and that affected enif_binary_to_term. --- erts/emulator/beam/erl_nif.c | 11 ++++++----- erts/emulator/beam/external.c | 4 ++-- erts/emulator/beam/external.h | 22 +++++++++------------- 3 files changed, 17 insertions(+), 20 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c index c21b139cfa..d1018bab26 100644 --- a/erts/emulator/beam/erl_nif.c +++ b/erts/emulator/beam/erl_nif.c @@ -1201,11 +1201,12 @@ size_t enif_binary_to_term(ErlNifEnv *dst_env, Sint size; ErtsHeapFactory factory; byte *bp = (byte*) data; + Uint32 flags = 0; - ERTS_CT_ASSERT(ERL_NIF_BIN2TERM_SAFE == ERTS_DIST_EXT_BTT_SAFE); - - if (opts & ~ERL_NIF_BIN2TERM_SAFE) { - return 0; + switch ((Uint32)opts) { + case 0: break; + case ERL_NIF_BIN2TERM_SAFE: flags = ERTS_DIST_EXT_BTT_SAFE; break; + default: return 0; } if ((size = erts_decode_ext_size(bp, data_sz)) < 0) return 0; @@ -1217,7 +1218,7 @@ size_t enif_binary_to_term(ErlNifEnv *dst_env, erts_factory_dummy_init(&factory); } - *term = erts_decode_ext(&factory, &bp, (Uint32)opts); + *term = erts_decode_ext(&factory, &bp, flags); if (is_non_value(*term)) { return 0; diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 9230e1d8ab..3b851087d1 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -654,7 +654,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; *connection_id = dep->connection_id; - edep->flags |= (dep->connection_id & ERTS_DIST_EXT_CON_ID_MASK); + edep->connection_id = dep->connection_id; if (ep[1] != DIST_HEADER) { if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) @@ -868,7 +868,7 @@ bad_dist_ext(ErtsDistExternal *edep) erts_dsprintf(dsbufp, ", %d=%T", i, edep->attab.atom[i]); } erts_send_warning_to_logger_nogl(dsbufp); - erts_kill_dist_connection(dep, ERTS_DIST_EXT_CON_ID(edep)); + erts_kill_dist_connection(dep, edep->connection_id); } } diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index fb7eec1a31..d6416edbc3 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -109,26 +109,22 @@ typedef struct { } ErtsAtomTranslationTable; /* - * These flags are tagged onto the high bits of a connection ID and stored in - * the ErtsDistExternal structure's flags field. They are used to indicate - * various bits of state necessary to decode binaries in a variety of - * scenarios. The mask ERTS_DIST_EXT_CON_ID_MASK is used later to separate the - * connection ID from the flags. Be careful to ensure that the mask does not - * overlap any of the bits used for flags, or ERTS will leak flags bits into - * connection IDs and leak connection ID bits into the flags. + * These flags are stored in the ErtsDistExternal structure's flags field. + * They are used to indicate various bits of state necessary to decode binaries + * in a variety of scenarios. */ -#define ERTS_DIST_EXT_DFLAG_HDR ((Uint32) 0x80000000) -#define ERTS_DIST_EXT_ATOM_TRANS_TAB ((Uint32) 0x40000000) -#define ERTS_DIST_EXT_BTT_SAFE ((Uint32) 0x20000000) -#define ERTS_DIST_EXT_CON_ID_MASK ((Uint32) 0x1fffffff) +#define ERTS_DIST_EXT_DFLAG_HDR ((Uint32) 0x1) +#define ERTS_DIST_EXT_ATOM_TRANS_TAB ((Uint32) 0x2) +#define ERTS_DIST_EXT_BTT_SAFE ((Uint32) 0x4) + +#define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff) -#define ERTS_DIST_EXT_CON_ID(DIST_EXTP) \ - ((DIST_EXTP)->flags & ERTS_DIST_EXT_CON_ID_MASK) typedef struct { DistEntry *dep; byte *extp; byte *ext_endp; Sint heap_size; + Uint32 connection_id; Uint32 flags; ErtsAtomTranslationTable attab; } ErtsDistExternal; -- cgit v1.2.3 From f89fb92384280e2939414287a2ecb8f86a199318 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 14 Jul 2017 19:34:54 +0200 Subject: erts: Introduce asynchronous auto-connect --- erts/emulator/beam/atom.names | 1 + erts/emulator/beam/bif.c | 121 ++++++++++------ erts/emulator/beam/bif.tab | 8 + erts/emulator/beam/dist.c | 274 ++++++++++++++++++++++++++--------- erts/emulator/beam/dist.h | 122 +++++++++++++--- erts/emulator/beam/erl_node_tables.c | 20 ++- erts/emulator/beam/erl_node_tables.h | 7 +- erts/emulator/beam/erl_process.c | 40 +++-- erts/emulator/beam/external.c | 41 +++++- erts/emulator/beam/external.h | 2 +- erts/emulator/beam/io.c | 3 +- 11 files changed, 474 insertions(+), 165 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index fc55b687d4..a534dd44fb 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -108,6 +108,7 @@ atom asynchronous atom atom atom atom_used atom attributes +atom auto_connect atom await_microstate_accounting_modifications atom await_port_send_result atom await_proc_exit diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 4b11884f38..34f5afae78 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -227,17 +227,40 @@ BIF_RETTYPE link_1(BIF_ALIST_1) goto res_no_proc; } - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_RLOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, BIF_P, + (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: - /* Let the dlink trap handle it */ - case ERTS_DSIG_PREP_NOT_CONNECTED: - erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK); - BIF_TRAP1(dlink_trap, BIF_P, BIF_ARG_1); - + case ERTS_DSIG_PREP_NOT_CONNECTED: { + ErtsProcLocks locks = ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK; + erts_aint32_t state; + erts_proc_lock(BIF_P, (ERTS_PROC_LOCKS_ALL & ~locks)); + locks = ERTS_PROC_LOCKS_ALL; + erts_send_exit_signal(BIF_P, BIF_ARG_1, BIF_P, &locks, + am_noconnection, NIL, NULL, 0); + erts_proc_unlock(BIF_P, locks & ERTS_PROC_LOCKS_ALL_MINOR); + + /* + * Copy-paste from old dist_exit_3, not sure if we really + * need erts_handle_pending_exit when exit_2 does not. + */ + state = erts_atomic32_read_acqb(&BIF_P->state); + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { +#ifdef ERTS_SMP + if (state & ERTS_PSFLG_PENDING_EXIT) + erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN); +#endif + ERTS_BIF_EXITED(BIF_P); + } + BIF_RET(am_true); + } + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: - /* We are connected. Setup link and send link signal */ - + /* + * We have (pending) connection. + * Setup link and enqueue link signal. + */ erts_de_links_lock(dep); erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, BIF_ARG_1); @@ -256,8 +279,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); default: - ASSERT(! "Invalid dsig prepare result"); - BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); + ERTS_ASSERT(! "Invalid dsig prepare result"); } } } @@ -292,7 +314,8 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) ERTS_LC_ASSERT((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK) == erts_proc_lc_my_proc_locks(c_p)); - code = erts_dsig_prepare(&dsd, dep, c_p, ERTS_DSP_RLOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, c_p, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_RLOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: @@ -313,6 +336,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) res = am_true; break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_de_links_lock(dep); @@ -347,8 +371,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) } break; default: - ASSERT(! "Invalid dsig prepare result"); - return am_internal_error; + ERTS_ASSERT(! "Invalid dsig prepare result"); } @@ -768,10 +791,18 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, int code; erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_RLOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, + p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, 0); switch (code) { + case ERTS_DSIG_PREP_PENDING: + /* + * Must wait for connection to know if node supports monitor. + * Damn these synchronous errors. + */ + erts_smp_de_runlock(dep); + /* fall through */ case ERTS_DSIG_PREP_NOT_ALIVE: - /* Let the dmonitor_p trap handle it */ case ERTS_DSIG_PREP_NOT_CONNECTED: erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); ERTS_BIF_PREP_TRAP2(ret, dmonitor_p_trap, p, bifarg1, bifarg2); @@ -818,9 +849,7 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, } break; default: - ASSERT(! "Invalid dsig prepare result"); - ERTS_BIF_PREP_ERROR(ret, p, EXC_INTERNAL_ERROR); - break; + ERTS_ASSERT(! "Invalid dsig prepare result"); } BIF_RET(ret); @@ -1158,7 +1187,8 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) BIF_RET(am_true); } - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: @@ -1173,6 +1203,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) BIF_TRAP1(dunlink_trap, BIF_P, BIF_ARG_1); #endif + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_remove_dist_link(&dld, BIF_P->common.id, BIF_ARG_1, dep); code = erts_dsig_send_unlink(&dsd, BIF_P->common.id, BIF_ARG_1); @@ -1545,22 +1576,24 @@ BIF_RETTYPE exit_2(BIF_ALIST_2) DistEntry *dep; dep = external_pid_dist_entry(BIF_ARG_1); + ERTS_ASSERT(dep); if(dep == erts_this_dist_entry) BIF_RET(am_true); - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: - BIF_TRAP2(dexit_trap, BIF_P, BIF_ARG_1, BIF_ARG_2); + BIF_RET(am_true); + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: code = erts_dsig_send_exit2(&dsd, BIF_P->common.id, BIF_ARG_1, BIF_ARG_2); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); default: - ASSERT(! "Invalid dsig prepare result"); - BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); + ERTS_ASSERT(! "Invalid dsig prepare result"); } } else if (is_not_internal_pid(BIF_ARG_1)) { @@ -1964,7 +1997,7 @@ ebif_bang_2(BIF_ALIST_2) * Send a message to Process, Port or Registered Process. * Returns non-negative reduction bump or negative result code. */ -#define SEND_TRAP (-1) +#define SEND_NOCONNECT (-1) #define SEND_YIELD (-2) #define SEND_YIELD_RETURN (-3) #define SEND_BADARG (-4) @@ -1980,20 +2013,22 @@ static Sint remote_send(Process *p, DistEntry *dep, { Sint res; int code; - ASSERT(is_atom(to) || is_external_pid(to)); ctx->dep = dep; - code = erts_dsig_prepare(&ctx->dsd, dep, p, ERTS_DSP_NO_LOCK, !ctx->suspend); + code = erts_dsig_prepare(&ctx->dsd, &dep, p, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, + !ctx->suspend, ctx->connect); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: - res = SEND_TRAP; + res = SEND_NOCONNECT; break; case ERTS_DSIG_PREP_WOULD_SUSPEND: ASSERT(!ctx->suspend); res = SEND_YIELD; break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: { if (is_atom(to)) @@ -2205,12 +2240,14 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) } return 0; } + ctx->dsd.node = tp[2]; ret = remote_send(p, dep, tp[1], to, msg, ctx); if (ret == SEND_YIELD_CONTINUE) { - if (dep) + if (dep) { erts_ref_dist_entry(dep); - ctx->dep_to_deref = dep; + ctx->deref_dep = 1; + } } return ret; } else { @@ -2251,7 +2288,6 @@ BIF_RETTYPE send_3(BIF_ALIST_3) Eterm msg = BIF_ARG_2; Eterm opts = BIF_ARG_3; - int connect = !0; Eterm l = opts; Sint result; @@ -2262,14 +2298,15 @@ BIF_RETTYPE send_3(BIF_ALIST_3) UseTmpHeap(sizeof(ErtsSendContext)/sizeof(Eterm), BIF_P); ctx->suspend = !0; - ctx->dep_to_deref = NULL; + ctx->connect = !0; + ctx->deref_dep = 0; ctx->return_term = am_ok; ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR); ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT; while (is_list(l)) { if (CAR(list_val(l)) == am_noconnect) { - connect = 0; + ctx->connect = 0; } else if (CAR(list_val(l)) == am_nosuspend) { ctx->suspend = 0; } else { @@ -2306,9 +2343,9 @@ BIF_RETTYPE send_3(BIF_ALIST_3) goto yield_return; ERTS_BIF_PREP_RET(retval, am_ok); break; - case SEND_TRAP: - if (connect) { - ERTS_BIF_PREP_TRAP3(retval, dsend3_trap, p, to, msg, opts); + case SEND_NOCONNECT: + if (ctx->connect) { + ERTS_BIF_PREP_RET(retval, am_ok); } else { ERTS_BIF_PREP_RET(retval, am_noconnect); } @@ -2412,7 +2449,8 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) ref = NIL; #endif ctx->suspend = !0; - ctx->dep_to_deref = NULL; + ctx->connect = !0; + ctx->deref_dep = 0; ctx->return_term = msg; ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR); ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT; @@ -2436,8 +2474,8 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) goto yield_return; ERTS_BIF_PREP_RET(retval, msg); break; - case SEND_TRAP: - ERTS_BIF_PREP_TRAP2(retval, dsend2_trap, p, to, msg); + case SEND_NOCONNECT: + ERTS_BIF_PREP_RET(retval, msg); break; case SEND_YIELD: ERTS_BIF_PREP_YIELD2(retval, bif_export[BIF_send_2], p, to, msg); @@ -4387,20 +4425,21 @@ BIF_RETTYPE group_leader_2(BIF_ALIST_2) if(dep == erts_this_dist_entry) BIF_ERROR(BIF_P, BADARG); - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: BIF_RET(am_true); case ERTS_DSIG_PREP_NOT_CONNECTED: BIF_TRAP2(dgroup_leader_trap, BIF_P, BIF_ARG_1, BIF_ARG_2); + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: code = erts_dsig_send_group_leader(&dsd, BIF_ARG_1, BIF_ARG_2); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); default: - ASSERT(! "Invalid dsig prepare result"); - BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); + ERTS_ASSERT(! "Invalid dsig prepare result"); } } else if (is_internal_pid(BIF_ARG_2)) { diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index f7b4451890..66469a011d 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -691,3 +691,11 @@ bif erts_internal:maps_to_list/2 # bif erlang:iolist_to_iovec/1 + +# +# New in 21.0 +# + +bif erlang:new_connection_id/1 +bif erlang:abort_connection_id/2 + diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index bc168fc58d..0fa399cf53 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -103,8 +103,6 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) -#define PASS_THROUGH 'p' /* This code should go */ - int erts_is_alive; /* System must be blocked on change */ int erts_dist_buf_busy_limit; @@ -684,31 +682,11 @@ size_obuf(ErtsDistOutputBuf *obuf) return bin->orig_size; } -static void clear_dist_entry(DistEntry *dep) +static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) { - Sint obufsize = 0; - ErtsAtomCache *cache; - ErtsProcList *suspendees; ErtsDistOutputBuf *obuf; - erts_de_rwlock(dep); - erts_atomic_set_nob(&dep->input_handler, - (erts_aint_t) NIL); - cache = dep->cache; - dep->cache = NULL; - -#ifdef DEBUG - erts_de_links_lock(dep); - ASSERT(!dep->nlinks); - ASSERT(!dep->node_links); - ASSERT(!dep->monitors); - erts_de_links_unlock(dep); -#endif - - erts_mtx_lock(&dep->qlock); - - erts_atomic64_set_nob(&dep->in, 0); - erts_atomic64_set_nob(&dep->out, 0); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; @@ -728,17 +706,12 @@ static void clear_dist_entry(DistEntry *dep) dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; - dep->status = 0; - suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); - - erts_mtx_unlock(&dep->qlock); - erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); - dep->send = NULL; - erts_de_rwunlock(dep); - - erts_resume_processes(suspendees); + return obuf; +} - delete_cache(cache); +static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf) +{ + Sint obufsize = 0; while (obuf) { ErtsDistOutputBuf *fobuf; @@ -750,13 +723,54 @@ static void clear_dist_entry(DistEntry *dep) if (obufsize) { erts_mtx_lock(&dep->qlock); - ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); + ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); erts_mtx_unlock(&dep->qlock); } } +static void clear_dist_entry(DistEntry *dep) +{ + ErtsAtomCache *cache; + ErtsProcList *suspendees; + ErtsDistOutputBuf *obuf; + + erts_de_rwlock(dep); + erts_atomic_set_nob(&dep->input_handler, + (erts_aint_t) NIL); + cache = dep->cache; + dep->cache = NULL; + +#ifdef DEBUG + erts_de_links_lock(dep); + ASSERT(!dep->nlinks); + ASSERT(!dep->node_links); + ASSERT(!dep->monitors); + erts_de_links_unlock(dep); +#endif + + erts_mtx_lock(&dep->qlock); + + erts_atomic64_set_nob(&dep->in, 0); + erts_atomic64_set_nob(&dep->out, 0); + + obuf = clear_de_out_queues(dep); + dep->status = 0; + suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_de_rwunlock(dep); + + erts_resume_processes(suspendees); + + delete_cache(cache); + + free_de_out_queues(dep, obuf); +} + int erts_dsend_context_dtor(Binary* ctx_bin) { ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); @@ -772,8 +786,8 @@ int erts_dsend_context_dtor(Binary* ctx_bin) if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) { free_dist_obuf(ctx->dss.obuf); } - if (ctx->dep_to_deref) - erts_deref_dist_entry(ctx->dep_to_deref); + if (ctx->deref_dep) + erts_deref_dist_entry(ctx->dep); return 1; } @@ -1324,7 +1338,7 @@ int erts_net_message(Port *prt, /* This is tricky (we MUST force a distributed send) */ ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_exit(&dsd, to, from, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); @@ -1416,7 +1430,7 @@ int erts_net_message(Port *prt, if (!rp) { ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, am_noproc); @@ -1883,7 +1897,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } else { ctx->acmp = NULL; - ctx->pass_through_size = 1; + ctx->pass_through_size = 3; /* SVERK rename */ } #ifdef ERTS_DIST_MSG_DBG @@ -1961,9 +1975,9 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->obuf->next = NULL; erts_de_rlock(dep); cid = dep->cid; - if (cid != dsdp->cid - || dep->connection_id != dsdp->connection_id - || dep->status & ERTS_DE_SFLG_EXITING) { + if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED)) + || dep->status & ERTS_DE_SFLG_EXITING + || dep->connection_id != dsdp->connection_id) { /* Not the same connection as when we started; drop message... */ erts_de_runlock(dep); free_dist_obuf(ctx->obuf); @@ -2037,8 +2051,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } erts_mtx_unlock(&dep->qlock); - if (is_internal_port(dep->cid)) - erts_schedule_dist_command(NULL, dep); + if (!(dep->status & ERTS_DE_SFLG_PENDING)) { + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + } + else { + notify_proc = NIL; + } erts_de_runlock(dep); if (is_internal_pid(notify_proc)) notify_dist_data(ctx->c_p, notify_proc); @@ -2306,9 +2325,6 @@ erts_dist_command(Port *prt, int reds_limit) ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, dep->cache, flags); - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; preempt = reds > reds_limit; @@ -2346,21 +2362,18 @@ erts_dist_command(Port *prt, int reds_limit) int de_busy; int preempt = 0; while (oq.first && !preempt) { - ErtsDistOutputBuf *fob; - Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&oq.first->data[0] <= oq.first->extp - && oq.first->extp < oq.first->ext_endp); - size = (*send)(prt, oq.first); + ErtsDistOutputBuf *fob; + Uint size; + oq.first->extp + = erts_encode_ext_dist_header_finalize(oq.first->extp, + dep->cache, + flags); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + ASSERT(&oq.first->data[0] <= oq.first->extp + && oq.first->extp < oq.first->ext_endp); + size = (*send)(prt, oq.first); erts_atomic64_inc_nob(&dep->out); - esdp->io.out += (Uint64) size; + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); @@ -2482,11 +2495,13 @@ erts_dist_command(Port *prt, int reds_limit) foq.first = NULL; foq.last = NULL; +/* SVERK Hmmm.... #ifdef DEBUG erts_mtx_lock(&dep->qlock); ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); erts_mtx_unlock(&dep->qlock); #endif +*/ } else { if (oq.first) { @@ -3218,6 +3233,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) ErtsProcLocks proc_unlock = 0; Process *proc; Port *pp = NULL; + Eterm notify_proc; + erts_aint32_t qflgs; /* * Check and pick out arguments @@ -3245,15 +3262,14 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) if (!is_atom(ic) || !is_atom(oc)) goto badarg; - /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */ - if (!(DFLAG_EXTENDED_REFERENCES & flags)) { + if (~flags & DFLAG_DIST_MANDATORY) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "%T", BIF_P->common.id); if (BIF_P->common.u.alive.reg) erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name); erts_dsprintf(dsbufp, " attempted to enable connection to node %T " - "which is not able to handle extended references.\n", + "which does not support all mandatory capabilities.\n", BIF_ARG_1); erts_send_error_to_logger(BIF_P->group_leader, dsbufp); goto badarg; @@ -3376,7 +3392,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) dep->creation = 0; #ifdef DEBUG - ASSERT(erts_atomic_read_nob(&dep->qsize) == 0); + ASSERT(erts_atomic_read_nob(&dep->qsize) == 0 + || (dep->status & ERTS_DE_SFLG_PENDING)); #endif if (flags & DFLAG_DIST_HDR_ATOM_CACHE) @@ -3384,7 +3401,26 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + notify_proc = NIL; + if (erts_atomic_read_nob(&dep->qsize)) { + if (is_internal_port(dep->cid)) { + erts_schedule_dist_command(NULL, dep); + } + else { + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify_proc = dep->cid; + ASSERT(is_internal_pid(notify_proc)); + } + } + } + } erts_de_rwunlock(dep); + if (is_internal_pid(notify_proc)) + notify_dist_data(BIF_P, notify_proc); ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); @@ -3426,6 +3462,112 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto done; } +BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) +{ + DistEntry* dep; + Uint32 conn_id; + + if (is_not_atom(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); + } + dep = erts_find_or_insert_dist_entry(BIF_ARG_1); + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + + erts_de_rwlock(dep); + + if (ERTS_DE_IS_CONNECTED(dep) || dep->status & ERTS_DE_SFLG_PENDING) + conn_id = dep->connection_id; + else if (dep->status == 0) { + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; + dep->connection_id++; + dep->connection_id &= ERTS_DIST_CON_ID_MASK; + conn_id = dep->connection_id; + } + else { + ASSERT(!"SVERK: What to do?"); + conn_id = dep->connection_id; + } + erts_de_rwunlock(dep); + BIF_RET(make_small(conn_id)); +} + +BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +{ + DistEntry* dep; + + if (is_not_atom(BIF_ARG_1) || is_not_small(BIF_ARG_2)) { + BIF_ERROR(BIF_P, BADARG); + } + dep = erts_find_dist_entry(BIF_ARG_1); + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + + if (!dep) { + BIF_RET(am_false); + } + + erts_de_rwlock(dep); + + if (dep->status == ERTS_DE_SFLG_PENDING + && dep->connection_id == unsigned_val(BIF_ARG_2)) { + + NetExitsContext nec = {dep}; + ErtsLink *nlinks; + ErtsLink *node_links; + ErtsMonitor *monitors; + ErtsAtomCache *cache; + ErtsDistOutputBuf *obuf; + ErtsProcList *resume_procs; + Sint reds = 0; + + ASSERT(is_nil(dep->cid)); + + erts_de_links_lock(dep); + monitors = dep->monitors; + nlinks = dep->nlinks; + node_links = dep->node_links; + dep->monitors = NULL; + dep->nlinks = NULL; + dep->node_links = NULL; + erts_de_links_unlock(dep); + + cache = dep->cache; + dep->cache = NULL; + dep->status = 0; + dep->flags = 0; + erts_mtx_lock(&dep->qlock); + obuf = dep->out_queue.first; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + ASSERT(!dep->tmp_out_queue.first); + ASSERT(!dep->finalized_out_queue.first); + resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_de_rwunlock(dep); + + erts_sweep_monitors(monitors, &doit_monitor_net_exits, &nec); + erts_sweep_links(nlinks, &doit_link_net_exits, &nec); + erts_sweep_links(node_links, &doit_node_link_net_exits, &nec); + + if (resume_procs) { + int resumed = erts_resume_processes(resume_procs); + reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + } + + delete_cache(cache); + + free_de_out_queues(dep, obuf); + + BIF_RET2(am_true, reds); + } + + erts_de_rwunlock(dep); + + BIF_RET(am_false); +} + /**********************************************************************/ /* dist_exit(Local, Term, Remote) -> Bool */ diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index d4765c50b8..26432b21e9 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -45,6 +45,13 @@ #define DFLAG_MAP_TAG 0x20000 #define DFLAG_BIG_CREATION 0x40000 #define DFLAG_SEND_SENDER 0x80000 +#define DFLAG_PENDING_CONNECTION 0x100000 + +/* Mandatory flags for distribution (sync with dist_util.erl) */ +#define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \ + | DFLAG_EXTENDED_PIDS_PORTS \ + | DFLAG_UTF8_ATOMS \ + | DFLAG_NEW_FUN_TAGS) /* All flags that should be enabled when term_to_binary/1 is used. */ #define TERM_TO_BINARY_DFLAGS (DFLAG_EXTENDED_REFERENCES \ @@ -98,6 +105,7 @@ typedef enum { typedef struct { Process *proc; DistEntry *dep; + Eterm node; /* used if dep == NULL */ Eterm cid; Eterm connection_id; int no_suspend; @@ -117,14 +125,10 @@ extern int erts_is_alive; /* * erts_dsig_prepare() prepares a send of a distributed signal. - * One of the values defined below are returned. If the returned - * value is another than ERTS_DSIG_PREP_CONNECTED, the - * distributed signal cannot be sent before appropriate actions - * have been taken. Appropriate actions would typically be setting - * up the connection. + * One of the values defined below are returned. */ -/* Connected; signal can be sent. */ +/* Connected; signals can be enqueued and sent. */ #define ERTS_DSIG_PREP_CONNECTED 0 /* Not connected; connection needs to be set up. */ #define ERTS_DSIG_PREP_NOT_CONNECTED 1 @@ -132,11 +136,15 @@ extern int erts_is_alive; #define ERTS_DSIG_PREP_WOULD_SUSPEND 2 /* System not alive (distributed) */ #define ERTS_DSIG_PREP_NOT_ALIVE 3 +/* Pending connection; signals can be enqueued */ +#define ERTS_DSIG_PREP_PENDING 4 ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *, - DistEntry *, + DistEntry **, Process *, + ErtsProcLocks, ErtsDSigPrepLock, + int, int); ERTS_GLB_INLINE @@ -146,29 +154,100 @@ void erts_schedule_dist_command(Port *, DistEntry *); ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *dsdp, - DistEntry *dep, + DistEntry **depp, Process *proc, + ErtsProcLocks proc_locks, ErtsDSigPrepLock dspl, - int no_suspend) + int no_suspend, + int connect) { - int failure; + DistEntry* dep = *depp; + int res; + if (!erts_is_alive) return ERTS_DSIG_PREP_NOT_ALIVE; - if (!dep) - return ERTS_DSIG_PREP_NOT_CONNECTED; + if (!dep) { + if (!connect) + return ERTS_DSIG_PREP_NOT_CONNECTED; + + dep = erts_find_or_insert_dist_entry(dsdp->node); + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + } + +#ifdef ERTS_ENABLE_LOCK_CHECK + if (connect) { + erts_proc_lc_might_unlock(proc, proc_locks); + } +#endif + +retry: if (dspl == ERTS_DSP_RWLOCK) erts_de_rwlock(dep); else erts_de_rlock(dep); - if (ERTS_DE_IS_NOT_CONNECTED(dep)) { - failure = ERTS_DSIG_PREP_NOT_CONNECTED; + + if (ERTS_DE_IS_CONNECTED(dep)) { + res = ERTS_DSIG_PREP_CONNECTED; + } + else if (dep->status & ERTS_DE_SFLG_PENDING) { + res = ERTS_DSIG_PREP_PENDING; + } + else if (dep->status & ERTS_DE_SFLG_EXITING) { + /* SVERK is this ok, or should we trigger another connection setup */ + res = ERTS_DSIG_PREP_NOT_CONNECTED; + goto fail; + } + else if (connect) { + ASSERT(dep->status == 0); + if (dspl != ERTS_DSP_RWLOCK) { + erts_de_runlock(dep); + erts_de_rwlock(dep); + } + if (dep->status == 0) { + Process* net_kernel; + ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *mp; + Eterm msg, conn_id; + + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; + dep->connection_id++; + dep->connection_id &= ERTS_DIST_CON_ID_MASK; + conn_id = make_small(dep->connection_id); + erts_de_rwunlock(dep); + + net_kernel = erts_whereis_process(proc, proc_locks, + am_net_kernel, nk_locks, 0); + if (!net_kernel) { + if (!*depp) { + erts_deref_dist_entry(dep); + } + return ERTS_DSIG_PREP_NOT_ALIVE; + } + + /* Send {auto_connect, Node, ConnId} to net_kernel */ + mp = erts_alloc_message_heap(net_kernel, &nk_locks, 4, &hp, &ohp); + msg = TUPLE3(hp, am_auto_connect, dep->sysname, conn_id); + erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); + erts_proc_unlock(net_kernel, nk_locks); + } + else + erts_de_rwunlock(dep); + goto retry; + } + else { + ASSERT(dep->status == 0); + res = ERTS_DSIG_PREP_NOT_CONNECTED; goto fail; } + if (no_suspend) { - if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { - failure = ERTS_DSIG_PREP_WOULD_SUSPEND; + if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { + res = ERTS_DSIG_PREP_WOULD_SUSPEND; goto fail; - } + } } dsdp->proc = proc; dsdp->dep = dep; @@ -177,15 +256,15 @@ erts_dsig_prepare(ErtsDSigData *dsdp, dsdp->no_suspend = no_suspend; if (dspl == ERTS_DSP_NO_LOCK) erts_de_runlock(dep); - return ERTS_DSIG_PREP_CONNECTED; + *depp = dep; + return res; fail: if (dspl == ERTS_DSP_RWLOCK) erts_de_rwunlock(dep); else erts_de_runlock(dep); - return failure; - + return res; } ERTS_GLB_INLINE @@ -346,11 +425,12 @@ struct erts_dsig_send_context { typedef struct { int suspend; + int connect; Eterm ctl_heap[6]; ErtsDSigData dsd; - DistEntry* dep_to_deref; DistEntry *dep; + int deref_dep; struct erts_dsig_send_context dss; Eterm return_term; diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 9931686cbe..dd607f438e 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -597,6 +597,8 @@ erts_set_dist_entry_not_connected(DistEntry *dep) void erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) { + erts_aint32_t set_qflgs; + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); erts_rwmtx_rwlock(&erts_dist_table_rwmtx); @@ -619,22 +621,26 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) ASSERT(erts_no_of_not_connected_dist_entries > 0); erts_no_of_not_connected_dist_entries--; + if (dep->status & ERTS_DE_SFLG_PENDING) { + dep->status &= ~ERTS_DE_SFLG_PENDING; + } else { + dep->connection_id++; + dep->connection_id &= ERTS_DIST_CON_ID_MASK; + } dep->status |= ERTS_DE_SFLG_CONNECTED; - dep->flags = flags; + dep->flags = flags & ~DFLAG_PENDING_CONNECTION; dep->cid = cid; erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) cid); - dep->connection_id++; - dep->connection_id &= ERTS_DIST_EXT_CON_ID_MASK; dep->prev = NULL; erts_atomic64_set_nob(&dep->in, 0); erts_atomic64_set_nob(&dep->out, 0); - erts_atomic32_set_nob(&dep->qflgs, - (is_internal_port(cid) - ? ERTS_DE_QFLG_PORT_CTRL - : ERTS_DE_QFLG_PROC_CTRL)); + set_qflgs = (is_internal_port(cid) ? + ERTS_DE_QFLG_PORT_CTRL : ERTS_DE_QFLG_PROC_CTRL); + erts_atomic32_read_bor_nob(&dep->qflgs, set_qflgs); + if(flags & DFLAG_PUBLISHED) { dep->next = erts_visible_dist_entries; if(erts_visible_dist_entries) { diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 2cfd18f22f..d012f4b2cf 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -57,8 +57,9 @@ #define ERST_INTERNAL_CHANNEL_NO 0 -#define ERTS_DE_SFLG_CONNECTED (((Uint32) 1) << 0) -#define ERTS_DE_SFLG_EXITING (((Uint32) 1) << 1) +#define ERTS_DE_SFLG_PENDING (((Uint32) 1) << 0) +#define ERTS_DE_SFLG_CONNECTED (((Uint32) 1) << 1) +#define ERTS_DE_SFLG_EXITING (((Uint32) 1) << 2) #define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0) #define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1) @@ -106,8 +107,6 @@ struct ErtsProcList_; * unlock mutexes with higher numbers before mutexes with higher numbers. */ -struct erl_link; - typedef struct dist_entry_ { HashBucket hash_bucket; /* Hash bucket */ struct dist_entry_ *next; /* Next entry in dist_table (not sorted) */ diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 3c0a126fe2..2d13cd92b2 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -12659,9 +12659,11 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, dep, NULL, - ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { + int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED || + code == ERTS_DSIG_PREP_PENDING) { + code = erts_dsig_send_demonitor(&dsd, rmon->u.pid, mon->name, @@ -12705,9 +12707,11 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, dep, NULL, - ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { + int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED || + code == ERTS_DSIG_PREP_PENDING) { + code = erts_dsig_send_demonitor(&dsd, rmon->u.pid, mon->u.pid, @@ -12764,8 +12768,8 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, dep, NULL, - ERTS_DSP_NO_LOCK, 0); + int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, mon->u.pid, @@ -12887,14 +12891,18 @@ static void doit_exit_link(ErtsLink *lnk, void *vpcontext) int code; ErtsDistLinkData dld; erts_remove_dist_link(&dld, p->common.id, item, dep); - erts_proc_lock(p, ERTS_PROC_LOCK_MAIN); - code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { - code = erts_dsig_send_exit_tt(&dsd, p->common.id, item, - reason, SEQ_TRACE_TOKEN(p)); - ASSERT(code == ERTS_DSIG_SEND_OK); - } - erts_proc_unlock(p, ERTS_PROC_LOCK_MAIN); + if (dld.d_lnk) { + erts_proc_lock(p, ERTS_PROC_LOCK_MAIN); + code = erts_dsig_prepare(&dsd, &dep, p, 0, ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED || + code == ERTS_DSIG_PREP_PENDING) { + + code = erts_dsig_send_exit_tt(&dsd, p->common.id, item, + reason, SEQ_TRACE_TOKEN(p)); + ASSERT(code == ERTS_DSIG_SEND_OK); + } + erts_proc_unlock(p, ERTS_PROC_LOCK_MAIN); + } erts_destroy_dist_link(&dld); } } diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 3b851087d1..c49e2f617a 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -349,6 +349,8 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) } } +#define PASS_THROUGH 'p' /* This code should go */ + byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint32 dflags) { byte *ip; @@ -358,9 +360,33 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint int long_atoms; register byte *ep = ext; ASSERT(dflags & DFLAG_UTF8_ATOMS); - ASSERT(ep[0] == VERSION_MAGIC); - if (ep[1] != DIST_HEADER) - return ext; + + if (ep[0] != VERSION_MAGIC) { + ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT); + if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) { + /* + * Encoded without atom cache (toward pending connection) + * but receiver wants dist header. Let's prepend an empty one. + */ + *--ep = 0; /* NumberOfAtomCacheRefs */ + *--ep = DIST_HEADER; + *--ep = VERSION_MAGIC; + } + else { + /* Node without atom cache, 'pass through' needed */ + + ASSERT(!"SVERK: Must insert VERSION_MAGIC's"); + *--ep = PASS_THROUGH; + } + return ep; + } + else if (ep[1] != DIST_HEADER) { + ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT); + ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)); + /* Node without atom cache, 'pass through' needed */ + *--ep = PASS_THROUGH; + return ep; + } dist_hdr_flags = ep[2]; long_atoms = ERTS_DIST_HDR_LONG_ATOMS_FLG & ((int) dist_hdr_flags); @@ -511,7 +537,7 @@ int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx return -1; } else { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(ctx->flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_PENDING_CONNECTION))) #endif sz++ /* VERSION_MAGIC */; @@ -543,7 +569,7 @@ int erts_encode_dist_ext(Eterm term, byte **ext, Uint32 flags, ErtsAtomCacheMap { if (!ctx || !ctx->wstack.wstart) { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_PENDING_CONNECTION))) #endif *(*ext)++ = VERSION_MAGIC; } @@ -644,12 +670,11 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, erts_de_rlock(dep); - if ((dep->status & (ERTS_DE_SFLG_EXITING|ERTS_DE_SFLG_CONNECTED)) - != ERTS_DE_SFLG_CONNECTED) { + if (dep->status != ERTS_DE_SFLG_CONNECTED && + dep->status != ERTS_DE_SFLG_PENDING) { erts_de_runlock(dep); return ERTS_PREP_DIST_EXT_CLOSED; } - if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index d6416edbc3..49950c4aad 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -117,7 +117,7 @@ typedef struct { #define ERTS_DIST_EXT_ATOM_TRANS_TAB ((Uint32) 0x2) #define ERTS_DIST_EXT_BTT_SAFE ((Uint32) 0x4) -#define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff) +#define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff) /* also in net_kernel.erl */ typedef struct { DistEntry *dep; diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 85013af3ad..6cd1aa5e79 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -3829,11 +3829,12 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc) ErtsDistLinkData dld; ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_remove_dist_link(&dld, port_id, lnk->pid, dep); erts_destroy_dist_link(&dld); -- cgit v1.2.3 From 3dda5298322a76f2787c6c4dd1cce78416c20dc3 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 16 Aug 2017 16:56:12 +0200 Subject: erts: Async auto-connect for monitor_node Removed distribution_SUITE:applied_monitor_node as it seems to test apply of trapping BIF and monitor_node does not trap anymore. --- erts/emulator/beam/dist.c | 124 ++++++++++++++++++++---------- erts/emulator/test/distribution_SUITE.erl | 24 +----- erts/emulator/test/erl_link_SUITE.erl | 13 ++-- 3 files changed, 94 insertions(+), 67 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 0fa399cf53..724cb6b24d 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3789,11 +3789,17 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) DistEntry *dep; ErtsLink *lnk; Eterm l; + int async_connect = 1; for (l = Options; l != NIL && is_list(l); l = CDR(list_val(l))) { Eterm t = CAR(list_val(l)); - /* allow_passive_connect the only available option right now */ - if (t != am_allow_passive_connect) { + if (t == am_allow_passive_connect) { + /* + * Handle this horrible feature by falling back on old synchronous + * auto-connect (if needed) + */ + async_connect = 0; + } else { BIF_ERROR(p, BADARG); } } @@ -3807,47 +3813,85 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) && (Node != erts_this_node->sysname))) { BIF_ERROR(p, BADARG); } - dep = erts_sysname_to_connected_dist_entry(Node); - if (!dep) { - do_trap: - BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); - } - if (dep == erts_this_dist_entry) - goto done; - - erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - erts_de_rlock(dep); - if (ERTS_DE_IS_NOT_CONNECTED(dep)) { - erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); - erts_de_runlock(dep); - goto do_trap; - } - erts_de_links_lock(dep); - erts_de_runlock(dep); if (Bool == am_true) { - ASSERT(dep->cid != NIL); - lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, - p->common.id); - ++ERTS_LINK_REFC(lnk); - lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); - ++ERTS_LINK_REFC(lnk); - } - else { - lnk = erts_lookup_link(dep->node_links, p->common.id); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&(dep->node_links), - p->common.id)); - } - } - lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), - Node)); + ErtsDSigData dsd; + dsd.node = Node; + dep = erts_find_or_insert_dist_entry(Node); + if (dep == erts_this_dist_entry) + goto done; + + erts_proc_lock(p, ERTS_PROC_LOCK_LINK); + + switch (erts_dsig_prepare(&dsd, &dep, p, + (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, async_connect)) { + case ERTS_DSIG_PREP_NOT_ALIVE: + case ERTS_DSIG_PREP_NOT_CONNECTED: + /* Trap to either send 'nodedown' or do passive connection attempt */ + trap: + erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); + erts_deref_dist_entry(dep); + BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); + case ERTS_DSIG_PREP_PENDING: + if (!async_connect) { + /* + * Pending connection may fail, so we must trap + * to ensure passive connection attempt + */ + erts_de_runlock(dep); + goto trap; } - } + /*fall through*/ + case ERTS_DSIG_PREP_CONNECTED: + erts_de_links_lock(dep); + erts_de_runlock(dep); + lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, + p->common.id); + ++ERTS_LINK_REFC(lnk); + lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); + ++ERTS_LINK_REFC(lnk); + break; + default: + ERTS_ASSERT(! "Invalid dsig prepare result"); + } + } + else { /* Bool == false */ + dep = erts_sysname_to_connected_dist_entry(Node); + if (!dep) { + /* + * Before OTP-21 this case triggered auto-connect + * and a 'nodedown' message if that failed. + * Now it's a simple no-op which feels more reasonable. + */ + BIF_RET(am_true); + } + if (dep == erts_this_dist_entry) + goto done; + + erts_proc_lock(p, ERTS_PROC_LOCK_LINK); + erts_de_rlock(dep); + if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED))) { + erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); + erts_de_runlock(dep); + goto done; + } + erts_de_links_lock(dep); + erts_de_runlock(dep); + lnk = erts_lookup_link(dep->node_links, p->common.id); + if (lnk != NULL) { + if ((--ERTS_LINK_REFC(lnk)) == 0) { + erts_destroy_link(erts_remove_link(&(dep->node_links), + p->common.id)); + } + } + lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); + if (lnk != NULL) { + if ((--ERTS_LINK_REFC(lnk)) == 0) { + erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), + Node)); + } + } } erts_de_links_unlock(dep); diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index c7d63f9feb..98c39dc6dd 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -41,7 +41,7 @@ local_send_small/1, local_send_big/1, local_send_legal/1, link_to_busy/1, exit_to_busy/1, lost_exit/1, link_to_dead/1, link_to_dead_new_node/1, - applied_monitor_node/1, ref_port_roundtrip/1, nil_roundtrip/1, + ref_port_roundtrip/1, nil_roundtrip/1, trap_bif_1/1, trap_bif_2/1, trap_bif_3/1, stop_dist/1, dist_auto_connect_never/1, dist_auto_connect_once/1, @@ -75,7 +75,7 @@ suite() -> all() -> [ping, {group, bulk_send}, {group, local_send}, link_to_busy, exit_to_busy, lost_exit, link_to_dead, - link_to_dead_new_node, applied_monitor_node, + link_to_dead_new_node, ref_port_roundtrip, nil_roundtrip, stop_dist, {group, trap_bif}, {group, dist_auto_connect}, dist_parallel_send, atom_roundtrip, unicode_atom_roundtrip, @@ -639,26 +639,6 @@ link_to_dead_new_node(Config) when is_list(Config) -> end, ok. -%% Test that monitor_node/2 works when applied. -applied_monitor_node(Config) when is_list(Config) -> - NonExisting = list_to_atom("__non_existing__@" ++ hostname()), - - %% Tail-recursive call to apply (since the node is non-existing, - %% there will be a trap). - - true = tail_apply(erlang, monitor_node, [NonExisting, true]), - [{nodedown, NonExisting}] = test_server:messages_get(), - - %% Ordinary call (with trap). - - true = apply(erlang, monitor_node, [NonExisting, true]), - [{nodedown, NonExisting}] = test_server:messages_get(), - - ok. - -tail_apply(M, F, A) -> - apply(M, F, A). - %% Test that sending a port or reference to another node and back again %% doesn't correct them in any way. ref_port_roundtrip(Config) when is_list(Config) -> diff --git a/erts/emulator/test/erl_link_SUITE.erl b/erts/emulator/test/erl_link_SUITE.erl index d8c5b663e3..a66ca7a57d 100644 --- a/erts/emulator/test/erl_link_SUITE.erl +++ b/erts/emulator/test/erl_link_SUITE.erl @@ -200,13 +200,16 @@ monitor_nodes(Config) when is_list(Config) -> monitor_node(A, true), check_monitor_node(self(), A, 1), check_monitor_node(self(), B, 3), + ok = receive {nodedown, C} -> ok after 1000 -> timeout end, + %%OTP-21: monitor_node(_,false) does not trigger auto-connect anymore + %% and therefore no nodedown if it fails. + %%ok = receive {nodedown, C} -> ok after 1000 -> timeout end, + ok = receive {nodedown, C} -> ok after 1000 -> timeout end, + ok = receive {nodedown, D} -> ok after 1000 -> timeout end, + ok = receive {nodedown, D} -> ok after 1000 -> timeout end, check_monitor_node(self(), C, 0), check_monitor_node(self(), D, 0), - receive {nodedown, C} -> ok end, - receive {nodedown, C} -> ok end, - receive {nodedown, C} -> ok end, - receive {nodedown, D} -> ok end, - receive {nodedown, D} -> ok end, + stop_node(A), receive {nodedown, A} -> ok end, check_monitor_node(self(), A, 0), -- cgit v1.2.3 From 3f5f22656a33aec1ba4ed01249732e32d08a8c50 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 18 Aug 2017 16:56:21 +0200 Subject: erts: Async auto-connect for group_leader/2 --- erts/emulator/beam/bif.c | 6 ++-- erts/emulator/test/distribution_SUITE.erl | 50 +++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 34f5afae78..2583a5f8d6 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -4422,16 +4422,16 @@ BIF_RETTYPE group_leader_2(BIF_ALIST_2) int code; ErtsDSigData dsd; dep = external_pid_dist_entry(BIF_ARG_2); + ERTS_ASSERT(dep); if(dep == erts_this_dist_entry) BIF_ERROR(BIF_P, BADARG); code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, - ERTS_DSP_NO_LOCK, 0, 0); + ERTS_DSP_NO_LOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: - BIF_RET(am_true); case ERTS_DSIG_PREP_NOT_CONNECTED: - BIF_TRAP2(dgroup_leader_trap, BIF_P, BIF_ARG_1, BIF_ARG_2); + BIF_RET(am_true); case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: code = erts_dsig_send_group_leader(&dsd, BIF_ARG_1, BIF_ARG_2); diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index 98c39dc6dd..fa683f363f 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -35,8 +35,12 @@ -include_lib("common_test/include/ct.hrl"). +%-define(Line, erlang:display({line,?LINE}),). +-define(Line,). + -export([all/0, suite/0, groups/0, ping/1, bulk_send_small/1, + group_leader/1, bulk_send_big/1, bulk_send_bigbig/1, local_send_small/1, local_send_big/1, local_send_legal/1, link_to_busy/1, exit_to_busy/1, @@ -61,6 +65,7 @@ %% Internal exports. -export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0, + group_leader_1/1, roundtrip/1, bounce/1, do_dist_auto_connect/1, inet_rpc_server/1, dist_parallel_sender/3, dist_parallel_receiver/0, dist_evil_parallel_receiver/0]). @@ -74,6 +79,7 @@ suite() -> all() -> [ping, {group, bulk_send}, {group, local_send}, + group_leader, link_to_busy, exit_to_busy, lost_exit, link_to_dead, link_to_dead_new_node, ref_port_roundtrip, nil_roundtrip, stop_dist, @@ -124,6 +130,50 @@ ping(Config) when is_list(Config) -> ok. +%% Test erlang:group_leader(_, ExternalPid), i.e. DOP_GROUP_LEADER +group_leader(Config) when is_list(Config) -> + ?Line Sock = start_relay_node(group_leader_1, []), + ?Line Sock2 = start_relay_node(group_leader_2, []), + try + ?Line Node2 = inet_rpc_nodename(Sock2), + ?Line {ok, ok} = do_inet_rpc(Sock, ?MODULE, group_leader_1, [Node2]) + after + ?Line stop_relay_node(Sock), + ?Line stop_relay_node(Sock2) + end, + ok. + +group_leader_1(Node2) -> + ?Line ExtPid = spawn(Node2, fun F() -> + receive {From, group_leader} -> + From ! {self(), group_leader, group_leader()} + end, + F() + end), + ?Line GL1 = self(), + ?Line group_leader(GL1, ExtPid), + ?Line ExtPid ! {self(), group_leader}, + ?Line {ExtPid, group_leader, GL1} = receive_one(), + + %% Kill connection and repeat test when group_leader/2 triggers auto-connect + ?Line net_kernel:monitor_nodes(true), + ?Line net_kernel:disconnect(Node2), + ?Line {nodedown, Node2} = receive_one(), + ?Line GL2 = spawn(fun() -> dummy end), + ?Line group_leader(GL2, ExtPid), + ?Line {nodeup, Node2} = receive_one(), + ?Line ExtPid ! {self(), group_leader}, + ?Line {ExtPid, group_leader, GL2} = receive_one(), + ok. + +receive_one() -> + receive M -> M after 1000 -> timeout end. + + + + + + bulk_send_small(Config) when is_list(Config) -> bulk_send(64, 32). -- cgit v1.2.3 From 17e198d6ee60f7dec9abfed272cf4226aea44535 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 18 Aug 2017 19:56:02 +0200 Subject: erts: Async auto-connect for monitor/2 --- erts/emulator/beam/bif.c | 23 ++++++----------------- 1 file changed, 6 insertions(+), 17 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 2583a5f8d6..2af61e6ebc 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -790,31 +790,20 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, BIF_RETTYPE ret; int code; + ASSERT(dep); erts_proc_lock(p, ERTS_PROC_LOCK_LINK); code = erts_dsig_prepare(&dsd, &dep, p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), - ERTS_DSP_RLOCK, 0, 0); + ERTS_DSP_RLOCK, 0, 1); switch (code) { - case ERTS_DSIG_PREP_PENDING: - /* - * Must wait for connection to know if node supports monitor. - * Damn these synchronous errors. - */ - erts_smp_de_runlock(dep); - /* fall through */ case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); ERTS_BIF_PREP_TRAP2(ret, dmonitor_p_trap, p, bifarg1, bifarg2); break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: - if (!(dep->flags & DFLAG_DIST_MONITOR) - || (byname && !(dep->flags & DFLAG_DIST_MONITOR_NAME))) { - erts_de_runlock(dep); - erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); - ERTS_BIF_PREP_ERROR(ret, p, BADARG); - } - else { + { Eterm p_trgt, p_name, d_name, mon_ref; mon_ref = erts_make_ref(p); @@ -917,17 +906,17 @@ local_port: if (!erts_is_alive && remote_node != am_Noname) { goto badarg; /* Remote monitor from (this) undistributed node */ } - dep = erts_sysname_to_connected_dist_entry(remote_node); + dep = erts_find_or_insert_dist_entry(remote_node); if (dep == erts_this_dist_entry) { ret = local_name_monitor(BIF_P, BIF_ARG_1, name); } else { ret = remote_monitor(BIF_P, BIF_ARG_1, BIF_ARG_2, dep, name, 1); } + erts_deref_dist_entry(dep); } else { badarg: ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG); } - return ret; } -- cgit v1.2.3 From e8df66a934045a1b0dc1edf0a695b6dbcf4cca4b Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 24 Jul 2017 16:05:36 +0200 Subject: Remove obsolete erlang:dsend --- erts/emulator/beam/atom.names | 1 - erts/emulator/beam/dist.c | 11 +---------- erts/emulator/beam/dist.h | 2 -- 3 files changed, 1 insertion(+), 13 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index a534dd44fb..dd3e11dc26 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -231,7 +231,6 @@ atom dollar_endonly atom dotall atom driver atom driver_options -atom dsend atom dsend_continue_trap atom dunlink atom duplicate_bag diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 724cb6b24d..7c44545de3 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -108,9 +108,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ -Export* dsend2_trap = NULL; -Export* dsend3_trap = NULL; -/*Export* dsend_nosuspend_trap = NULL;*/ Export* dlink_trap = NULL; Export* dunlink_trap = NULL; Export* dmonitor_node_trap = NULL; @@ -636,9 +633,6 @@ void init_dist(void) erts_atomic_init_nob(&no_caches, 0); /* Lookup/Install all references to trap functions */ - dsend2_trap = trap_function(am_dsend,2); - dsend3_trap = trap_function(am_dsend,3); - /* dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/ dlink_trap = trap_function(am_dlink,1); dunlink_trap = trap_function(am_dunlink,1); dmonitor_node_trap = trap_function(am_dmonitor_node,3); @@ -3148,10 +3142,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; /* Check that all trap functions are defined !! */ - if (dsend2_trap->addressv[0] == NULL || - dsend3_trap->addressv[0] == NULL || - /* dsend_nosuspend_trap->address == NULL ||*/ - dlink_trap->addressv[0] == NULL || + if (dlink_trap->addressv[0] == NULL || dunlink_trap->addressv[0] == NULL || dmonitor_node_trap->addressv[0] == NULL || dgroup_leader_trap->addressv[0] == NULL || diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 26432b21e9..8bb915c76b 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -86,8 +86,6 @@ #define DOP_SEND_SENDER_TT 23 /* distribution trap functions */ -extern Export* dsend2_trap; -extern Export* dsend3_trap; extern Export* dlink_trap; extern Export* dunlink_trap; extern Export* dmonitor_node_trap; -- cgit v1.2.3 From 5ad822ccfd841400bc44cb53acc6a0889ca3f128 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 15 Aug 2017 16:26:34 +0200 Subject: Remove obsolete erlang:dlink/1, dunlink/1 and dist_exit/3 --- erts/emulator/beam/atom.names | 2 - erts/emulator/beam/bif.c | 9 ----- erts/emulator/beam/bif.tab | 2 - erts/emulator/beam/dist.c | 87 +------------------------------------------ erts/emulator/beam/dist.h | 2 - 5 files changed, 1 insertion(+), 101 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index dd3e11dc26..3c06c26ba0 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -222,7 +222,6 @@ atom dist_ctrl_put_data atom dist_data atom Div='/' atom div -atom dlink atom dmonitor_node atom dmonitor_p atom DollarDollar='$$' @@ -232,7 +231,6 @@ atom dotall atom driver atom driver_options atom dsend_continue_trap -atom dunlink atom duplicate_bag atom duplicated atom dupnames diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 2af61e6ebc..8eb4d9e85b 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -1181,16 +1181,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: -#if 1 BIF_RET(am_true); -#else - /* - * This is how we used to do it, but the link is obviously not - * active, so I see no point in setting up a connection. - * /Rickard - */ - BIF_TRAP1(dunlink_trap, BIF_P, BIF_ARG_1); -#endif case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 66469a011d..5a591caac9 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -153,14 +153,12 @@ bif erlang:whereis/1 bif erlang:spawn_opt/1 bif erlang:setnode/2 bif erlang:setnode/3 -bif erlang:dist_exit/3 bif erlang:dist_get_stat/1 bif erlang:dist_ctrl_input_handler/2 bif erlang:dist_ctrl_put_data/2 bif erlang:dist_ctrl_get_data/1 bif erlang:dist_ctrl_get_data_notification/1 - # Static native functions in erts_internal bif erts_internal:port_info/1 bif erts_internal:port_info/2 diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 7c44545de3..6e08c72289 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -108,8 +108,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ -Export* dlink_trap = NULL; -Export* dunlink_trap = NULL; Export* dmonitor_node_trap = NULL; Export* dgroup_leader_trap = NULL; Export* dexit_trap = NULL; @@ -633,8 +631,6 @@ void init_dist(void) erts_atomic_init_nob(&no_caches, 0); /* Lookup/Install all references to trap functions */ - dlink_trap = trap_function(am_dlink,1); - dunlink_trap = trap_function(am_dunlink,1); dmonitor_node_trap = trap_function(am_dmonitor_node,3); dgroup_leader_trap = trap_function(am_dgroup_leader,2); dexit_trap = trap_function(am_dexit, 2); @@ -3100,7 +3096,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ monitor_node -- turn on/off node monitoring node controller only: - dist_exit/3 -- send exit signals from remote to local process dist_link/2 -- link a remote process to a local dist_unlink/2 -- unlink a remote from a local ****************************************************************************/ @@ -3111,9 +3106,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ ** Set the node name of current node fail if node already is set. ** setnode(name@host, Creation) ** loads functions pointer to trap_functions from module erlang. - ** erlang:dsend/2 - ** erlang:dlink/1 - ** erlang:dunlink/1 ** erlang:dmonitor_node/3 ** erlang:dgroup_leader/2 ** erlang:dexit/2 @@ -3142,9 +3134,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; /* Check that all trap functions are defined !! */ - if (dlink_trap->addressv[0] == NULL || - dunlink_trap->addressv[0] == NULL || - dmonitor_node_trap->addressv[0] == NULL || + if (dmonitor_node_trap->addressv[0] == NULL || dgroup_leader_trap->addressv[0] == NULL || dmonitor_p_trap->addressv[0] == NULL || dexit_trap->addressv[0] == NULL) { @@ -3559,81 +3549,6 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) BIF_RET(am_false); } - -/**********************************************************************/ -/* dist_exit(Local, Term, Remote) -> Bool */ - -BIF_RETTYPE dist_exit_3(BIF_ALIST_3) -{ - Eterm local; - Eterm remote; - DistEntry *rdep; - - local = BIF_ARG_1; - remote = BIF_ARG_3; - - /* Check that remote is a remote process */ - if (is_not_external_pid(remote)) - goto error; - - rdep = external_dist_entry(remote); - - if(rdep == erts_this_dist_entry) - goto error; - - /* Check that local is local */ - if (is_internal_pid(local)) { - Process *lp; - ErtsProcLocks lp_locks; - if (BIF_P->common.id == local) { - lp_locks = ERTS_PROC_LOCKS_ALL; - lp = BIF_P; - erts_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR); - } - else { - lp_locks = ERTS_PROC_LOCKS_XSIG_SEND; - lp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, - local, lp_locks); - if (!lp) { - BIF_RET(am_true); /* ignore */ - } - } - - (void) erts_send_exit_signal(BIF_P, - remote, - lp, - &lp_locks, - BIF_ARG_2, - NIL, - NULL, - 0); - if (lp == BIF_P) - lp_locks &= ~ERTS_PROC_LOCK_MAIN; - erts_proc_unlock(lp, lp_locks); - if (lp == BIF_P) { - erts_aint32_t state = erts_atomic32_read_acqb(&BIF_P->state); - /* - * We may have exited current process and may have to take action. - */ - if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { - if (state & ERTS_PSFLG_PENDING_EXIT) - erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN); - ERTS_BIF_EXITED(BIF_P); - } - } - } - else if (is_external_pid(local) - && external_dist_entry(local) == erts_this_dist_entry) { - BIF_RET(am_true); /* ignore */ - } - else - goto error; - BIF_RET(am_true); - - error: - BIF_ERROR(BIF_P, BADARG); -} - /**********************************************************************/ /* node(Object) -> Node */ diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 8bb915c76b..50e72ac42c 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -86,8 +86,6 @@ #define DOP_SEND_SENDER_TT 23 /* distribution trap functions */ -extern Export* dlink_trap; -extern Export* dunlink_trap; extern Export* dmonitor_node_trap; extern Export* dgroup_leader_trap; extern Export* dexit_trap; -- cgit v1.2.3 From da2935ce340cef5db1b5f589778eb20044796610 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 15 Aug 2017 17:12:02 +0200 Subject: Remove obsolete erlang:dexit/2 --- erts/emulator/beam/atom.names | 1 - erts/emulator/beam/dist.c | 8 +------- erts/emulator/beam/dist.h | 1 - 3 files changed, 1 insertion(+), 9 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 3c06c26ba0..d755b5393b 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -199,7 +199,6 @@ atom debug_flags atom decimals atom default atom delay_trap -atom dexit atom depth atom dgroup_leader atom dictionary diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 6e08c72289..e36ea9ba94 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -110,7 +110,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ Export* dmonitor_node_trap = NULL; Export* dgroup_leader_trap = NULL; -Export* dexit_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ @@ -633,7 +632,6 @@ void init_dist(void) /* Lookup/Install all references to trap functions */ dmonitor_node_trap = trap_function(am_dmonitor_node,3); dgroup_leader_trap = trap_function(am_dgroup_leader,2); - dexit_trap = trap_function(am_dexit, 2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, am_dist_ctrl_put_data, @@ -3108,9 +3106,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ ** loads functions pointer to trap_functions from module erlang. ** erlang:dmonitor_node/3 ** erlang:dgroup_leader/2 - ** erlang:dexit/2 - ** -- are these needed ? - ** dexit/1 ***********************************************************************/ BIF_RETTYPE setnode_2(BIF_ALIST_2) @@ -3136,8 +3131,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) /* Check that all trap functions are defined !! */ if (dmonitor_node_trap->addressv[0] == NULL || dgroup_leader_trap->addressv[0] == NULL || - dmonitor_p_trap->addressv[0] == NULL || - dexit_trap->addressv[0] == NULL) { + dmonitor_p_trap->addressv[0] == NULL) { goto error; } diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 50e72ac42c..339e4839ea 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -88,7 +88,6 @@ /* distribution trap functions */ extern Export* dmonitor_node_trap; extern Export* dgroup_leader_trap; -extern Export* dexit_trap; extern Export* dmonitor_p_trap; typedef enum { -- cgit v1.2.3 From 389e11b8b8a476ca73ca03a39ad7ec298dc99e83 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 18 Aug 2017 17:08:27 +0200 Subject: Remove obsolete erlang:dgroup_leader --- erts/emulator/beam/atom.names | 1 - erts/emulator/beam/dist.c | 6 ------ erts/emulator/beam/dist.h | 1 - 3 files changed, 8 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index d755b5393b..75a70c3716 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -200,7 +200,6 @@ atom decimals atom default atom delay_trap atom depth -atom dgroup_leader atom dictionary atom dirty_bif_exception atom dirty_bif_result diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index e36ea9ba94..cd4125e404 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -109,7 +109,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ Export* dmonitor_node_trap = NULL; -Export* dgroup_leader_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ @@ -631,7 +630,6 @@ void init_dist(void) /* Lookup/Install all references to trap functions */ dmonitor_node_trap = trap_function(am_dmonitor_node,3); - dgroup_leader_trap = trap_function(am_dgroup_leader,2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, am_dist_ctrl_put_data, @@ -3103,9 +3101,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ /********************************************************************** ** Set the node name of current node fail if node already is set. ** setnode(name@host, Creation) - ** loads functions pointer to trap_functions from module erlang. - ** erlang:dmonitor_node/3 - ** erlang:dgroup_leader/2 ***********************************************************************/ BIF_RETTYPE setnode_2(BIF_ALIST_2) @@ -3130,7 +3125,6 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) /* Check that all trap functions are defined !! */ if (dmonitor_node_trap->addressv[0] == NULL || - dgroup_leader_trap->addressv[0] == NULL || dmonitor_p_trap->addressv[0] == NULL) { goto error; } diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 339e4839ea..49f3eac340 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -87,7 +87,6 @@ /* distribution trap functions */ extern Export* dmonitor_node_trap; -extern Export* dgroup_leader_trap; extern Export* dmonitor_p_trap; typedef enum { -- cgit v1.2.3 From 9271fd39c48a121b6be79e7a44a524c883fafc41 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 8 Sep 2017 17:11:20 +0200 Subject: erts: Let send(_,_,[noconnect]) enqueue msg on pending connection. The least bad behavior I think: * We cannot return 'noconnect' as caller might already have enqueued monitor/link that never triggers. * We cannot block waiting for connection as that can ruin latency when 'noconnect' is used to avoid blocking auto-connect (see gen_server and gen_statem). But there might be users getting more cases of bad latency waiting for a pendig connection, instead of a fast 'noconnect'. --- erts/emulator/beam/bif.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 8eb4d9e85b..30278cbe36 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -2192,11 +2192,10 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) if (is_not_atom(tp[1]) || is_not_atom(tp[2])) return SEND_BADARG; - /* sysname_to_connected_dist_entry will return NULL if there - is no dist_entry or the dist_entry has no port, + /* erts_find_dist_entry will return NULL if there is no dist_entry but remote_send() will handle that. */ - dep = erts_sysname_to_connected_dist_entry(tp[2]); + dep = erts_find_dist_entry(tp[2]); if (dep == erts_this_dist_entry) { Eterm id; -- cgit v1.2.3 From 5857245406584b8bfcbdf0be450ad494a992d5c8 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 8 Sep 2017 17:11:43 +0200 Subject: erts: Fix auto-connect toward erl_interface/jinterface --- erts/emulator/beam/dist.c | 63 +++++++++++++++++------------------- erts/emulator/beam/dist.h | 2 +- erts/emulator/beam/erl_node_tables.h | 1 + erts/emulator/beam/external.c | 44 +++++++++++++++++++------ erts/emulator/beam/external.h | 2 +- 5 files changed, 66 insertions(+), 46 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index cd4125e404..6d2e907f18 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -1879,33 +1879,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) { ctx->acmp = erts_get_atom_cache_map(ctx->c_p); - ctx->pass_through_size = 0; + ctx->max_finalize_prepend = 0; } else { ctx->acmp = NULL; - ctx->pass_through_size = 3; /* SVERK rename */ + ctx->max_finalize_prepend = 3; } #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl); - if (is_value(ctx->msg)) - erts_fprintf(stderr, " MSG: %T\n", ctx->msg); + erts_fprintf(stderr, ">> CTL: %T\n", ctx->ctl); + if (is_value(ctx->msg)) + erts_fprintf(stderr, " MSG: %T\n", ctx->msg); #endif - ctx->data_size = ctx->pass_through_size; + ctx->data_size = ctx->max_finalize_prepend; erts_reset_atom_cache_map(ctx->acmp); erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size); - if (is_value(ctx->msg)) { - ctx->u.sc.wstack.wstart = NULL; - ctx->u.sc.flags = ctx->flags; - ctx->u.sc.level = 0; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; - } - break; + if (is_non_value(ctx->msg)) { + ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; + break; + } + ctx->u.sc.wstack.wstart = NULL; + ctx->u.sc.flags = ctx->flags; + ctx->u.sc.level = 0; + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; case ERTS_DSIG_SEND_PHASE_MSG_SIZE: if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) { retval = ERTS_DSIG_SEND_CONTINUE; @@ -1920,23 +1919,24 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->data_size += ctx->dhdr_ext_size; ctx->obuf = alloc_dist_obuf(ctx->data_size); - ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->pass_through_size + ctx->dhdr_ext_size; + ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size; /* Encode internal version of dist header */ ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp); /* Encode control message */ erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL); - if (is_value(ctx->msg)) { - ctx->u.ec.flags = ctx->flags; - ctx->u.ec.level = 0; - ctx->u.ec.wstack.wstart = NULL; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; - } - break; + if (is_non_value(ctx->msg)) { + ctx->obuf->msg_start = NULL; + ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; + break; + } + ctx->u.ec.flags = ctx->flags; + ctx->u.ec.level = 0; + ctx->u.ec.wstack.wstart = NULL; + ctx->obuf->msg_start = ctx->obuf->ext_endp; - case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; + case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) { retval = ERTS_DSIG_SEND_CONTINUE; goto done; @@ -1949,7 +1949,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) int resume = 0; ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp); - ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->pass_through_size); + ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend); ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size); ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp; @@ -2308,9 +2308,7 @@ erts_dist_command(Port *prt, int reds_limit) ob = oq.first; ASSERT(ob); do { - ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, - dep->cache, - flags); + erts_encode_ext_dist_header_finalize(ob, dep->cache, flags); ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; preempt = reds > reds_limit; @@ -2350,10 +2348,7 @@ erts_dist_command(Port *prt, int reds_limit) while (oq.first && !preempt) { ErtsDistOutputBuf *fob; Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); + erts_encode_ext_dist_header_finalize(oq.first, dep->cache, flags); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; ASSERT(&oq.first->data[0] <= oq.first->extp && oq.first->extp < oq.first->ext_endp); diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 49f3eac340..505d510473 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -405,7 +405,7 @@ struct erts_dsig_send_context { Eterm ctl; Eterm msg; int force_busy; - Uint32 pass_through_size; + Uint32 max_finalize_prepend; Uint data_size, dhdr_ext_size; ErtsAtomCacheMap *acmp; ErtsDistOutputBuf *obuf; diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index d012f4b2cf..af42814b8b 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -87,6 +87,7 @@ struct ErtsDistOutputBuf_ { ErtsDistOutputBuf *next; byte *extp; byte *ext_endp; + byte *msg_start; byte data[1]; }; diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index c49e2f617a..d8f52ceb57 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -351,41 +351,63 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) #define PASS_THROUGH 'p' /* This code should go */ -byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint32 dflags) +void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, + ErtsAtomCache *cache, + Uint32 dflags) { byte *ip; byte instr_buf[(2+4)*ERTS_ATOM_CACHE_SIZE]; int ci, sz; byte dist_hdr_flags; int long_atoms; - register byte *ep = ext; + register byte *ep = ob->extp; ASSERT(dflags & DFLAG_UTF8_ATOMS); + /* + * The buffer can have three different layouts at this point depending on + * what was known when encoded: + * + * Pending connection: CtrlTerm [, MsgTerm] + * With atom cache : VERSION_MAGIC, DIST_HEADER, ..., CtrlTerm [, MsgTerm] + * No atom cache : VERSION_MAGIC, CtrlTerm [, VERSION_MAGIC, MsgTerm] + */ + if (ep[0] != VERSION_MAGIC) { + /* + * Was encoded without atom cache toward pending connection. + */ ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT); if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) { /* - * Encoded without atom cache (toward pending connection) - * but receiver wants dist header. Let's prepend an empty one. + * Receiver wants dist header. Let's prepend an empty one. */ *--ep = 0; /* NumberOfAtomCacheRefs */ *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; } else { - /* Node without atom cache, 'pass through' needed */ - - ASSERT(!"SVERK: Must insert VERSION_MAGIC's"); + /* + * Primitive receiver without atom cache (erl_interface/jinterface). + * Must prepend VERSION_MAGIC to both ctrl and message + * And add PASSTHROUGH. + */ + if (ob->msg_start) { + ASSERT(ep < ob->msg_start && ob->msg_start < ob->ext_endp); + sys_memmove(ep-1, ep, (ob->msg_start - ep)); + ob->msg_start[-1] = VERSION_MAGIC; + --ep; + } + *--ep = VERSION_MAGIC; *--ep = PASS_THROUGH; } - return ep; + goto done; } else if (ep[1] != DIST_HEADER) { ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT); ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)); /* Node without atom cache, 'pass through' needed */ *--ep = PASS_THROUGH; - return ep; + goto done; } dist_hdr_flags = ep[2]; @@ -510,7 +532,9 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint put_int8(ci, ep); *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; - return ep; +done: + ASSERT(ep >= ob->data); + ob->extp = ep; } int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index 49950c4aad..11044b17ef 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -154,7 +154,7 @@ void erts_finalize_atom_cache_map(ErtsAtomCacheMap *, Uint32); Uint erts_encode_ext_dist_header_size(ErtsAtomCacheMap *); byte *erts_encode_ext_dist_header_setup(byte *, ErtsAtomCacheMap *); -byte *erts_encode_ext_dist_header_finalize(byte *, ErtsAtomCache *, Uint32); +void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, ErtsAtomCache *, Uint32); struct erts_dsig_send_context; int erts_encode_dist_ext_size(Eterm, Uint32, ErtsAtomCacheMap*, Uint* szp); int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx, Uint* szp); -- cgit v1.2.3 From d736e87ff94ab8191f33dca55516e6c1d440b915 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 24 Aug 2017 17:20:21 +0200 Subject: Add optimistic DFLAG_DIST_HOPEFULLY for pending connections to avoid tuple fallbacks for export funs and bitstrings. ToDo: Re-encode if receiver turn out to be erl_interface/jinterface. --- erts/emulator/beam/dist.c | 2 +- erts/emulator/beam/dist.h | 9 ++++-- erts/emulator/beam/erl_node_tables.c | 2 +- erts/emulator/beam/external.c | 4 +-- erts/emulator/test/distribution_SUITE.erl | 53 +++++++++++++++++++++++++++++-- 5 files changed, 62 insertions(+), 8 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 6d2e907f18..7877865ad4 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3443,7 +3443,7 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) conn_id = dep->connection_id; else if (dep->status == 0) { dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; + dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); dep->connection_id++; dep->connection_id &= ERTS_DIST_CON_ID_MASK; conn_id = dep->connection_id; diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 505d510473..f9a2037687 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -45,7 +45,7 @@ #define DFLAG_MAP_TAG 0x20000 #define DFLAG_BIG_CREATION 0x40000 #define DFLAG_SEND_SENDER 0x80000 -#define DFLAG_PENDING_CONNECTION 0x100000 +#define DFLAG_NO_MAGIC 0x100000 /* Mandatory flags for distribution (sync with dist_util.erl) */ #define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \ @@ -53,6 +53,11 @@ | DFLAG_UTF8_ATOMS \ | DFLAG_NEW_FUN_TAGS) +/* Additional optimistic flags when encoding toward pending connection */ +#define DFLAG_DIST_HOPEFULLY (DFLAG_NO_MAGIC \ + | DFLAG_EXPORT_PTR_TAG \ + | DFLAG_BIT_BINARIES) + /* All flags that should be enabled when term_to_binary/1 is used. */ #define TERM_TO_BINARY_DFLAGS (DFLAG_EXTENDED_REFERENCES \ | DFLAG_NEW_FUN_TAGS \ @@ -206,7 +211,7 @@ retry: Eterm msg, conn_id; dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; + dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); dep->connection_id++; dep->connection_id &= ERTS_DIST_CON_ID_MASK; conn_id = make_small(dep->connection_id); diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index dd607f438e..2a33766ac8 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -628,7 +628,7 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) dep->connection_id &= ERTS_DIST_CON_ID_MASK; } dep->status |= ERTS_DE_SFLG_CONNECTED; - dep->flags = flags & ~DFLAG_PENDING_CONNECTION; + dep->flags = flags & ~DFLAG_NO_MAGIC; dep->cid = cid; erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) cid); diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index d8f52ceb57..bfac48580c 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -561,7 +561,7 @@ int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx return -1; } else { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(ctx->flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_PENDING_CONNECTION))) + if (!(ctx->flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_NO_MAGIC))) #endif sz++ /* VERSION_MAGIC */; @@ -593,7 +593,7 @@ int erts_encode_dist_ext(Eterm term, byte **ext, Uint32 flags, ErtsAtomCacheMap { if (!ctx || !ctx->wstack.wstart) { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_PENDING_CONNECTION))) + if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_NO_MAGIC))) #endif *(*ext)++ = VERSION_MAGIC; } diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index fa683f363f..f1831b7c45 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -41,6 +41,7 @@ -export([all/0, suite/0, groups/0, ping/1, bulk_send_small/1, group_leader/1, + optimistic_dflags/1, bulk_send_big/1, bulk_send_bigbig/1, local_send_small/1, local_send_big/1, local_send_legal/1, link_to_busy/1, exit_to_busy/1, @@ -66,6 +67,7 @@ %% Internal exports. -export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0, group_leader_1/1, + optimistic_dflags_echo/0, optimistic_dflags_sender/1, roundtrip/1, bounce/1, do_dist_auto_connect/1, inet_rpc_server/1, dist_parallel_sender/3, dist_parallel_receiver/0, dist_evil_parallel_receiver/0]). @@ -80,6 +82,7 @@ suite() -> all() -> [ping, {group, bulk_send}, {group, local_send}, group_leader, + optimistic_dflags, link_to_busy, exit_to_busy, lost_exit, link_to_dead, link_to_dead_new_node, ref_port_roundtrip, nil_roundtrip, stop_dist, @@ -166,12 +169,58 @@ group_leader_1(Node2) -> ?Line {ExtPid, group_leader, GL2} = receive_one(), ok. -receive_one() -> - receive M -> M after 1000 -> timeout end. +%% Test optimistic distribution flags toward pending connections (DFLAG_DIST_HOPEFULLY) +optimistic_dflags(Config) when is_list(Config) -> + ?Line Sender = start_relay_node(optimistic_dflags_sender, []), + ?Line Echo = start_relay_node(optimistic_dflags_echo, []), + try + ?Line {ok, ok} = do_inet_rpc(Echo, ?MODULE, optimistic_dflags_echo, []), + + ?Line EchoNode = inet_rpc_nodename(Echo), + ?Line {ok, ok} = do_inet_rpc(Sender, ?MODULE, optimistic_dflags_sender, [EchoNode]) + after + ?Line stop_relay_node(Sender), + ?Line stop_relay_node(Echo) + end, + ok. + +optimistic_dflags_echo() -> + P = spawn(fun F() -> + receive {From, Term} -> + From ! {self(), Term} + end, + F() + end), + register(optimistic_dflags_echo, P), + optimistic_dflags_echo ! {self(), hello}, + {P, hello} = receive_one(), + ok. + +optimistic_dflags_sender(EchoNode) -> + ?Line net_kernel:monitor_nodes(true), + optimistic_dflags_do(EchoNode, <<1:1>>), + optimistic_dflags_do(EchoNode, fun lists:map/2), + ok. +optimistic_dflags_do(EchoNode, Term) -> + ?Line {optimistic_dflags_echo, EchoNode} ! {self(), Term}, + ?Line {nodeup, EchoNode} = receive_one(), + ?Line {EchoPid, Term} = receive_one(), + %% repeat with pid destination + ?Line net_kernel:disconnect(EchoNode), + ?Line {nodedown, EchoNode} = receive_one(), + ?Line EchoPid ! {self(), Term}, + ?Line {nodeup, EchoNode} = receive_one(), + ?Line {EchoPid, Term} = receive_one(), + + ?Line net_kernel:disconnect(EchoNode), + ?Line {nodedown, EchoNode} = receive_one(), + ok. +receive_one() -> + receive M -> M after 1000 -> timeout end. bulk_send_small(Config) when is_list(Config) -> -- cgit v1.2.3 From 42dadd12ac6b6977513a6edd73ff6137488c4a4a Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 6 Sep 2017 20:38:52 +0200 Subject: erts: Ensure enc_term_int() always do progress even when reds <= 1 Removed micro optimization for first fun variable to make things simpler. --- erts/emulator/beam/external.c | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index bfac48580c..8408d7dd12 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -2554,8 +2554,6 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, break; } - L_jump_start: - if (ctx && --r <= 0) { *reds = 0; ctx->obj = obj; @@ -2563,6 +2561,8 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, WSTACK_SAVE(s, &ctx->wstack); return -1; } + + L_jump_start: switch(tag_val_def(obj)) { case NIL_DEF: *ep++ = NIL_EXT; @@ -2945,13 +2945,9 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, ep = enc_term(acmp, make_small(funp->fe->old_uniq), ep, dflags, off_heap); ep = enc_pid(acmp, funp->creator, ep, dflags); - for (ei = funp->num_free-1; ei > 0; ei--) { + for (ei = funp->num_free-1; ei >= 0; ei--) { WSTACK_PUSH2(s, ENC_TERM, (UWord) funp->env[ei]); } - if (funp->num_free != 0) { - obj = funp->env[0]; - goto L_jump_start; - } } break; } -- cgit v1.2.3 From e4d28f70133af9b3b8c320fe5e70e2086830874d Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 25 Aug 2017 18:46:58 +0200 Subject: erts: Transcode tuple fallbacks When finalizing outgoing distribution messages we transcode them into using tuple fallbacks if the receiver does not support bitstrings and export-funs. This can only happen if the message was first encoded toward a pending connection when the receiver was unknown. It's an optimistic approach optmimized for modern beam nodes, that expect real bitstrings and funs (since orig_bytes[0]; #ifdef DEBUG obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; + obuf->alloc_endp = obuf->data + size; ASSERT(bin == ErtsDistOutputBuf2Binary(obuf)); #endif return obuf; @@ -692,6 +693,7 @@ static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; + return obuf; } @@ -755,6 +757,8 @@ static void clear_dist_entry(DistEntry *dep) delete_cache(cache); free_de_out_queues(dep, obuf); + if (dep->transcode_ctx) + transcode_free_ctx(dep); } int erts_dsend_context_dtor(Binary* ctx_bin) @@ -2208,7 +2212,6 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) #endif #define ERTS_PORT_REDS_DIST_CMD_START 5 -#define ERTS_PORT_REDS_DIST_CMD_FINALIZE 3 #define ERTS_PORT_REDS_DIST_CMD_EXIT 200 #define ERTS_PORT_REDS_DIST_CMD_RESUMED 5 #define ERTS_PORT_REDS_DIST_CMD_DATA(SZ) \ @@ -2217,9 +2220,9 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) : ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__))) int -erts_dist_command(Port *prt, int reds_limit) +erts_dist_command(Port *prt, int initial_reds) { - Sint reds = ERTS_PORT_REDS_DIST_CMD_START; + Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START; Uint32 status; Uint32 flags; Sint qsize, obufsize = 0; @@ -2241,9 +2244,12 @@ erts_dist_command(Port *prt, int reds_limit) if (status & ERTS_DE_SFLG_EXITING) { erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1); - return reds + ERTS_PORT_REDS_DIST_CMD_EXIT; + reds -= ERTS_PORT_REDS_DIST_CMD_EXIT; + return initial_reds - reds; } + ASSERT(!(status & ERTS_DE_SFLG_PENDING)); + ASSERT(send); /* @@ -2268,7 +2274,7 @@ erts_dist_command(Port *prt, int reds_limit) sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - if (reds > reds_limit) + if (reds < 0) goto preempted; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { @@ -2283,13 +2289,13 @@ erts_dist_command(Port *prt, int reds_limit) erts_fprintf(stderr, ">> "); bw(foq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = foq.first; - obufsize += size_obuf(fob); - foq.first = foq.first->next; - free_dist_obuf(fob); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = foq.first; + obufsize += size_obuf(fob); + foq.first = foq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + preempt = reds < 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) break; } while (foq.first && !preempt); @@ -2302,44 +2308,42 @@ erts_dist_command(Port *prt, int reds_limit) if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) { if (oq.first) { ErtsDistOutputBuf *ob; - int preempt; + ErtsDistOutputBuf *last_finalized = NULL; finalize_only: - preempt = 0; ob = oq.first; ASSERT(ob); do { - erts_encode_ext_dist_header_finalize(ob, dep->cache, flags); - ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - preempt = reds > reds_limit; - if (preempt) - break; - ob = ob->next; + reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds); + if (reds >= 0) { + last_finalized = ob; + ob = ob->next; + } } while (ob); - /* - * At least one buffer was finalized; if we got preempted, - * ob points to the last buffer that we finalized. - */ - if (foq.last) - foq.last->next = oq.first; - else - foq.first = oq.first; - if (!preempt) { - /* All buffers finalized */ - foq.last = oq.last; - oq.first = oq.last = NULL; - } - else { - /* Not all buffers finalized; split oq. */ - foq.last = ob; - oq.first = ob->next; - if (oq.first) - ob->next = NULL; - else - oq.last = NULL; - } - if (preempt) - goto preempted; + if (last_finalized) { + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the next buffer to continue finalize. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + foq.last = last_finalized; + if (!ob) { + /* All buffers finalized */ + ASSERT(foq.last == oq.last); + ASSERT(foq.last->next == NULL); + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. */ + ASSERT(foq.last->next == ob); + foq.last->next = NULL; + oq.first = ob; + } + } + if (reds <= 0) + goto preempted; } } else { @@ -2348,8 +2352,11 @@ erts_dist_command(Port *prt, int reds_limit) while (oq.first && !preempt) { ErtsDistOutputBuf *fob; Uint size; - erts_encode_ext_dist_header_finalize(oq.first, dep->cache, flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds); + if (reds < 0) { + preempt = 1; + break; + } ASSERT(&oq.first->data[0] <= oq.first->extp && oq.first->extp < oq.first->ext_endp); size = (*send)(prt, oq.first); @@ -2359,13 +2366,13 @@ erts_dist_command(Port *prt, int reds_limit) erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = oq.first; - obufsize += size_obuf(fob); - oq.first = oq.first->next; - free_dist_obuf(fob); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = oq.first; + obufsize += size_obuf(fob); + oq.first = oq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + preempt = reds <= 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) goto finalize_only; } @@ -2405,7 +2412,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_mtx_unlock(&dep->qlock); resumed = erts_resume_processes(suspendees); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } else erts_mtx_unlock(&dep->qlock); @@ -2428,8 +2435,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_mtx_unlock(&dep->qlock); } - ASSERT(foq.first || !foq.last); - ASSERT(!foq.first || foq.last); + ASSERT(!!foq.first == !!foq.last); ASSERT(!dep->finalized_out_queue.first); ASSERT(!dep->finalized_out_queue.last); @@ -2439,10 +2445,10 @@ erts_dist_command(Port *prt, int reds_limit) } /* Avoid wrapping reduction counter... */ - if (reds > INT_MAX/2) - reds = INT_MAX/2; + if (reds < INT_MIN/2) + reds = INT_MIN/2; - return reds; + return initial_reds - reds; preempted: /* @@ -2450,8 +2456,7 @@ erts_dist_command(Port *prt, int reds_limit) * since last call to driver. */ - ASSERT(oq.first || !oq.last); - ASSERT(!oq.first || oq.last); + ASSERT(!!oq.first == !!oq.last); if (sched_flags & ERTS_PTS_FLG_EXIT) { /* @@ -2475,14 +2480,11 @@ erts_dist_command(Port *prt, int reds_limit) foq.first = NULL; foq.last = NULL; - -/* SVERK Hmmm.... #ifdef DEBUG erts_mtx_lock(&dep->qlock); ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); erts_mtx_unlock(&dep->qlock); #endif -*/ } else { if (oq.first) { @@ -2772,7 +2774,8 @@ BIF_RETTYPE dist_ctrl_get_data_1(BIF_ALIST_1) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); - int reds = 1; + const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P); + Sint reds = initial_reds; ErtsDistOutputBuf *obuf; Eterm *hp; ProcBin *pb; @@ -2803,6 +2806,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1) { if (!dep->tmp_out_queue.first) { ASSERT(!dep->tmp_out_queue.last); + ASSERT(!dep->transcode_ctx); qsize = erts_atomic_read_acqb(&dep->qsize); if (qsize > 0) { erts_mtx_lock(&dep->qlock); @@ -2820,21 +2824,18 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_de_runlock(dep); BIF_RET(am_none); } - else { - obuf = dep->tmp_out_queue.first; - dep->tmp_out_queue.first = obuf->next; - if (!obuf->next) - dep->tmp_out_queue.last = NULL; + + obuf = dep->tmp_out_queue.first; + reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds); + if (reds < 0) { + erts_de_runlock(dep); + ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1], + BIF_P, BIF_ARG_1); } - obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp, - dep->cache, - dep->flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */ - ASSERT(&obuf->data[0] <= obuf->extp - && obuf->extp < obuf->ext_endp); + dep->tmp_out_queue.first = obuf->next; + if (!obuf->next) + dep->tmp_out_queue.last = NULL; } erts_atomic64_inc_nob(&dep->out); @@ -2862,11 +2863,11 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_mtx_unlock(&dep->qlock); if (resume_procs) { int resumed = erts_resume_processes(resume_procs); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } } - BIF_RET2(make_binary(pb), reds); + BIF_RET2(make_binary(pb), (initial_reds - reds)); } void diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 2960272eab..c6cc5c78b3 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -261,6 +261,7 @@ type MINDIRECTION FIXED_SIZE SYSTEM magic_indirection type BINARY_FIND SHORT_LIVED PROCESSES binary_find type OPEN_PORT_ENV TEMPORARY SYSTEM open_port_env type CRASH_DUMP STANDARD SYSTEM crash_dump +type DIST_TRANSCODE SHORT_LIVED SYSTEM dist_transcode_context type THR_Q_EL STANDARD SYSTEM thr_q_element type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 2a33766ac8..00e8306510 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -204,6 +204,7 @@ dist_table_alloc(void *dep_tmpl) erts_port_task_handle_init(&dep->dist_cmd); dep->send = NULL; dep->cache = NULL; + dep->transcode_ctx = NULL; /* Link in */ diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index af42814b8b..afa83f8b46 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -83,6 +83,7 @@ typedef struct ErtsDistOutputBuf_ ErtsDistOutputBuf; struct ErtsDistOutputBuf_ { #ifdef DEBUG Uint dbg_pattern; + byte *alloc_endp; #endif ErtsDistOutputBuf *next; byte *extp; @@ -156,6 +157,8 @@ typedef struct dist_entry_ { struct cache* cache; /* The atom cache */ ErtsThrPrgrLaterOp later_op; + + struct transcode_context* transcode_ctx; } DistEntry; typedef struct erl_node_ { diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 8408d7dd12..f2e6399ad7 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -122,6 +122,9 @@ static int encode_size_struct_int(struct TTBSizeContext_*, ErtsAtomCacheMap *acm static Export binary_to_term_trap_export; static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1); +static Sint transcode_dist_obuf(ErtsDistOutputBuf*, DistEntry*, Uint32 dflags, Sint reds); + + void erts_init_external(void) { erts_init_trap_export(&term_to_binary_trap_export, @@ -349,11 +352,13 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) } } -#define PASS_THROUGH 'p' /* This code should go */ -void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, - ErtsAtomCache *cache, - Uint32 dflags) +#define PASS_THROUGH 'p' + +Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, + DistEntry* dep, + Uint32 dflags, + Sint reds) { byte *ip; byte instr_buf[(2+4)*ERTS_ATOM_CACHE_SIZE]; @@ -364,7 +369,7 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, ASSERT(dflags & DFLAG_UTF8_ATOMS); /* - * The buffer can have three different layouts at this point depending on + * The buffer can have different layouts at this point depending on * what was known when encoded: * * Pending connection: CtrlTerm [, MsgTerm] @@ -372,34 +377,28 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, * No atom cache : VERSION_MAGIC, CtrlTerm [, VERSION_MAGIC, MsgTerm] */ - if (ep[0] != VERSION_MAGIC) { + if (ep[0] != VERSION_MAGIC || dep->transcode_ctx) { /* * Was encoded without atom cache toward pending connection. */ ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT); + + if (~dflags & (DFLAG_BIT_BINARIES | DFLAG_EXPORT_PTR_TAG + | DFLAG_DIST_HDR_ATOM_CACHE)) { + reds = transcode_dist_obuf(ob, dep, dflags, reds); + if (reds < 0) + return reds; + ep = ob->extp; + } if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) { /* - * Receiver wants dist header. Let's prepend an empty one. + * Encoding was done without atom caching but receiver expects + * a dist header, so we prepend an empty one. */ *--ep = 0; /* NumberOfAtomCacheRefs */ *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; } - else { - /* - * Primitive receiver without atom cache (erl_interface/jinterface). - * Must prepend VERSION_MAGIC to both ctrl and message - * And add PASSTHROUGH. - */ - if (ob->msg_start) { - ASSERT(ep < ob->msg_start && ob->msg_start < ob->ext_endp); - sys_memmove(ep-1, ep, (ob->msg_start - ep)); - ob->msg_start[-1] = VERSION_MAGIC; - --ep; - } - *--ep = VERSION_MAGIC; - *--ep = PASS_THROUGH; - } goto done; } else if (ep[1] != DIST_HEADER) { @@ -436,6 +435,7 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, / sizeof(Uint32))+1]; register Uint32 flgs; int iix, flgs_bytes, flgs_buf_ix, used_half_bytes; + ErtsAtomCache* cache = dep->cache; #ifdef DEBUG int tot_used_half_bytes; #endif @@ -527,14 +527,16 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, break; } } + reds -= 3; /*was ERTS_PORT_REDS_DIST_CMD_FINALIZE*/ } --ep; put_int8(ci, ep); *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; done: - ASSERT(ep >= ob->data); ob->extp = ep; + ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + return reds < 0 ? 0 : reds; } int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, @@ -545,7 +547,7 @@ int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, return -1; } else { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_NO_MAGIC))) #endif sz++ /* VERSION_MAGIC */; @@ -3285,6 +3287,7 @@ dec_term_atom_common: n--; if (ctx) { if (reds < n) { + ASSERT(reds > 0); ctx->state = B2TDecodeList; ctx->u.dc.remaining_n = n - reds; n = reds; @@ -4388,7 +4391,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags, B2TContext* ctx) } } else - reds = 0; /* not used but compiler warns anyway */ + ERTS_UNDEF(reds, 0); heap_size = 0; terms = 1; @@ -4698,3 +4701,177 @@ error: #undef SKIP2 #undef CHKSIZE } + + +struct transcode_context { + enum { + TRANSCODE_DEC_MSG_SIZE, + TRANSCODE_DEC_MSG, + TRANSCODE_ENC_CTL, + TRANSCODE_ENC_MSG + }state; + Eterm ctl_term; + Eterm* ctl_heap; + ErtsHeapFactory ctl_factory; + Eterm* msg_heap; + B2TContext b2t; + TTBEncodeContext ttb; +#ifdef DEBUG + ErtsDistOutputBuf* dbg_ob; +#endif +}; + +void transcode_free_ctx(DistEntry* dep) +{ + struct transcode_context* ctx = dep->transcode_ctx; + + erts_factory_close(&ctx->ctl_factory); + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx->ctl_heap); + + if (ctx->msg_heap) { + erts_factory_close(&ctx->b2t.u.dc.factory); + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx->msg_heap); + } + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx); + dep->transcode_ctx = NULL; +} + +Sint transcode_dist_obuf(ErtsDistOutputBuf* ob, + DistEntry* dep, + Uint32 dflags, + Sint reds) +{ + Sint hsz; + byte* decp; + const int have_msg = !!ob->msg_start; + int i; + struct transcode_context* ctx = dep->transcode_ctx; + + if (!ctx) { /* first call for 'ob' */ + + if (~dflags & (DFLAG_BIT_BINARIES | DFLAG_EXPORT_PTR_TAG)) { + /* + * Receiver does not support bitstrings and/or export funs. + * We need to transcode control and message terms to use tuple fallbacks. + */ + ctx = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, sizeof(struct transcode_context)); + dep->transcode_ctx = ctx; + #ifdef DEBUG + ctx->dbg_ob = ob; + #endif + + hsz = decoded_size(ob->extp, ob->ext_endp, 0, NULL); + ctx->ctl_heap = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, hsz*sizeof(Eterm)); + erts_factory_tmp_init(&ctx->ctl_factory, ctx->ctl_heap, hsz, ERTS_ALC_T_DIST_TRANSCODE); + ctx->msg_heap = NULL; + + decp = dec_term(NULL, &ctx->ctl_factory, ob->extp, &ctx->ctl_term, NULL); + if (have_msg) { + ASSERT(decp == ob->msg_start); (void)decp; + ctx->b2t.u.sc.ep = NULL; + ctx->b2t.state = B2TSize; + ctx->b2t.aligned_alloc = NULL; + ctx->b2t.b2ts.exttmp = 0; + ctx->state = TRANSCODE_DEC_MSG_SIZE; + } + else { + ASSERT(decp == ob->ext_endp); + ctx->state = TRANSCODE_ENC_CTL; + } + } + else { + /* + * No need for full transcoding, but primitive receiver (erl_/jinterface) + * expects VERSION_MAGIC before both control and message terms. + */ + if (ob->msg_start) { + Sint ctl_bytes = ob->msg_start - ob->extp; + ASSERT(ob->extp < ob->msg_start && ob->msg_start < ob->ext_endp); + /* Move control term back 1 byte to make room */ + sys_memmove(ob->extp-1, ob->extp, ctl_bytes); + *--(ob->msg_start) = VERSION_MAGIC; + --(ob->extp); + reds -= ctl_bytes / (B2T_BYTES_PER_REDUCTION * B2T_MEMCPY_FACTOR); + } + *--(ob->extp) = VERSION_MAGIC; + goto done; + } + } + else { + ASSERT(ctx->dbg_ob == ob); + } + ctx->b2t.reds = reds * B2T_BYTES_PER_REDUCTION; + + switch (ctx->state) { + case TRANSCODE_DEC_MSG_SIZE: + hsz = decoded_size(ob->msg_start, ob->ext_endp, 0, &ctx->b2t); + if (ctx->b2t.state == B2TSize) { + return -1; + } + ASSERT(ctx->b2t.state == B2TDecodeInit); + ctx->msg_heap = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, hsz*sizeof(Eterm)); + ctx->b2t.u.dc.ep = ob->msg_start; + ctx->b2t.u.dc.res = (Eterm) NULL; + ctx->b2t.u.dc.next = &ctx->b2t.u.dc.res; + erts_factory_tmp_init(&ctx->b2t.u.dc.factory, + ctx->msg_heap, hsz, ERTS_ALC_T_DIST_TRANSCODE); + ctx->b2t.u.dc.flat_maps.wstart = NULL; + ctx->b2t.u.dc.hamt_array.pstart = NULL; + ctx->b2t.state = B2TDecode; + + ctx->state = TRANSCODE_DEC_MSG; + case TRANSCODE_DEC_MSG: + if (ctx->b2t.reds <= 0) + ctx->b2t.reds = 1; + decp = dec_term(NULL, NULL, NULL, NULL, &ctx->b2t); + if (ctx->b2t.state < B2TDone) { + return -1; + } + ASSERT(ctx->b2t.state == B2TDone); + ASSERT(decp && decp <= ob->ext_endp); + reds = ctx->b2t.reds / B2T_BYTES_PER_REDUCTION; + b2t_destroy_context(&ctx->b2t); + + ctx->state = TRANSCODE_ENC_CTL; + case TRANSCODE_ENC_CTL: + if (!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)) { + ASSERT(!(dflags & DFLAG_NO_MAGIC)); + ob->extp -= 2; /* VERSION_MAGIC x 2 */ + } + ob->ext_endp = ob->extp; + i = erts_encode_dist_ext(ctx->ctl_term, &ob->ext_endp, dflags, + NULL, NULL, NULL); + ASSERT(i == 0); (void)i; + ASSERT(ob->ext_endp <= ob->alloc_endp); + + if (!have_msg) { + break; + } + ob->msg_start = ob->ext_endp; + ctx->ttb.wstack.wstart = NULL; + ctx->ttb.flags = dflags; + ctx->ttb.level = 0; + + ctx->state = TRANSCODE_ENC_MSG; + case TRANSCODE_ENC_MSG: + reds *= TERM_TO_BINARY_LOOP_FACTOR; + if (erts_encode_dist_ext(ctx->b2t.u.dc.res, &ob->ext_endp, dflags, NULL, + &ctx->ttb, &reds)) { + return -1; + } + reds /= TERM_TO_BINARY_LOOP_FACTOR; + + ASSERT(ob->ext_endp <= ob->alloc_endp); + + } + transcode_free_ctx(dep); + +done: + if (!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--(ob->extp) = PASS_THROUGH; + + if (reds < 0) + reds = 0; + + return reds; +} diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index 11044b17ef..f9f8abcc27 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -154,7 +154,7 @@ void erts_finalize_atom_cache_map(ErtsAtomCacheMap *, Uint32); Uint erts_encode_ext_dist_header_size(ErtsAtomCacheMap *); byte *erts_encode_ext_dist_header_setup(byte *, ErtsAtomCacheMap *); -void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, ErtsAtomCache *, Uint32); +Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, DistEntry *, Uint32 dflags, Sint reds); struct erts_dsig_send_context; int erts_encode_dist_ext_size(Eterm, Uint32, ErtsAtomCacheMap*, Uint* szp); int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx, Uint* szp); @@ -195,7 +195,7 @@ void erts_binary2term_abort(ErtsBinary2TermState *); Eterm erts_binary2term_create(ErtsBinary2TermState *, ErtsHeapFactory*); int erts_debug_max_atom_out_cache_index(void); int erts_debug_atom_to_out_cache_index(Eterm); - +void transcode_free_ctx(DistEntry* dep); #if ERTS_GLB_INLINE_INCL_FUNC_DEF -- cgit v1.2.3 From 56ab31904f7157d7b4282c2afd474c7bed7b9c75 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 19 Sep 2017 17:44:47 +0200 Subject: Remove faulty assert Send may have failed, port exit with dist_entry cleaned up and then new pending connection with queued messages. --- erts/emulator/beam/dist.c | 5 ----- 1 file changed, 5 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index a5df92bf9a..474b0e8364 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -2480,11 +2480,6 @@ erts_dist_command(Port *prt, int initial_reds) foq.first = NULL; foq.last = NULL; -#ifdef DEBUG - erts_mtx_lock(&dep->qlock); - ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); - erts_mtx_unlock(&dep->qlock); -#endif } else { if (oq.first) { -- cgit v1.2.3 From 08040e5c5cd329395d9d756f1fdf8d66ffbbe705 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 21 Sep 2017 19:50:22 +0200 Subject: Allow DistEntries in ETS --- erts/emulator/beam/erl_node_tables.c | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 00e8306510..17d410e42b 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -1089,6 +1089,7 @@ typedef struct { typedef struct dist_referrer_ { struct dist_referrer_ *next; int heap_ref; + int ets_ref; int node_ref; int ctrl_ref; int system_ref; @@ -1207,10 +1208,11 @@ insert_dist_referrer(ReferredDist *referred_dist, else { Uint *hp = &drp->id_heap[0]; ASSERT(is_tuple(id)); - drp->id = copy_struct(id, size_object(id), &hp, NULL); + drp->id = copy_struct(id, size_object(id), &hp, NULL); } drp->creation = creation; drp->heap_ref = 0; + drp->ets_ref = 0; drp->node_ref = 0; drp->ctrl_ref = 0; drp->system_ref = 0; @@ -1220,6 +1222,7 @@ insert_dist_referrer(ReferredDist *referred_dist, case NODE_REF: drp->node_ref++; break; case CTRL_REF: drp->ctrl_ref++; break; case HEAP_REF: drp->heap_ref++; break; + case ETS_REF: drp->ets_ref++; break; case SYSTEM_REF: drp->system_ref++; break; default: ASSERT(0); } @@ -1897,6 +1900,10 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) tup = MK_2TUP(AM_heap, MK_UINT(drp->heap_ref)); drl = MK_CONS(tup, drl); } + if(drp->ets_ref) { + tup = MK_2TUP(AM_ets, MK_UINT(drp->ets_ref)); + drl = MK_CONS(tup, drl); + } if(drp->system_ref) { tup = MK_2TUP(AM_system, MK_UINT(drp->system_ref)); drl = MK_CONS(tup, drl); @@ -1913,12 +1920,17 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) else if (is_tuple(drp->id)) { Eterm *t; ASSERT(drp->system_ref && !drp->node_ref - && !drp->ctrl_ref && !drp->heap_ref); + && !drp->ctrl_ref && !drp->heap_ref && !drp->ets_ref); t = tuple_val(drp->id); ASSERT(2 == arityval(t[0])); tup = MK_2TUP(t[1], t[2]); } - else { + else if (drp->ets_ref) { + ASSERT(!drp->heap_ref && !drp->node_ref && + !drp->ctrl_ref && !drp->system_ref); + tup = MK_2TUP(AM_ets, drp->id); + } + else { ASSERT(!drp->ctrl_ref && drp->node_ref); ASSERT(is_atom(drp->id)); tup = MK_2TUP(drp->id, MK_UINT(drp->creation)); -- cgit v1.2.3 From 771abbd23709b5a03416278595588931889ab7c5 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 20 Sep 2017 15:06:07 +0200 Subject: erts: Keep magic ref to DistEntry in net_kernel to make sure it's kept alive. --- erts/emulator/beam/dist.c | 22 +++++++++++++++++----- erts/emulator/beam/dist.h | 24 ++++++++++++++++-------- erts/emulator/beam/erl_node_tables.c | 30 ++++++++++++++---------------- erts/emulator/beam/erl_node_tables.h | 1 + 4 files changed, 48 insertions(+), 29 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 474b0e8364..c16579037c 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3426,6 +3426,8 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) { DistEntry* dep; Uint32 conn_id; + Eterm* hp; + Eterm dhandle; if (is_not_atom(BIF_ARG_1)) { BIF_ERROR(BIF_P, BADARG); @@ -3449,17 +3451,25 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) conn_id = dep->connection_id; } erts_de_rwunlock(dep); - BIF_RET(make_small(conn_id)); + hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); + dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep); + erts_deref_dist_entry(dep); + BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); } BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) { DistEntry* dep; + Eterm* tp; - if (is_not_atom(BIF_ARG_1) || is_not_small(BIF_ARG_2)) { + if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { BIF_ERROR(BIF_P, BADARG); } - dep = erts_find_dist_entry(BIF_ARG_1); + tp = tuple_val(BIF_ARG_2); + dep = erts_dhandle_to_dist_entry(tp[2]); + if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); + } ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ if (!dep) { @@ -3469,7 +3479,7 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) erts_de_rwlock(dep); if (dep->status == ERTS_DE_SFLG_PENDING - && dep->connection_id == unsigned_val(BIF_ARG_2)) { + && dep->connection_id == unsigned_val(tp[1])) { NetExitsContext nec = {dep}; ErtsLink *nlinks; @@ -3736,10 +3746,12 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) ++ERTS_LINK_REFC(lnk); lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); ++ERTS_LINK_REFC(lnk); + erts_de_links_unlock(dep); break; default: ERTS_ASSERT(! "Invalid dsig prepare result"); } + erts_deref_dist_entry(dep); } else { /* Bool == false */ dep = erts_sysname_to_connected_dist_entry(Node); @@ -3777,9 +3789,9 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) Node)); } } + erts_de_links_unlock(dep); } - erts_de_links_unlock(dep); erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); done: diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index f9a2037687..2ead588ee9 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -161,6 +161,7 @@ erts_dsig_prepare(ErtsDSigData *dsdp, int connect) { DistEntry* dep = *depp; + int deref_dep = 0; int res; if (!erts_is_alive) @@ -171,6 +172,7 @@ erts_dsig_prepare(ErtsDSigData *dsdp, dep = erts_find_or_insert_dist_entry(dsdp->node); ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + deref_dep = 1; } #ifdef ERTS_ENABLE_LOCK_CHECK @@ -208,7 +210,7 @@ retry: Eterm *hp; ErlOffHeap *ohp; ErtsMessage *mp; - Eterm msg, conn_id; + Eterm msg, conn_id, dhandle; dep->status = ERTS_DE_SFLG_PENDING; dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); @@ -220,16 +222,18 @@ retry: net_kernel = erts_whereis_process(proc, proc_locks, am_net_kernel, nk_locks, 0); if (!net_kernel) { - if (!*depp) { - erts_deref_dist_entry(dep); - } + if (deref_dep) + erts_deref_dist_entry(dep); return ERTS_DSIG_PREP_NOT_ALIVE; } - /* Send {auto_connect, Node, ConnId} to net_kernel */ - mp = erts_alloc_message_heap(net_kernel, &nk_locks, 4, &hp, &ohp); - msg = TUPLE3(hp, am_auto_connect, dep->sysname, conn_id); - erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); + /* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */ + mp = erts_alloc_message_heap(net_kernel, &nk_locks, + 5 + ERTS_MAGIC_REF_THING_SIZE, + &hp, &ohp); + dhandle = erts_build_dhandle(&hp, ohp, dep); + msg = TUPLE4(hp, am_auto_connect, dep->sysname, conn_id, dhandle); + erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); erts_proc_unlock(net_kernel, nk_locks); } else @@ -255,6 +259,8 @@ retry: dsdp->no_suspend = no_suspend; if (dspl == ERTS_DSP_NO_LOCK) erts_de_runlock(dep); + if (deref_dep) + erts_deref_dist_entry(dep); *depp = dep; return res; @@ -263,6 +269,8 @@ retry: erts_de_rwunlock(dep); else erts_de_runlock(dep); + if (deref_dep) + erts_deref_dist_entry(dep); return res; } diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 17d410e42b..7c75ec60e9 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -141,9 +141,7 @@ dist_table_cmp(void *dep1, void *dep2) static void* dist_table_alloc(void *dep_tmpl) { -#ifdef DEBUG erts_aint_t refc; -#endif Eterm sysname; Binary *bin; DistEntry *dep; @@ -160,13 +158,8 @@ dist_table_alloc(void *dep_tmpl) dist_entries++; -#ifdef DEBUG - refc = -#else - (void) -#endif - de_refc_dec_read(dep, -1); - ASSERT(refc == -1); + refc = de_refc_dec_read(dep, -1); + ASSERT(refc == -1); (void)refc; dep->prev = NULL; erts_rwmtx_init_opt(&dep->rwmtx, &rwmtx_opt, "dist_entry", sysname, @@ -227,6 +220,8 @@ dist_table_free(void *vdep) { DistEntry *dep = (DistEntry *) vdep; + ASSERT(de_refc_read(dep, -1) == -1); + ASSERT(dep->status == 0); ASSERT(is_nil(dep->cid)); ASSERT(dep->nlinks == NULL); ASSERT(dep->node_links == NULL); @@ -387,16 +382,19 @@ erts_dhandle_to_dist_entry(Eterm dhandle) } Eterm -erts_make_dhandle(Process *c_p, DistEntry *dep) +erts_build_dhandle(Eterm **hpp, ErlOffHeap* ohp, DistEntry *dep) { - Binary *bin; - Eterm *hp; - - bin = ErtsDistEntry2Bin(dep); + Binary *bin = ErtsDistEntry2Bin(dep); ASSERT(bin); ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == erts_dist_entry_destructor); - hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE); - return erts_mk_magic_ref(&hp, &c_p->off_heap, bin); + return erts_mk_magic_ref(hpp, ohp, bin); +} + +Eterm +erts_make_dhandle(Process *c_p, DistEntry *dep) +{ + Eterm *hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE); + return erts_build_dhandle(&hp, &c_p->off_heap, dep); } static void start_timer_delete_dist_entry(void *vdep); diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index afa83f8b46..83d0678bbc 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -210,6 +210,7 @@ int erts_lc_is_de_rlocked(DistEntry *); #endif int erts_dist_entry_destructor(Binary *bin); DistEntry *erts_dhandle_to_dist_entry(Eterm dhandle); +Eterm erts_build_dhandle(Eterm **hpp, ErlOffHeap*, DistEntry*); Eterm erts_make_dhandle(Process *c_p, DistEntry *dep); void erts_ref_dist_entry(DistEntry *dep); void erts_deref_dist_entry(DistEntry *dep); -- cgit v1.2.3 From 23ba06691aa64d7f05781bc9b2ea448c3710a812 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 25 Sep 2017 20:23:06 +0200 Subject: Remove unused ERTS_DSP_RWLOCK --- erts/emulator/beam/dist.h | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 2ead588ee9..b9b39de3b6 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -96,8 +96,7 @@ extern Export* dmonitor_p_trap; typedef enum { ERTS_DSP_NO_LOCK, - ERTS_DSP_RLOCK, - ERTS_DSP_RWLOCK + ERTS_DSP_RLOCK } ErtsDSigPrepLock; @@ -182,10 +181,7 @@ erts_dsig_prepare(ErtsDSigData *dsdp, #endif retry: - if (dspl == ERTS_DSP_RWLOCK) - erts_de_rwlock(dep); - else - erts_de_rlock(dep); + erts_de_rlock(dep); if (ERTS_DE_IS_CONNECTED(dep)) { res = ERTS_DSIG_PREP_CONNECTED; @@ -200,10 +196,8 @@ retry: } else if (connect) { ASSERT(dep->status == 0); - if (dspl != ERTS_DSP_RWLOCK) { - erts_de_runlock(dep); - erts_de_rwlock(dep); - } + erts_de_runlock(dep); + erts_de_rwlock(dep); if (dep->status == 0) { Process* net_kernel; ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; @@ -265,10 +259,7 @@ retry: return res; fail: - if (dspl == ERTS_DSP_RWLOCK) - erts_de_rwunlock(dep); - else - erts_de_runlock(dep); + erts_de_runlock(dep); if (deref_dep) erts_deref_dist_entry(dep); return res; -- cgit v1.2.3 From 25f9d0469e2b1da0aec73b55e54561be8b573634 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 26 Sep 2017 16:01:29 +0200 Subject: Refactor auto_connect into an outline function and abort_pending_connection into own utility function. --- erts/emulator/beam/dist.c | 92 ++++++++++++++++++++++++++++++++++------------- erts/emulator/beam/dist.h | 42 ++++------------------ 2 files changed, 75 insertions(+), 59 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index c16579037c..3e8c4c65b5 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -120,6 +120,7 @@ static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); +static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; @@ -3457,30 +3458,16 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); } -BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) { - DistEntry* dep; - Eterm* tp; - - if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { - BIF_ERROR(BIF_P, BADARG); - } - tp = tuple_val(BIF_ARG_2); - dep = erts_dhandle_to_dist_entry(tp[2]); - if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { - BIF_ERROR(BIF_P, BADARG); - } - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ - - if (!dep) { - BIF_RET(am_false); - } + Sint reds = 0; erts_de_rwlock(dep); - if (dep->status == ERTS_DE_SFLG_PENDING - && dep->connection_id == unsigned_val(tp[1])) { - + if (dep->status != ERTS_DE_SFLG_PENDING || dep->connection_id != conn_id) { + erts_de_rwunlock(dep); + } + else { NetExitsContext nec = {dep}; ErtsLink *nlinks; ErtsLink *node_links; @@ -3488,7 +3475,6 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) ErtsAtomCache *cache; ErtsDistOutputBuf *obuf; ErtsProcList *resume_procs; - Sint reds = 0; ASSERT(is_nil(dep->cid)); @@ -3529,13 +3515,71 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) delete_cache(cache); free_de_out_queues(dep, obuf); + } + return reds; +} - BIF_RET2(am_true, reds); +BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +{ + DistEntry* dep; + Eterm* tp; + + if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { + BIF_ERROR(BIF_P, BADARG); + } + tp = tuple_val(BIF_ARG_2); + dep = erts_dhandle_to_dist_entry(tp[2]); + if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); } + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ - erts_de_rwunlock(dep); + if (dep) { + Sint reds = abort_pending_connection(dep, unsigned_val(tp[1])); + BUMP_REDS(BIF_P, reds); + } + BIF_RET(am_true); +} - BIF_RET(am_false); +int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) +{ + erts_de_rwlock(dep); + if (dep->status != 0) { + erts_de_rwunlock(dep); + } + else { + Process* net_kernel; + ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *mp; + Eterm msg, dhandle; + Uint32 conn_id; + + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); + dep->connection_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + conn_id = dep->connection_id; + erts_de_rwunlock(dep); + + net_kernel = erts_whereis_process(proc, proc_locks, + am_net_kernel, nk_locks, 0); + if (!net_kernel) { + return 0; + } + + /* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */ + mp = erts_alloc_message_heap(net_kernel, &nk_locks, + 5 + ERTS_MAGIC_REF_THING_SIZE, + &hp, &ohp); + dhandle = erts_build_dhandle(&hp, ohp, dep); + msg = TUPLE4(hp, am_auto_connect, dep->sysname, make_small(conn_id), + dhandle); + erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); + erts_proc_unlock(net_kernel, nk_locks); + } + + return 1; } /**********************************************************************/ diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index b9b39de3b6..2685a55b97 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -148,6 +148,8 @@ ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *, ERTS_GLB_INLINE void erts_schedule_dist_command(Port *, DistEntry *); +int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks); + #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE int @@ -197,41 +199,11 @@ retry: else if (connect) { ASSERT(dep->status == 0); erts_de_runlock(dep); - erts_de_rwlock(dep); - if (dep->status == 0) { - Process* net_kernel; - ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; - Eterm *hp; - ErlOffHeap *ohp; - ErtsMessage *mp; - Eterm msg, conn_id, dhandle; - - dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); - dep->connection_id++; - dep->connection_id &= ERTS_DIST_CON_ID_MASK; - conn_id = make_small(dep->connection_id); - erts_de_rwunlock(dep); - - net_kernel = erts_whereis_process(proc, proc_locks, - am_net_kernel, nk_locks, 0); - if (!net_kernel) { - if (deref_dep) - erts_deref_dist_entry(dep); - return ERTS_DSIG_PREP_NOT_ALIVE; - } - - /* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */ - mp = erts_alloc_message_heap(net_kernel, &nk_locks, - 5 + ERTS_MAGIC_REF_THING_SIZE, - &hp, &ohp); - dhandle = erts_build_dhandle(&hp, ohp, dep); - msg = TUPLE4(hp, am_auto_connect, dep->sysname, conn_id, dhandle); - erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); - erts_proc_unlock(net_kernel, nk_locks); - } - else - erts_de_rwunlock(dep); + if (!erts_auto_connect(dep, proc, proc_locks)) { + if (deref_dep) + erts_deref_dist_entry(dep); + return ERTS_DSIG_PREP_NOT_ALIVE; + } goto retry; } else { -- cgit v1.2.3 From 6b48ef0226084a7f471d391abd4c1c399068840a Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 27 Sep 2017 19:02:10 +0200 Subject: erts: Put pending DistrEntry in separate list --- erts/emulator/beam/dist.c | 29 +++++--- erts/emulator/beam/erl_node_tables.c | 128 +++++++++++++++++++++++++---------- erts/emulator/beam/erl_node_tables.h | 3 + 3 files changed, 113 insertions(+), 47 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 3e8c4c65b5..44f56ebe28 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3062,6 +3062,10 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ info_dist_entry(to, arg, dep, 0, 1); } + for (dep = erts_pending_dist_entries; dep; dep = dep->next) { + info_dist_entry(to, arg, dep, 0, 0); + } + for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { info_dist_entry(to, arg, dep, 0, 0); @@ -3441,10 +3445,7 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) if (ERTS_DE_IS_CONNECTED(dep) || dep->status & ERTS_DE_SFLG_PENDING) conn_id = dep->connection_id; else if (dep->status == 0) { - dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); - dep->connection_id++; - dep->connection_id &= ERTS_DIST_CON_ID_MASK; + erts_set_dist_entry_pending(dep); conn_id = dep->connection_id; } else { @@ -3489,8 +3490,6 @@ static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) cache = dep->cache; dep->cache = NULL; - dep->status = 0; - dep->flags = 0; erts_mtx_lock(&dep->qlock); obuf = dep->out_queue.first; dep->out_queue.first = NULL; @@ -3501,6 +3500,9 @@ static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) erts_mtx_unlock(&dep->qlock); erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); dep->send = NULL; + + erts_set_dist_entry_not_connected(dep); + erts_de_rwunlock(dep); erts_sweep_monitors(monitors, &doit_monitor_net_exits, &nec); @@ -3556,9 +3558,7 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) Eterm msg, dhandle; Uint32 conn_id; - dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); - dep->connection_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + erts_set_dist_entry_pending(dep); conn_id = dep->connection_id; erts_de_rwunlock(dep); @@ -3655,9 +3655,11 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) ASSERT(erts_no_of_not_connected_dist_entries > 0); ASSERT(erts_no_of_hidden_dist_entries >= 0); + ASSERT(erts_no_of_pending_dist_entries >= 0); ASSERT(erts_no_of_visible_dist_entries >= 0); if(not_connected) - length += (erts_no_of_not_connected_dist_entries - 1); + length += ((erts_no_of_not_connected_dist_entries - 1) + + erts_no_of_pending_dist_entries); if(hidden) length += erts_no_of_hidden_dist_entries; if(visible) @@ -3677,13 +3679,18 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) #ifdef DEBUG endp = hp + length*2; #endif - if(not_connected) + if(not_connected) { for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { result = CONS(hp, dep->sysname, result); hp += 2; } + } + for(dep = erts_pending_dist_entries; dep; dep = dep->next) { + result = CONS(hp, dep->sysname, result); + hp += 2; } + } if(hidden) for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { result = CONS(hp, dep->sysname, result); diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 7c75ec60e9..eaf133f5c0 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -39,9 +39,11 @@ erts_rwmtx_t erts_node_table_rwmtx; DistEntry *erts_hidden_dist_entries; DistEntry *erts_visible_dist_entries; +DistEntry *erts_pending_dist_entries; DistEntry *erts_not_connected_dist_entries; /* including erts_this_dist_entry */ Sint erts_no_of_hidden_dist_entries; Sint erts_no_of_visible_dist_entries; +Sint erts_no_of_pending_dist_entries; Sint erts_no_of_not_connected_dist_entries; /* including erts_this_dist_entry */ DistEntry *erts_this_dist_entry; @@ -522,12 +524,17 @@ erts_dist_table_size(void) i++; ASSERT(i == erts_no_of_hidden_dist_entries); i = 0; + for(dep = erts_pending_dist_entries; dep; dep = dep->next) + i++; + ASSERT(i == erts_no_of_pending_dist_entries); + i = 0; for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) i++; ASSERT(i == erts_no_of_not_connected_dist_entries); ASSERT(dist_entries == (erts_no_of_visible_dist_entries + erts_no_of_hidden_dist_entries + + erts_no_of_pending_dist_entries + erts_no_of_not_connected_dist_entries)); #endif @@ -542,43 +549,46 @@ erts_dist_table_size(void) void erts_set_dist_entry_not_connected(DistEntry *dep) { + DistEntry** head; + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); erts_rwmtx_rwlock(&erts_dist_table_rwmtx); ASSERT(dep != erts_this_dist_entry); - ASSERT(is_internal_port(dep->cid) || is_internal_pid(dep->cid)); - if(dep->flags & DFLAG_PUBLISHED) { - if(dep->prev) { - ASSERT(is_in_de_list(dep, erts_visible_dist_entries)); - dep->prev->next = dep->next; - } - else { - ASSERT(erts_visible_dist_entries == dep); - erts_visible_dist_entries = dep->next; - } - - ASSERT(erts_no_of_visible_dist_entries > 0); - erts_no_of_visible_dist_entries--; + if (dep->status & ERTS_DE_SFLG_PENDING) { + ASSERT(is_nil(dep->cid)); + ASSERT(erts_no_of_pending_dist_entries > 0); + erts_no_of_pending_dist_entries--; + head = &erts_pending_dist_entries; } else { - if(dep->prev) { - ASSERT(is_in_de_list(dep, erts_hidden_dist_entries)); - dep->prev->next = dep->next; - } - else { - ASSERT(erts_hidden_dist_entries == dep); - erts_hidden_dist_entries = dep->next; - } - - ASSERT(erts_no_of_hidden_dist_entries > 0); - erts_no_of_hidden_dist_entries--; + ASSERT(dep->status != 0); + ASSERT(is_internal_port(dep->cid) || is_internal_pid(dep->cid)); + if (dep->flags & DFLAG_PUBLISHED) { + ASSERT(erts_no_of_visible_dist_entries > 0); + erts_no_of_visible_dist_entries--; + head = &erts_visible_dist_entries; + } + else { + ASSERT(erts_no_of_hidden_dist_entries > 0); + erts_no_of_hidden_dist_entries--; + head = &erts_hidden_dist_entries; + } } + if(dep->prev) { + ASSERT(is_in_de_list(dep, *head)); + dep->prev->next = dep->next; + } + else { + ASSERT(*head == dep); + *head = dep->next; + } if(dep->next) dep->next->prev = dep->prev; - dep->status &= ~ERTS_DE_SFLG_CONNECTED; + dep->status &= ~(ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED); dep->flags = 0; dep->prev = NULL; dep->cid = NIL; @@ -593,6 +603,45 @@ erts_set_dist_entry_not_connected(DistEntry *dep) erts_rwmtx_rwunlock(&erts_dist_table_rwmtx); } +void +erts_set_dist_entry_pending(DistEntry *dep) +{ + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); + erts_rwmtx_rwlock(&erts_dist_table_rwmtx); + + ASSERT(dep != erts_this_dist_entry); + ASSERT(dep->status == 0); + ASSERT(is_nil(dep->cid)); + + if(dep->prev) { + ASSERT(is_in_de_list(dep, erts_not_connected_dist_entries)); + dep->prev->next = dep->next; + } + else { + ASSERT(dep == erts_not_connected_dist_entries); + erts_not_connected_dist_entries = dep->next; + } + + if(dep->next) + dep->next->prev = dep->prev; + + erts_no_of_not_connected_dist_entries--; + + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); + dep->connection_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + + dep->prev = NULL; + dep->next = erts_pending_dist_entries; + if(erts_pending_dist_entries) { + ASSERT(erts_pending_dist_entries->prev == NULL); + erts_pending_dist_entries->prev = dep; + } + erts_pending_dist_entries = dep; + erts_no_of_pending_dist_entries++; + erts_rwmtx_rwunlock(&erts_dist_table_rwmtx); +} + void erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) { @@ -603,29 +652,25 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) ASSERT(dep != erts_this_dist_entry); ASSERT(is_nil(dep->cid)); + ASSERT(dep->status & ERTS_DE_SFLG_PENDING); ASSERT(is_internal_port(cid) || is_internal_pid(cid)); if(dep->prev) { - ASSERT(is_in_de_list(dep, erts_not_connected_dist_entries)); + ASSERT(is_in_de_list(dep, erts_pending_dist_entries)); dep->prev->next = dep->next; } else { - ASSERT(erts_not_connected_dist_entries == dep); - erts_not_connected_dist_entries = dep->next; + ASSERT(erts_pending_dist_entries == dep); + erts_pending_dist_entries = dep->next; } if(dep->next) dep->next->prev = dep->prev; - ASSERT(erts_no_of_not_connected_dist_entries > 0); - erts_no_of_not_connected_dist_entries--; + ASSERT(erts_no_of_pending_dist_entries > 0); + erts_no_of_pending_dist_entries--; - if (dep->status & ERTS_DE_SFLG_PENDING) { - dep->status &= ~ERTS_DE_SFLG_PENDING; - } else { - dep->connection_id++; - dep->connection_id &= ERTS_DIST_CON_ID_MASK; - } + dep->status &= ~ERTS_DE_SFLG_PENDING; dep->status |= ERTS_DE_SFLG_CONNECTED; dep->flags = flags & ~DFLAG_NO_MAGIC; dep->cid = cid; @@ -959,9 +1004,11 @@ void erts_init_node_tables(int dd_sec) erts_hidden_dist_entries = NULL; erts_visible_dist_entries = NULL; + erts_pending_dist_entries = NULL; erts_not_connected_dist_entries = NULL; erts_no_of_hidden_dist_entries = 0; erts_no_of_visible_dist_entries = 0; + erts_no_of_pending_dist_entries = 0; erts_no_of_not_connected_dist_entries = 0; node_tmpl.sysname = am_Noname; @@ -1729,6 +1776,15 @@ setup_reference_table(void) insert_monitors(dep->monitors, dep->sysname); } + for(dep = erts_pending_dist_entries; dep; dep = dep->next) { + if(dep->nlinks) + insert_links2(dep->nlinks, dep->sysname); + if(dep->node_links) + insert_links(dep->node_links, dep->sysname); + if(dep->monitors) + insert_monitors(dep->monitors, dep->sysname); + } + /* Not connected dist entries should not have any links, but inspect them anyway */ for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 83d0678bbc..8d29c83e15 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -177,9 +177,11 @@ extern erts_rwmtx_t erts_node_table_rwmtx; extern DistEntry *erts_hidden_dist_entries; extern DistEntry *erts_visible_dist_entries; +extern DistEntry *erts_pending_dist_entries; extern DistEntry *erts_not_connected_dist_entries; extern Sint erts_no_of_hidden_dist_entries; extern Sint erts_no_of_visible_dist_entries; +extern Sint erts_no_of_pending_dist_entries; extern Sint erts_no_of_not_connected_dist_entries; extern DistEntry *erts_this_dist_entry; @@ -195,6 +197,7 @@ void erts_schedule_delete_dist_entry(DistEntry *); Uint erts_dist_table_size(void); void erts_dist_table_info(fmtfn_t, void *); void erts_set_dist_entry_not_connected(DistEntry *); +void erts_set_dist_entry_pending(DistEntry *); void erts_set_dist_entry_connected(DistEntry *, Eterm, Uint); ErlNode *erts_find_or_insert_node(Eterm, Uint32); void erts_schedule_delete_node(ErlNode *); -- cgit v1.2.3 From 0a6ccd3d1fb1d9cacd7369533d6e5aa1fc2ded77 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 27 Sep 2017 19:06:23 +0200 Subject: Abort all pending connections if net_kernel terminates --- erts/emulator/beam/dist.c | 66 ++++++++++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 23 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 44f56ebe28..1b5b730764 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -476,41 +476,54 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */ DistEntry *tdep; - int no_dist_ctrl = 0; + int no_dist_ctrl; + int no_pending; Eterm nd_reason = (reason == am_no_network ? am_no_network : am_net_kernel_terminated); + int i = 0; + Eterm *dist_ctrl; + DistEntry** pending; + + ERTS_UNDEF(dist_ctrl, NULL); + ERTS_UNDEF(pending, NULL); + erts_rwmtx_rlock(&erts_dist_table_rwmtx); - for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) - no_dist_ctrl++; - for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) - no_dist_ctrl++; + no_dist_ctrl = (erts_no_of_hidden_dist_entries + + erts_no_of_visible_dist_entries); + no_pending = erts_no_of_pending_dist_entries; /* KILL all port controllers */ - if (no_dist_ctrl == 0) - erts_rwmtx_runlock(&erts_dist_table_rwmtx); - else { - Eterm def_buf[128]; - int i = 0; - Eterm *dist_ctrl; - - if (no_dist_ctrl <= sizeof(def_buf)/sizeof(def_buf[0])) - dist_ctrl = &def_buf[0]; - else - dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, - sizeof(Eterm)*no_dist_ctrl); + if (no_dist_ctrl) { + dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, + sizeof(Eterm)*no_dist_ctrl); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) { ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + ASSERT(i < no_dist_ctrl); dist_ctrl[i++] = tdep->cid; } for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) { ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + ASSERT(i < no_dist_ctrl); dist_ctrl[i++] = tdep->cid; } - erts_rwmtx_runlock(&erts_dist_table_rwmtx); + ASSERT(i == no_dist_ctrl); + } + if (no_pending) { + pending = erts_alloc(ERTS_ALC_T_TMP, sizeof(DistEntry*)*no_pending); + i = 0; + for (tdep = erts_pending_dist_entries; tdep; tdep = tdep->next) { + ASSERT(is_nil(tdep->cid)); + ASSERT(i < no_pending); + pending[i++] = tdep; + } + ASSERT(i == no_pending); + } + erts_rwmtx_runlock(&erts_dist_table_rwmtx); - for (i = 0; i < no_dist_ctrl; i++) { + if (no_dist_ctrl) { + for (i = 0; i < no_dist_ctrl; i++) { if (is_internal_pid(dist_ctrl[i])) schedule_kill_dist_ctrl_proc(dist_ctrl[i]); else { @@ -524,11 +537,17 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) prt, dist_ctrl[i], nd_reason, NULL); } } - } + } + erts_free(ERTS_ALC_T_TMP, dist_ctrl); + } + + if (no_pending) { + for (i = 0; i < no_pending; i++) { + abort_pending_connection(pending[i], pending[i]->connection_id); + } + erts_free(ERTS_ALC_T_TMP, pending); + } - if (dist_ctrl != &def_buf[0]) - erts_free(ERTS_ALC_T_TMP, dist_ctrl); - } /* * When last dist ctrl exits, node will be taken @@ -3565,6 +3584,7 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) net_kernel = erts_whereis_process(proc, proc_locks, am_net_kernel, nk_locks, 0); if (!net_kernel) { + abort_pending_connection(dep, conn_id); return 0; } -- cgit v1.2.3 From 6c4a3094d0263e46827c6b1869a6de0c033b6b64 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 27 Sep 2017 19:45:42 +0200 Subject: Improve connection aborting --- erts/emulator/beam/dist.c | 120 +++++++++++++++++++++++++++++++--------------- erts/emulator/beam/dist.h | 3 +- 2 files changed, 82 insertions(+), 41 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 1b5b730764..4a25fbdd4c 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -120,7 +120,7 @@ static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); -static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id); +static Sint abort_connection(DistEntry* dep, Uint32 conn_id); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; @@ -517,6 +517,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) ASSERT(is_nil(tdep->cid)); ASSERT(i < no_pending); pending[i++] = tdep; + erts_ref_dist_entry(tdep); } ASSERT(i == no_pending); } @@ -543,7 +544,8 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (no_pending) { for (i = 0; i < no_pending; i++) { - abort_pending_connection(pending[i], pending[i]->connection_id); + abort_connection(pending[i], pending[i]->connection_id); + erts_deref_dist_entry(pending[i]); } erts_free(ERTS_ALC_T_TMP, pending); } @@ -624,7 +626,6 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) reason == am_normal ? am_connection_closed : reason); clear_dist_entry(dep); - } dec_no_nodes(); @@ -2904,24 +2905,31 @@ erts_dist_port_not_busy(Port *prt) erts_schedule_dist_command(prt, NULL); } +static void kill_connection(DistEntry *dep) +{ + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); + ASSERT(dep->status == ERTS_DE_SFLG_CONNECTED); + + dep->status |= ERTS_DE_SFLG_EXITING; + erts_mtx_lock(&dep->qlock); + ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); + erts_mtx_unlock(&dep->qlock); + + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + else if (is_internal_pid(dep->cid)) + schedule_kill_dist_ctrl_proc(dep->cid); +} + void erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) { erts_de_rwlock(dep); if (connection_id == dep->connection_id - && !(dep->status & ERTS_DE_SFLG_EXITING)) { - - dep->status |= ERTS_DE_SFLG_EXITING; - - erts_mtx_lock(&dep->qlock); - ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); - erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); - erts_mtx_unlock(&dep->qlock); + && dep->status == ERTS_DE_SFLG_CONNECTED) { - if (is_internal_port(dep->cid)) - erts_schedule_dist_command(NULL, dep); - else if (is_internal_pid(dep->cid)) - schedule_kill_dist_ctrl_proc(dep->cid); + kill_connection(dep); } erts_de_rwunlock(dep); } @@ -3259,6 +3267,11 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto badarg; } + /* + * ToDo: Should we not pass connection_id as well + * to make sure it's the right connection we commit. + */ + /* * Arguments seem to be in order. */ @@ -3302,6 +3315,23 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto badarg; } + if (dep->status & ERTS_DE_SFLG_EXITING) { + /* Suspend on dist entry waiting for the exit to finish */ + ErtsProcList *plp = erts_proclist_create(BIF_P); + plp->next = NULL; + erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); + erts_mtx_lock(&dep->qlock); + erts_proclist_store_last(&dep->suspended, plp); + erts_mtx_unlock(&dep->qlock); + goto yield; + } + if (dep->status != ERTS_DE_SFLG_PENDING) { + if (dep->status == 0) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } + if (is_not_nil(dep->cid)) goto badarg; @@ -3344,8 +3374,12 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_mtx_unlock(&dep->qlock); goto yield; } - - ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); + if (dep->status != ERTS_DE_SFLG_PENDING) { + if (dep->status == 0) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } if (pp->dist_entry || is_not_nil(dep->cid)) goto badarg; @@ -3457,7 +3491,11 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) BIF_ERROR(BIF_P, BADARG); } dep = erts_find_or_insert_dist_entry(BIF_ARG_1); - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + + if (dep == erts_this_dist_entry) { + erts_deref_dist_entry(dep); + BIF_ERROR(BIF_P, BADARG); + } erts_de_rwlock(dep); @@ -3468,8 +3506,8 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) conn_id = dep->connection_id; } else { - ASSERT(!"SVERK: What to do?"); - conn_id = dep->connection_id; + ASSERT(dep->status & ERTS_DE_SFLG_EXITING); + conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; } erts_de_rwunlock(dep); hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); @@ -3478,23 +3516,24 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); } -static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) +static Sint abort_connection(DistEntry* dep, Uint32 conn_id) { - Sint reds = 0; - erts_de_rwlock(dep); - if (dep->status != ERTS_DE_SFLG_PENDING || dep->connection_id != conn_id) { - erts_de_rwunlock(dep); + if (dep->connection_id != conn_id) + ; + else if (dep->status == ERTS_DE_SFLG_CONNECTED) { + kill_connection(dep); } - else { - NetExitsContext nec = {dep}; - ErtsLink *nlinks; - ErtsLink *node_links; - ErtsMonitor *monitors; - ErtsAtomCache *cache; - ErtsDistOutputBuf *obuf; + else if (dep->status == ERTS_DE_SFLG_PENDING) { + NetExitsContext nec = {dep}; + ErtsLink *nlinks; + ErtsLink *node_links; + ErtsMonitor *monitors; + ErtsAtomCache *cache; + ErtsDistOutputBuf *obuf; ErtsProcList *resume_procs; + Sint reds = 0; ASSERT(is_nil(dep->cid)); @@ -3534,10 +3573,11 @@ static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) } delete_cache(cache); - free_de_out_queues(dep, obuf); + return reds; } - return reds; + erts_de_rwunlock(dep); + return 0; } BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) @@ -3550,13 +3590,13 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) } tp = tuple_val(BIF_ARG_2); dep = erts_dhandle_to_dist_entry(tp[2]); - if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { + if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1) + || dep == erts_this_dist_entry) { BIF_ERROR(BIF_P, BADARG); } - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ if (dep) { - Sint reds = abort_pending_connection(dep, unsigned_val(tp[1])); + Sint reds = abort_connection(dep, unsigned_val(tp[1])); BUMP_REDS(BIF_P, reds); } BIF_RET(am_true); @@ -3584,11 +3624,13 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) net_kernel = erts_whereis_process(proc, proc_locks, am_net_kernel, nk_locks, 0); if (!net_kernel) { - abort_pending_connection(dep, conn_id); + abort_connection(dep, conn_id); return 0; } - /* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */ + /* + * Send {auto_connect, Node, ConnId, DHandle} to net_kernel + */ mp = erts_alloc_message_heap(net_kernel, &nk_locks, 5 + ERTS_MAGIC_REF_THING_SIZE, &hp, &ohp); diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 2685a55b97..a205ce0cb5 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -172,7 +172,7 @@ erts_dsig_prepare(ErtsDSigData *dsdp, return ERTS_DSIG_PREP_NOT_CONNECTED; dep = erts_find_or_insert_dist_entry(dsdp->node); - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + ASSERT(dep != erts_this_dist_entry); deref_dep = 1; } @@ -192,7 +192,6 @@ retry: res = ERTS_DSIG_PREP_PENDING; } else if (dep->status & ERTS_DE_SFLG_EXITING) { - /* SVERK is this ok, or should we trigger another connection setup */ res = ERTS_DSIG_PREP_NOT_CONNECTED; goto fail; } -- cgit v1.2.3 From 2f7a0afbbf34699ac1cc1733e3634c1cbeb94c5d Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 11 Oct 2017 20:38:33 +0200 Subject: Refactor erts_dsig_prepare argument dep(p) Don't need to be pointer-pointer --- erts/emulator/beam/bif.c | 28 +++++++++++++++++----------- erts/emulator/beam/dist.c | 6 +++--- erts/emulator/beam/dist.h | 21 ++++----------------- erts/emulator/beam/erl_process.c | 8 ++++---- erts/emulator/beam/io.c | 2 +- 5 files changed, 29 insertions(+), 36 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 30278cbe36..50699eac31 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -227,7 +227,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) goto res_no_proc; } - code = erts_dsig_prepare(&dsd, &dep, BIF_P, + code = erts_dsig_prepare(&dsd, dep, BIF_P, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), ERTS_DSP_RLOCK, 0, 1); switch (code) { @@ -314,7 +314,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) ERTS_LC_ASSERT((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK) == erts_proc_lc_my_proc_locks(c_p)); - code = erts_dsig_prepare(&dsd, &dep, c_p, ERTS_PROC_LOCK_MAIN, + code = erts_dsig_prepare(&dsd, dep, c_p, ERTS_PROC_LOCK_MAIN, ERTS_DSP_RLOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: @@ -792,7 +792,7 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, ASSERT(dep); erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - code = erts_dsig_prepare(&dsd, &dep, + code = erts_dsig_prepare(&dsd, dep, p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), ERTS_DSP_RLOCK, 0, 1); switch (code) { @@ -1176,7 +1176,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) BIF_RET(am_true); } - code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_PROC_LOCK_MAIN, ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: @@ -1560,7 +1560,7 @@ BIF_RETTYPE exit_2(BIF_ALIST_2) if(dep == erts_this_dist_entry) BIF_RET(am_true); - code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_PROC_LOCK_MAIN, ERTS_DSP_NO_LOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: @@ -1996,7 +1996,7 @@ static Sint remote_send(Process *p, DistEntry *dep, ASSERT(is_atom(to) || is_external_pid(to)); ctx->dep = dep; - code = erts_dsig_prepare(&ctx->dsd, &dep, p, ERTS_PROC_LOCK_MAIN, + code = erts_dsig_prepare(&ctx->dsd, dep, p, ERTS_PROC_LOCK_MAIN, ERTS_DSP_NO_LOCK, !ctx->suspend, ctx->connect); switch (code) { @@ -2185,6 +2185,7 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) } return ret_val; } else if (is_tuple(to)) { /* Remote send */ + int deref_dep = 0; int ret; tp = tuple_val(to); if (*tp != make_arityval(2)) @@ -2219,15 +2220,20 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) } return 0; } + if (dep == NULL) { + dep = erts_find_or_insert_dist_entry(tp[2]); + ASSERT(dep != erts_this_dist_entry); + deref_dep = 1; + } ctx->dsd.node = tp[2]; ret = remote_send(p, dep, tp[1], to, msg, ctx); if (ret == SEND_YIELD_CONTINUE) { - if (dep) { - erts_ref_dist_entry(dep); - ctx->deref_dep = 1; - } + erts_ref_dist_entry(ctx->dep); + ctx->deref_dep = 1; } + if (deref_dep) + erts_deref_dist_entry(dep); return ret; } else { if (IS_TRACED_FL(p, F_TRACE_SEND)) @@ -4405,7 +4411,7 @@ BIF_RETTYPE group_leader_2(BIF_ALIST_2) if(dep == erts_this_dist_entry) BIF_ERROR(BIF_P, BADARG); - code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_PROC_LOCK_MAIN, ERTS_DSP_NO_LOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 4a25fbdd4c..ae64f394c8 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -1349,7 +1349,7 @@ int erts_net_message(Port *prt, /* This is tricky (we MUST force a distributed send) */ ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_exit(&dsd, to, from, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); @@ -1441,7 +1441,7 @@ int erts_net_message(Port *prt, if (!rp) { ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, am_noproc); @@ -3831,7 +3831,7 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - switch (erts_dsig_prepare(&dsd, &dep, p, + switch (erts_dsig_prepare(&dsd, dep, p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), ERTS_DSP_RLOCK, 0, async_connect)) { case ERTS_DSIG_PREP_NOT_ALIVE: diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index a205ce0cb5..a96e39be89 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -138,7 +138,7 @@ extern int erts_is_alive; #define ERTS_DSIG_PREP_PENDING 4 ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *, - DistEntry **, + DistEntry*, Process *, ErtsProcLocks, ErtsDSigPrepLock, @@ -154,26 +154,20 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks); ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *dsdp, - DistEntry **depp, + DistEntry *dep, Process *proc, ErtsProcLocks proc_locks, ErtsDSigPrepLock dspl, int no_suspend, int connect) { - DistEntry* dep = *depp; - int deref_dep = 0; int res; if (!erts_is_alive) return ERTS_DSIG_PREP_NOT_ALIVE; if (!dep) { - if (!connect) - return ERTS_DSIG_PREP_NOT_CONNECTED; - - dep = erts_find_or_insert_dist_entry(dsdp->node); - ASSERT(dep != erts_this_dist_entry); - deref_dep = 1; + ASSERT(!connect); + return ERTS_DSIG_PREP_NOT_CONNECTED; } #ifdef ERTS_ENABLE_LOCK_CHECK @@ -199,8 +193,6 @@ retry: ASSERT(dep->status == 0); erts_de_runlock(dep); if (!erts_auto_connect(dep, proc, proc_locks)) { - if (deref_dep) - erts_deref_dist_entry(dep); return ERTS_DSIG_PREP_NOT_ALIVE; } goto retry; @@ -224,15 +216,10 @@ retry: dsdp->no_suspend = no_suspend; if (dspl == ERTS_DSP_NO_LOCK) erts_de_runlock(dep); - if (deref_dep) - erts_deref_dist_entry(dep); - *depp = dep; return res; fail: erts_de_runlock(dep); - if (deref_dep) - erts_deref_dist_entry(dep); return res; } diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 2d13cd92b2..a807d60ec7 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -12659,7 +12659,7 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + int code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED || code == ERTS_DSIG_PREP_PENDING) { @@ -12707,7 +12707,7 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + int code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED || code == ERTS_DSIG_PREP_PENDING) { @@ -12768,7 +12768,7 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + int code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, @@ -12893,7 +12893,7 @@ static void doit_exit_link(ErtsLink *lnk, void *vpcontext) erts_remove_dist_link(&dld, p->common.id, item, dep); if (dld.d_lnk) { erts_proc_lock(p, ERTS_PROC_LOCK_MAIN); - code = erts_dsig_prepare(&dsd, &dep, p, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&dsd, dep, p, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED || code == ERTS_DSIG_PREP_PENDING) { diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 6cd1aa5e79..9933c8dda4 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -3829,7 +3829,7 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc) ErtsDistLinkData dld; ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: -- cgit v1.2.3 From 19c9f3a61fbef3682056bab3db4787cd763bd06e Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 15 Nov 2017 19:44:52 +0100 Subject: Move new|abort_connection_id to erts_internal and drop _id suffix. --- erts/emulator/beam/bif.tab | 4 ++-- erts/emulator/beam/dist.c | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 5a591caac9..f739f414de 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -694,6 +694,6 @@ bif erlang:iolist_to_iovec/1 # New in 21.0 # -bif erlang:new_connection_id/1 -bif erlang:abort_connection_id/2 +bif erts_internal:new_connection/1 +bif erts_internal:abort_connection/2 diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index ae64f394c8..16a7bd9eb3 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3480,7 +3480,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto done; } -BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) +BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1) { DistEntry* dep; Uint32 conn_id; @@ -3580,7 +3580,7 @@ static Sint abort_connection(DistEntry* dep, Uint32 conn_id) return 0; } -BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2) { DistEntry* dep; Eterm* tp; -- cgit v1.2.3