From b17db5ae892a8b9c48f8f94f7219c60fe122f629 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 21 Sep 2017 19:21:18 +0200 Subject: erts: Fix bug in DistEntry refc dance to handle "lookup without refc++" correctly which was introduced in 4dcb2ae7810a507b701a30072b2f514cab7ebbdb. When decrementing refc to zero (in try_delete or prepare_try_delete) we must always wait thread progress to make sure no thread has done lookup without refc++ and is just about to do refc++ and thereby revive the DistEntry. That is, we wait for a potential other thread to either do refc++ or drop its pointer to the DistEntry. And if that other thread does refc++ (in erts_ref_dist_entry) it must also do the extra refc++ for the scheduled pending delete. --- erts/emulator/beam/erl_node_tables.c | 103 +++++++++++++++++----------- erts/emulator/test/node_container_SUITE.erl | 2 + 2 files changed, 66 insertions(+), 39 deletions(-) diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 0f3dfa797c..1ab096ab69 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -101,7 +101,9 @@ void erts_ref_dist_entry(DistEntry *dep) { ASSERT(dep); - de_refc_inc(dep, 1); + if (de_refc_inc_read(dep, 1) == 1) { + de_refc_inc(dep, 2); /* Pending delete */ + } } void @@ -396,44 +398,77 @@ erts_make_dhandle(Process *c_p, DistEntry *dep) return erts_mk_magic_ref(&hp, &c_p->off_heap, bin); } -static void try_delete_dist_entry(void *vbin); +static void start_timer_delete_dist_entry(void *vdep); +static void prepare_try_delete_dist_entry(void *vdep); +static void try_delete_dist_entry(DistEntry*); + +static void schedule_delete_dist_entry(DistEntry* dep) +{ + /* + * Here we need thread progress to wait for other threads, that may have + * done lookup without refc++, to do either refc++ or drop their refs. + * + * Note that timeouts do not guarantee thread progress. + */ + erts_schedule_thr_prgr_later_op(start_timer_delete_dist_entry, + dep, &dep->later_op); +} static void -prepare_try_delete_dist_entry(void *vbin) +start_timer_delete_dist_entry(void *vdep) { - Binary *bin = (Binary *) vbin; - DistEntry *dep = ErtsBin2DistEntry(bin); - Uint size; + if (node_tab_delete_delay == 0) { + prepare_try_delete_dist_entry(vdep); + } + else { + ASSERT(node_tab_delete_delay > 0); + erts_start_timer_callback(node_tab_delete_delay, + prepare_try_delete_dist_entry, + vdep); + } +} + +static void +prepare_try_delete_dist_entry(void *vdep) +{ + DistEntry *dep = vdep; erts_aint_t refc; + /* + * Time has passed since we decremented refc to zero and DistEntry may + * have been revived. Do a fast check without table lock first. + */ refc = de_refc_read(dep, 0); - if (refc > 0) - return; - - size = ERTS_MAGIC_BIN_SIZE(sizeof(DistEntry)); - erts_schedule_thr_prgr_later_cleanup_op(try_delete_dist_entry, - vbin, &dep->later_op, size); + if (refc == 0) { + try_delete_dist_entry(dep); + } + else { + /* + * Someone has done lookup and done refc++ for us. + */ + refc = de_refc_dec_read(dep, 0); + if (refc == 0) + schedule_delete_dist_entry(dep); + } } -static void try_delete_dist_entry(void *vbin) +static void try_delete_dist_entry(DistEntry* dep) { - Binary *bin = (Binary *) vbin; - DistEntry *dep = ErtsBin2DistEntry(bin); erts_aint_t refc; erts_rwmtx_rwlock(&erts_dist_table_rwmtx); /* * Another thread might have looked up this dist entry after * we decided to delete it (refc became zero). If so, the other - * thread incremented refc twice. Once for the new reference - * and once for this thread. + * thread incremented refc one extra step for this thread. * - * If refc reach -1, no one has used the entry since we - * set up the timer. Delete the entry. + * If refc reach -1, no one has done lookup and no one can do lookup + * as we have table lock. Delete the entry. * - * If refc reach 0, the entry is currently not in use - * but has been used since we set up the timer. Set up a - * new timer. + * If refc reach 0, someone raced us and either + * (1) did lookup with own refc++ and already released it again + * (2) did lookup without own refc++ + * Schedule new delete operation. * * If refc > 0, the entry is in use. Keep the entry. */ @@ -443,12 +478,7 @@ static void try_delete_dist_entry(void *vbin) erts_rwmtx_rwunlock(&erts_dist_table_rwmtx); if (refc == 0) { - if (node_tab_delete_delay == 0) - prepare_try_delete_dist_entry(vbin); - else if (node_tab_delete_delay > 0) - erts_start_timer_callback(node_tab_delete_delay, - prepare_try_delete_dist_entry, - vbin); + schedule_delete_dist_entry(dep); } } @@ -462,12 +492,7 @@ int erts_dist_entry_destructor(Binary *bin) if (refc == -1) return 1; /* Allow deallocation of structure... */ - if (node_tab_delete_delay == 0) - prepare_try_delete_dist_entry((void *) bin); - else if (node_tab_delete_delay > 0) - erts_start_timer_callback(node_tab_delete_delay, - prepare_try_delete_dist_entry, - (void *) bin); + schedule_delete_dist_entry(dep); return 0; } @@ -1463,9 +1488,9 @@ insert_delayed_delete_node(void *state, } static void -insert_thr_prgr_delete_dist_entry(void *arg, ErtsThrPrgrVal thr_prgr, void *vbin) +insert_thr_prgr_delete_dist_entry(void *arg, ErtsThrPrgrVal thr_prgr, void *vdep) { - DistEntry *dep = ErtsBin2DistEntry(vbin); + DistEntry *dep = vdep; Eterm heap[3]; insert_dist_entry(dep, SYSTEM_REF, @@ -1476,9 +1501,9 @@ insert_thr_prgr_delete_dist_entry(void *arg, ErtsThrPrgrVal thr_prgr, void *vbin static void insert_delayed_delete_dist_entry(void *state, ErtsMonotonicTime timeout_pos, - void *vbin) + void *vdep) { - DistEntry *dep = ErtsBin2DistEntry(vbin); + DistEntry *dep = vdep; Eterm heap[3]; insert_dist_entry(dep, SYSTEM_REF, @@ -1520,7 +1545,7 @@ setup_reference_table(void) erts_debug_callback_timer_foreach(prepare_try_delete_dist_entry, insert_delayed_delete_dist_entry, NULL); - erts_debug_later_op_foreach(try_delete_dist_entry, + erts_debug_later_op_foreach(start_timer_delete_dist_entry, insert_thr_prgr_delete_dist_entry, NULL); diff --git a/erts/emulator/test/node_container_SUITE.erl b/erts/emulator/test/node_container_SUITE.erl index be90f929df..7df001fec5 100644 --- a/erts/emulator/test/node_container_SUITE.erl +++ b/erts/emulator/test/node_container_SUITE.erl @@ -966,6 +966,8 @@ check_refc(ThisNodeName,ThisCreation,Table,EntryList) when is_list(EntryList) -> {case Referrer of {system,delayed_delete_timer} -> true; + {system,thread_progress_delete_timer} -> + true; _ -> DDT end, -- cgit v1.2.3 From 50f08826065ad29040a6c218addcf32bf12945b1 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 22 Sep 2017 16:24:45 +0200 Subject: erts TEST: Add missing ref to DistEntry from send context --- erts/emulator/beam/erl_node_tables.c | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 1ab096ab69..9931686cbe 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -1325,6 +1325,11 @@ insert_offheap2(ErlOffHeap *oh, void *arg) (((Bin)->intern.flags & BIN_FLAG_MAGIC) \ && ERTS_MAGIC_BIN_DESTRUCTOR((Bin)) == erts_dist_entry_destructor) +#define IsSendCtxBinary(Bin) \ + (((Bin)->intern.flags & BIN_FLAG_MAGIC) \ + && ERTS_MAGIC_BIN_DESTRUCTOR((Bin)) == erts_dsend_context_dtor) + + static void insert_offheap(ErlOffHeap *oh, int type, Eterm id) { @@ -1361,7 +1366,12 @@ insert_offheap(ErlOffHeap *oh, int type, Eterm id) inserted_bins = nib; UnUseTmpHeapNoproc(BIG_UINT_HEAP_SIZE); } - } + } + else if (IsSendCtxBinary(u.mref->mb)) { + ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(u.mref->mb); + if (ctx->deref_dep) + insert_dist_entry(ctx->dep, type, id, 0); + } break; case REFC_BINARY_SUBTAG: case FUN_SUBTAG: -- cgit v1.2.3 From adc71259e87805990ea2a91bc0b5c17861984a47 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 14 Jul 2017 15:08:03 +0200 Subject: erts: Remove obsolete code for latin1 in atom cache Distribution flag DFLAG_UTF8_ATOMS is supported since R16 and mandatory since 20.0. --- erts/emulator/beam/external.c | 49 +++++++++++-------------------------------- 1 file changed, 12 insertions(+), 37 deletions(-) diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 6666c42778..9761a69ad1 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -220,23 +220,10 @@ erts_destroy_atom_cache_map(ErtsAtomCacheMap *acmp) static ERTS_INLINE void insert_acache_map(ErtsAtomCacheMap *acmp, Eterm atom, Uint32 dflags) { - /* - * If the receiver do not understand utf8 atoms - * and this atom cannot be represented in latin1, - * we are not allowed to cache it. - * - * In this case all atoms are assumed to have - * latin1 encoding in the cache. By refusing it - * in the cache we will instead encode it using - * ATOM_UTF8_EXT/SMALL_ATOM_UTF8_EXT which the - * receiver do not recognize and tear down the - * connection. - */ - if (acmp && acmp->sz < ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES - && ((dflags & DFLAG_UTF8_ATOMS) - || atom_tab(atom_val(atom))->latin1_chars >= 0)) { + if (acmp && acmp->sz < ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES) { int ix; ASSERT(acmp->hdr_sz < 0); + ASSERT(dflags & DFLAG_UTF8_ATOMS); ix = atom2cix(atom); if (acmp->cache[ix].iix < 0) { acmp->cache[ix].iix = acmp->sz; @@ -256,9 +243,7 @@ get_iix_acache_map(ErtsAtomCacheMap *acmp, Eterm atom, Uint32 dflags) ASSERT(is_atom(atom)); ix = atom2cix(atom); if (acmp->cache[ix].iix < 0) { - ASSERT(acmp->sz == ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES - || (!(dflags & DFLAG_UTF8_ATOMS) - && atom_tab(atom_val(atom))->latin1_chars < 0)); + ASSERT(acmp->sz == ERTS_MAX_INTERNAL_ATOM_CACHE_ENTRIES); return -1; } else { @@ -272,7 +257,6 @@ void erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) { if (acmp) { - int utf8_atoms = (int) (dflags & DFLAG_UTF8_ATOMS); int long_atoms = 0; /* !0 if one or more atoms are longer than 255. */ int i; int sz; @@ -283,6 +267,7 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) + 1 /* number of internal cache entries */ ; int min_sz; + ASSERT(dflags & DFLAG_UTF8_ATOMS); ASSERT(acmp->hdr_sz < 0); /* Make sure cache update instructions fit */ min_sz = fix_sz+(2+4)*acmp->sz; @@ -294,7 +279,7 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) atom = acmp->cache[acmp->cix[i]].atom; ASSERT(is_atom(atom)); a = atom_tab(atom_val(atom)); - len = (int) (utf8_atoms ? a->len : a->latin1_chars); + len = (int) a->len; ASSERT(len >= 0); if (!long_atoms && len > 255) long_atoms = 1; @@ -371,8 +356,8 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint int ci, sz; byte dist_hdr_flags; int long_atoms; - int utf8_atoms = (int) (dflags & DFLAG_UTF8_ATOMS); register byte *ep = ext; + ASSERT(dflags & DFLAG_UTF8_ATOMS); ASSERT(ep[0] == VERSION_MAGIC); if (ep[1] != DIST_HEADER) return ext; @@ -445,17 +430,9 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint Atom *a; cache->out_arr[cix] = atom; a = atom_tab(atom_val(atom)); - if (utf8_atoms) { - sz = a->len; - ep -= sz; - sys_memcpy((void *) ep, (void *) a->name, sz); - } - else { - ASSERT(0 <= a->latin1_chars && a->latin1_chars <= MAX_ATOM_CHARACTERS); - ep -= a->latin1_chars; - sz = erts_utf8_to_latin1(ep, a->name, a->len); - ASSERT(a->latin1_chars == sz); - } + sz = a->len; + ep -= sz; + sys_memcpy((void *) ep, (void *) a->name, sz); if (long_atoms) { ep -= 2; put_int16(sz, ep); @@ -641,7 +618,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, #endif register byte *ep = ext; - int utf8_atoms = (int) (dep->flags & DFLAG_UTF8_ATOMS); + ASSERT(dep->flags & DFLAG_UTF8_ATOMS); edep->heap_size = -1; edep->ext_endp = ext+size; @@ -804,9 +781,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, CHKSIZE(len); atom = erts_atom_put((byte *) ep, len, - (utf8_atoms - ? ERTS_ATOM_ENC_UTF8 - : ERTS_ATOM_ENC_LATIN1), + ERTS_ATOM_ENC_UTF8, 0); if (is_non_value(atom)) ERTS_EXT_HDR_FAIL; @@ -2135,7 +2110,7 @@ enc_atom(ErtsAtomCacheMap *acmp, Eterm atom, byte *ep, Uint32 dflags) { int iix; int len; - int utf8_atoms = (int) (dflags & DFLAG_UTF8_ATOMS); + const int utf8_atoms = (int) (dflags & DFLAG_UTF8_ATOMS); ASSERT(is_atom(atom)); -- cgit v1.2.3 From 8f3cb95bb60db55728bfea339b94ebc164b4dbdb Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 14 Jul 2017 19:35:26 +0200 Subject: erts: Remove some dead code --- erts/emulator/beam/erl_node_tables.h | 3 --- erts/emulator/beam/external.h | 22 ---------------------- 2 files changed, 25 deletions(-) diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index ee8277b5ea..2cfd18f22f 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -60,9 +60,6 @@ #define ERTS_DE_SFLG_CONNECTED (((Uint32) 1) << 0) #define ERTS_DE_SFLG_EXITING (((Uint32) 1) << 1) -#define ERTS_DE_SFLGS_ALL (ERTS_DE_SFLG_CONNECTED \ - | ERTS_DE_SFLG_EXITING) - #define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0) #define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1) #define ERTS_DE_QFLG_REQ_INFO (((erts_aint32_t) 1) << 2) diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index 3c61d013da..fb7eec1a31 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -133,11 +133,6 @@ typedef struct { ErtsAtomTranslationTable attab; } ErtsDistExternal; -typedef struct { - int have_header; - int cache_entries; -} ErtsDistHeaderPeek; - #define ERTS_DIST_EXT_SIZE(EDEP) \ (sizeof(ErtsDistExternal) \ - (((EDEP)->flags & ERTS_DIST_EXT_ATOM_TRANS_TAB) \ @@ -177,9 +172,6 @@ Uint erts_encode_ext_size_ets(Eterm); void erts_encode_ext(Eterm, byte **); byte* erts_encode_ext_ets(Eterm, byte *, struct erl_off_heap_header** ext_off_heap); -#ifdef ERTS_WANT_EXTERNAL_TAGS -ERTS_GLB_INLINE void erts_peek_dist_header(ErtsDistHeaderPeek *, byte *, Uint); -#endif ERTS_GLB_INLINE void erts_free_dist_ext_copy(ErtsDistExternal *); ERTS_GLB_INLINE void *erts_dist_ext_trailer(ErtsDistExternal *); ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Uint); @@ -210,20 +202,6 @@ int erts_debug_atom_to_out_cache_index(Eterm); #if ERTS_GLB_INLINE_INCL_FUNC_DEF -#ifdef ERTS_WANT_EXTERNAL_TAGS -ERTS_GLB_INLINE void -erts_peek_dist_header(ErtsDistHeaderPeek *dhpp, byte *ext, Uint sz) -{ - if (ext[0] == VERSION_MAGIC - || ext[1] != DIST_HEADER - || sz < (1+1+1)) - dhpp->have_header = 0; - else { - dhpp->have_header = 1; - dhpp->cache_entries = (int) get_int8(&ext[2]); - } -} -#endif ERTS_GLB_INLINE void erts_free_dist_ext_copy(ErtsDistExternal *edep) -- cgit v1.2.3 From 1b8515868ee4cadd7e93b281cea8e59078c27be8 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 14 Aug 2017 19:38:21 +0200 Subject: erts: Change to #ifndef from #if !defined --- erts/emulator/beam/erl_thr_progress.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/erts/emulator/beam/erl_thr_progress.h b/erts/emulator/beam/erl_thr_progress.h index fa936b5707..8c029bcf99 100644 --- a/erts/emulator/beam/erl_thr_progress.h +++ b/erts/emulator/beam/erl_thr_progress.h @@ -28,7 +28,7 @@ * Author: Rickard Green */ -#if !defined(ERL_THR_PROGRESS_H__TSD_TYPE__) +#ifndef ERL_THR_PROGRESS_H__TSD_TYPE__ #define ERL_THR_PROGRESS_H__TSD_TYPE__ #include "sys.h" -- cgit v1.2.3 From 8fa8ec018cf7d6ff6f4e1711fdb71f34fcea6db1 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 16 Aug 2017 20:14:26 +0200 Subject: erts: Refactoring in distribution_SUITE for set/get_internal_state calls. --- erts/emulator/test/distribution_SUITE.erl | 44 +++++++++++++++++++------------ 1 file changed, 27 insertions(+), 17 deletions(-) diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index e2914cbc92..c7d63f9feb 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -1158,8 +1158,6 @@ contended_atom_cache_entry_test(Config, Type) -> spawn_link( SNode, fun () -> - erts_debug:set_internal_state(available_internal_state, - true), Master = self(), CIX = get_cix(), TestAtoms = case Type of @@ -1244,7 +1242,7 @@ get_cix(CIX) when is_integer(CIX), CIX < 0 -> get_cix(CIX) when is_integer(CIX) -> get_cix(CIX, unwanted_cixs(), - erts_debug:get_internal_state(max_atom_out_cache_index)). + get_internal_state(max_atom_out_cache_index)). get_cix(CIX, Unwanted, MaxCIX) when CIX > MaxCIX -> get_cix(0, Unwanted, MaxCIX); @@ -1256,8 +1254,8 @@ get_cix(CIX, Unwanted, MaxCIX) -> unwanted_cixs() -> lists:map(fun (Node) -> - erts_debug:get_internal_state({atom_out_cache_index, - Node}) + get_internal_state({atom_out_cache_index, + Node}) end, nodes()). @@ -1266,7 +1264,7 @@ get_conflicting_atoms(_CIX, 0) -> []; get_conflicting_atoms(CIX, N) -> Atom = list_to_atom("atom" ++ integer_to_list(erlang:unique_integer([positive]))), - case erts_debug:get_internal_state({atom_out_cache_index, Atom}) of + case get_internal_state({atom_out_cache_index, Atom}) of CIX -> [Atom|get_conflicting_atoms(CIX, N-1)]; _ -> @@ -1277,7 +1275,7 @@ get_conflicting_unicode_atoms(_CIX, 0) -> []; get_conflicting_unicode_atoms(CIX, N) -> Atom = string_to_atom([16#1f608] ++ "atom" ++ integer_to_list(erlang:unique_integer([positive]))), - case erts_debug:get_internal_state({atom_out_cache_index, Atom}) of + case get_internal_state({atom_out_cache_index, Atom}) of CIX -> [Atom|get_conflicting_unicode_atoms(CIX, N-1)]; _ -> @@ -1885,11 +1883,26 @@ send_bad_dhdr(BadNode, ToNode) when is_atom(BadNode), is_atom(ToNode) -> receive Done -> ok end. dctrl(Node) when is_atom(Node) -> - case catch erts_debug:get_internal_state(available_internal_state) of - true -> true; - _ -> erts_debug:set_internal_state(available_internal_state, true) - end, - erts_debug:get_internal_state({dist_ctrl, Node}). + get_internal_state({dist_ctrl, Node}). + +get_internal_state(Op) -> + try erts_debug:get_internal_state(Op) of + R -> R + catch + error:undef -> + erts_debug:set_internal_state(available_internal_state, true), + erts_debug:get_internal_state(Op) + end. + +set_internal_state(Op, Val) -> + try erts_debug:set_internal_state(Op, Val) of + R -> R + catch + error:undef -> + erts_debug:set_internal_state(available_internal_state, true), + erts_debug:set_internal_state(Op, Val) + end. + dmsg_hdr() -> [131, % Version Magic @@ -2032,11 +2045,9 @@ freeze_node(Node, MS) -> Freezer = self(), spawn_link(Node, fun () -> - erts_debug:set_internal_state(available_internal_state, - true), dctrl_dop_send(Freezer, DoingIt), receive after Own -> ok end, - erts_debug:set_internal_state(block, MS+Own) + set_internal_state(block, MS+Own) end), receive DoingIt -> ok end, receive after Own -> ok end. @@ -2240,8 +2251,7 @@ forever(Fun) -> forever(Fun). abort(Why) -> - erts_debug:set_internal_state(available_internal_state, true), - erts_debug:set_internal_state(abort, Why). + set_internal_state(abort, Why). start_busy_dist_port_tracer() -> -- cgit v1.2.3 From 6b9ce49827d57b066c376f474c36741e5c915ccb Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 24 Aug 2017 17:39:35 +0200 Subject: jinterface: Silence some verbose tests Runs much faster now. --- lib/jinterface/test/nc_SUITE.erl | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/lib/jinterface/test/nc_SUITE.erl b/lib/jinterface/test/nc_SUITE.erl index 81944f71d4..daa17195ee 100644 --- a/lib/jinterface/test/nc_SUITE.erl +++ b/lib/jinterface/test/nc_SUITE.erl @@ -651,11 +651,10 @@ do_echo(DataList, Config, OutTrans, InTrans, ExtraArgs) echo_loop([D|Ds], Echoer, OutTrans, InTrans, TermAcc) -> OutMsg = OutTrans(D), Echoer ! OutMsg, - io:format("echo_server ~p: ~p ! ~P~n", [self(),Echoer,OutMsg,10]), + %%io:format("echo_server ~p: ~p ! ~P~n", [self(),Echoer,OutMsg,10]), receive Reply -> - io:format("echo_server ~p: receive ~P~n", - [self(),Reply,10]), + %%io:format("echo_server ~p: receive ~P~n", [self(),Reply,10]), InTrans(Echoer, D, Reply) end, Term = case OutMsg of @@ -670,11 +669,10 @@ echo_loop(Cont, Echoer, OutTrans, InTrans, TermAcc) when is_function(Cont, 0) -> check_terms(Echoer, TermAcc), OutMsg = Echoer ! {self(),undefined,hash_clear}, - io:format("echo_server ~p: ~p ! ~P~n", [self(),Echoer,OutMsg,10]), + %%io:format("echo_server ~p: ~p ! ~P~n", [self(),Echoer,OutMsg,10]), receive {Echoer,hash_cleared,hash_clear}=Reply -> - io:format("echo_server ~p: receive ~P~n", - [self(),Reply,10]), + %%io:format("echo_server ~p: receive ~P~n", [self(),Reply,10]), ok; Other -> io:format("echo_server_terms unexpected ~p: receive ~P~n", @@ -686,11 +684,10 @@ echo_loop(Cont, Echoer, OutTrans, InTrans, TermAcc) check_terms(Echoer, [Term | Rest]) -> OutMsg = {self(),Term,hash_lookup}, Echoer ! OutMsg, - io:format("check_terms ~p: ~p ! ~P~n", [self(),Echoer,OutMsg,10]), + %%io:format("check_terms ~p: ~p ! ~P~n", [self(),Echoer,OutMsg,10]), receive {Echoer,true,hash_lookup} = ReplyMsg -> - io:format("check_terms ~p: receive ~P~n", - [self(),ReplyMsg,10]), + %%io:format("check_terms ~p: receive ~P~n", [self(),ReplyMsg,10]), check_terms(Echoer, Rest); Other -> io:format("check_terms unexpected ~p: receive ~P~n", -- cgit v1.2.3 From 92d383cac5ddcb3856432e653ef84b0a3f84cc0a Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 22 Aug 2017 16:48:33 +0200 Subject: erts: Make DFLAGS_NEW_FUN_TAGS mandatory and remove ugly encoding fallback as {fun, ...} DFLAGS_NEW_FUN_TAGS has been supported by vm/erl_interface/jinterface since R13 or even older. Renamed test case obsolete_funs to term2bin_tuple_fallbacks and removed test for {fun,...} fallback and added missing test for bitstring fallback {Binary, Bits}. --- erts/emulator/beam/erl_bif_info.c | 8 +-- erts/emulator/beam/external.c | 91 +++++++-------------------- erts/emulator/test/binary_SUITE.erl | 51 +++++++-------- lib/kernel/src/dist_util.erl | 3 +- lib/kernel/test/erl_distribution_wb_SUITE.erl | 6 +- 5 files changed, 55 insertions(+), 104 deletions(-) diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 27bbf70c0b..9d05680723 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -3830,10 +3830,10 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) BIF_RET(res); } } - else if (ERTS_IS_ATOM_STR("term_to_binary_no_funs", tp[1])) { - Uint dflags = (DFLAG_EXTENDED_REFERENCES | - DFLAG_EXTENDED_PIDS_PORTS | - DFLAG_BIT_BINARIES); + else if (ERTS_IS_ATOM_STR("term_to_binary_tuple_fallbacks", tp[1])) { + Uint dflags = (TERM_TO_BINARY_DFLAGS + & ~DFLAG_EXPORT_PTR_TAG + & ~DFLAG_BIT_BINARIES); BIF_RET(erts_term_to_binary(BIF_P, tp[2], 0, dflags)); } else if (ERTS_IS_ATOM_STR("dist_ctrl", tp[1])) { diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 9761a69ad1..9230e1d8ab 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -2878,55 +2878,24 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, ErlFunThing* funp = (ErlFunThing *) fun_val(obj); int ei; - if ((dflags & DFLAG_NEW_FUN_TAGS) != 0) { - *ep++ = NEW_FUN_EXT; - WSTACK_PUSH2(s, ENC_PATCH_FUN_SIZE, - (UWord) ep); /* Position for patching in size */ - ep += 4; - *ep = funp->arity; - ep += 1; - sys_memcpy(ep, funp->fe->uniq, 16); - ep += 16; - put_int32(funp->fe->index, ep); - ep += 4; - put_int32(funp->num_free, ep); - ep += 4; - ep = enc_atom(acmp, funp->fe->module, ep, dflags); - ep = enc_term(acmp, make_small(funp->fe->old_index), ep, dflags, off_heap); - ep = enc_term(acmp, make_small(funp->fe->old_uniq), ep, dflags, off_heap); - ep = enc_pid(acmp, funp->creator, ep, dflags); - } else { - /* - * Communicating with an obsolete erl_interface or - * jinterface node. Convert the fun to a tuple to - * avoid crasching. - */ - - /* Tag, arity */ - *ep++ = SMALL_TUPLE_EXT; - put_int8(5, ep); - ep += 1; - - /* 'fun' */ - ep = enc_atom(acmp, am_fun, ep, dflags); - - /* Module name */ - ep = enc_atom(acmp, funp->fe->module, ep, dflags); - - /* Index, Uniq */ - *ep++ = INTEGER_EXT; - put_int32(funp->fe->old_index, ep); - ep += 4; - *ep++ = INTEGER_EXT; - put_int32(funp->fe->old_uniq, ep); - ep += 4; - - /* Environment sub-tuple arity */ - ASSERT(funp->num_free < MAX_ARG); - *ep++ = SMALL_TUPLE_EXT; - put_int8(funp->num_free, ep); - ep += 1; - } + ASSERT(dflags & DFLAG_NEW_FUN_TAGS); + *ep++ = NEW_FUN_EXT; + WSTACK_PUSH2(s, ENC_PATCH_FUN_SIZE, + (UWord) ep); /* Position for patching in size */ + ep += 4; + *ep = funp->arity; + ep += 1; + sys_memcpy(ep, funp->fe->uniq, 16); + ep += 16; + put_int32(funp->fe->index, ep); + ep += 4; + put_int32(funp->num_free, ep); + ep += 4; + ep = enc_atom(acmp, funp->fe->module, ep, dflags); + ep = enc_term(acmp, make_small(funp->fe->old_index), ep, dflags, off_heap); + ep = enc_term(acmp, make_small(funp->fe->old_uniq), ep, dflags, off_heap); + ep = enc_pid(acmp, funp->creator, ep, dflags); + for (ei = funp->num_free-1; ei > 0; ei--) { WSTACK_PUSH2(s, ENC_TERM, (UWord) funp->env[ei]); } @@ -4281,24 +4250,12 @@ encode_size_struct_int(TTBSizeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, { ErlFunThing* funp = (ErlFunThing *) fun_val(obj); - if ((dflags & DFLAG_NEW_FUN_TAGS) != 0) { - result += 20+1+1+4; /* New ID + Tag */ - result += 4; /* Length field (number of free variables */ - result += encode_size_struct2(acmp, funp->creator, dflags); - result += encode_size_struct2(acmp, funp->fe->module, dflags); - result += 2 * (1+4); /* Index, Uniq */ - } else { - /* - * Size when fun is mapped to a tuple. - */ - result += 1 + 1; /* Tuple tag, arity */ - result += 1 + 1 + 2 + - atom_tab(atom_val(am_fun))->len; /* 'fun' */ - result += 1 + 1 + 2 + - atom_tab(atom_val(funp->fe->module))->len; /* Module name */ - result += 2 * (1 + 4); /* Index + Uniq */ - result += 1 + (funp->num_free < 0x100 ? 1 : 4); - } + ASSERT(dflags & DFLAG_NEW_FUN_TAGS); + result += 20+1+1+4; /* New ID + Tag */ + result += 4; /* Length field (number of free variables */ + result += encode_size_struct2(acmp, funp->creator, dflags); + result += encode_size_struct2(acmp, funp->fe->module, dflags); + result += 2 * (1+4); /* Index, Uniq */ if (funp->num_free > 1) { WSTACK_PUSH2(s, (UWord) (funp->env + 1), (UWord) TERM_ARRAY_OP(funp->num_free-1)); diff --git a/erts/emulator/test/binary_SUITE.erl b/erts/emulator/test/binary_SUITE.erl index cbc2d8fae5..b59c22f125 100644 --- a/erts/emulator/test/binary_SUITE.erl +++ b/erts/emulator/test/binary_SUITE.erl @@ -58,7 +58,9 @@ otp_5484/1,otp_5933/1, ordering/1,unaligned_order/1,gc_test/1, bit_sized_binary_sizes/1, - otp_6817/1,deep/1,obsolete_funs/1,robustness/1,otp_8117/1, + otp_6817/1,deep/1, + term2bin_tuple_fallbacks/1, + robustness/1,otp_8117/1, otp_8180/1, trapping/1, large/1, error_after_yield/1, cmp_old_impl/1]). @@ -79,7 +81,8 @@ all() -> bad_term_to_binary, more_bad_terms, otp_5484, otp_5933, ordering, unaligned_order, gc_test, bit_sized_binary_sizes, otp_6817, otp_8117, deep, - obsolete_funs, robustness, otp_8180, trapping, large, + term2bin_tuple_fallbacks, + robustness, otp_8180, trapping, large, error_after_yield, cmp_old_impl]. groups() -> @@ -1300,40 +1303,28 @@ deep_roundtrip(T) -> B = term_to_binary(T), T = binary_to_term(B). -obsolete_funs(Config) when is_list(Config) -> +term2bin_tuple_fallbacks(Config) when is_list(Config) -> erts_debug:set_internal_state(available_internal_state, true), - X = id({1,2,3}), - Y = id([a,b,c,d]), - Z = id({x,y,z}), - obsolete_fun(fun() -> ok end), - obsolete_fun(fun() -> X end), - obsolete_fun(fun(A) -> {A,X} end), - obsolete_fun(fun() -> {X,Y} end), - obsolete_fun(fun() -> {X,Y,Z} end), - - obsolete_fun(fun ?MODULE:all/1), + term2bin_tf(fun ?MODULE:all/1), + term2bin_tf(<<1:1>>), + term2bin_tf(<<90,80:7>>), erts_debug:set_internal_state(available_internal_state, false), ok. -obsolete_fun(Fun) -> - Tuple = case erlang:fun_info(Fun, type) of - {type,external} -> - {module,M} = erlang:fun_info(Fun, module), - {name,F} = erlang:fun_info(Fun, name), - {M,F}; - {type,local} -> - {module,M} = erlang:fun_info(Fun, module), - {index,I} = erlang:fun_info(Fun, index), - {uniq,U} = erlang:fun_info(Fun, uniq), - {env,E} = erlang:fun_info(Fun, env), - {'fun',M,I,U,list_to_tuple(E)} - end, - Tuple = no_fun_roundtrip(Fun). - -no_fun_roundtrip(Term) -> - binary_to_term_stress(erts_debug:get_internal_state({term_to_binary_no_funs,Term})). +term2bin_tf(Term) -> + Tuple = case Term of + Fun when is_function(Fun) -> + {type, external} = erlang:fun_info(Fun, type), + {module,M} = erlang:fun_info(Fun, module), + {name,F} = erlang:fun_info(Fun, name), + {M,F}; + BS when bit_size(BS) rem 8 =/= 0 -> + Bits = bit_size(BS) rem 8, + {<>, Bits} + end, + Tuple = binary_to_term_stress(erts_debug:get_internal_state({term_to_binary_tuple_fallbacks,Term})). %% Test non-standard encodings never generated by term_to_binary/1 %% but recognized by binary_to_term/1. diff --git a/lib/kernel/src/dist_util.erl b/lib/kernel/src/dist_util.erl index 08bd5946cd..fb9f7fd7eb 100644 --- a/lib/kernel/src/dist_util.erl +++ b/lib/kernel/src/dist_util.erl @@ -250,7 +250,8 @@ check_dflags(#hs_data{other_node = Node, require_flags = RequiredFlags} = HSData) -> Mandatory = ((?DFLAG_EXTENDED_REFERENCES bor ?DFLAG_EXTENDED_PIDS_PORTS - bor ?DFLAG_UTF8_ATOMS) + bor ?DFLAG_UTF8_ATOMS + bor ?DFLAG_NEW_FUN_TAGS) bor RequiredFlags), Missing = check_mandatory(0, ?DFLAGS_ALL, Mandatory, OtherFlags, []), diff --git a/lib/kernel/test/erl_distribution_wb_SUITE.erl b/lib/kernel/test/erl_distribution_wb_SUITE.erl index 258ed4f88c..1145d30e5e 100644 --- a/lib/kernel/test/erl_distribution_wb_SUITE.erl +++ b/lib/kernel/test/erl_distribution_wb_SUITE.erl @@ -61,9 +61,11 @@ %% From R9 and forward extended references is compulsory %% From R10 and forward extended pids and ports are compulsory %% From R20 and forward UTF8 atoms are compulsory +%% From R21 and forward NEW_FUN_TAGS is compulsory (no more tuple fallback {fun, ...}) -define(COMPULSORY_DFLAGS, (?DFLAG_EXTENDED_REFERENCES bor ?DFLAG_EXTENDED_PIDS_PORTS bor - ?DFLAG_UTF8_ATOMS)). + ?DFLAG_UTF8_ATOMS bor + ?DFLAG_NEW_FUN_TAGS)). -define(PASS_THROUGH, $p). @@ -682,7 +684,7 @@ recv_message(Socket) -> <<_:Siz/binary,B2/binary>> = B1, Message = case (catch binary_to_term(B2)) of {'EXIT', _} -> - could_not_digest_message; + {could_not_digest_message,B2}; Other -> Other end, -- cgit v1.2.3 From c8a6219d1f6c678f8421b1062ff633487d52bf04 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 10 Jul 2017 19:47:51 +0200 Subject: kernel: Move auto connect into net_kernel server as a preparation for async auto-connect requests. --- lib/kernel/src/net_kernel.erl | 174 ++++++++++++++++++++++-------------------- 1 file changed, 90 insertions(+), 84 deletions(-) diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index f36b4f1e6a..c68036a291 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -247,14 +247,14 @@ ticktime_res(A) when is_atom(A) -> A. %% Called though BIF's -connect(Node) -> do_connect(Node, normal, false). +connect(Node) -> auto_connect(Node, normal, false). %%% Long timeout if blocked (== barred), only affects nodes with %%% {dist_auto_connect, once} set. -passive_cnct(Node) -> do_connect(Node, normal, true). +passive_cnct(Node) -> auto_connect(Node, normal, true). disconnect(Node) -> request({disconnect, Node}). %% connect but not seen -hidden_connect(Node) -> do_connect(Node, hidden, false). +hidden_connect(Node) -> auto_connect(Node, hidden, false). %% Should this node publish itself on Node? publish_on_node(Node) when is_atom(Node) -> @@ -272,67 +272,35 @@ connect_node(Node) when is_atom(Node) -> hidden_connect_node(Node) when is_atom(Node) -> request({connect, hidden, Node}). -do_connect(Node, Type, WaitForBarred) -> %% Type = normal | hidden - case catch ets:lookup(sys_dist, Node) of - {'EXIT', _} -> - ?connect_failure(Node,{table_missing, sys_dist}), - false; - [#barred_connection{}] -> - case WaitForBarred of - false -> - false; - true -> - Pid = spawn(?MODULE,passive_connect_monitor,[self(),Node]), - receive - {Pid, true} -> - %%io:format("Net Kernel: barred connection (~p) " - %% "connected from other end.~n",[Node]), - true; - {Pid, false} -> - ?connect_failure(Node,{barred_connection, - ets:lookup(sys_dist, Node)}), - %%io:format("Net Kernel: barred connection (~p) " - %% "- failure.~n",[Node]), - false - end - end; - Else -> - case application:get_env(kernel, dist_auto_connect) of - {ok, never} -> - ?connect_failure(Node,{dist_auto_connect,never}), - false; - % This might happen due to connection close - % not beeing propagated to user space yet. - % Save the day by just not connecting... - {ok, once} when Else =/= [], - (hd(Else))#connection.state =:= up -> - ?connect_failure(Node,{barred_connection, - ets:lookup(sys_dist, Node)}), - false; - _ -> - request({connect, Type, Node}) - end +auto_connect(Node, Type, WaitForBarred) -> %% Type = normal | hidden + case request({auto_connect, Type, Node, WaitForBarred}) of + ignored -> false; + Other -> Other end. -passive_connect_monitor(Parent, Node) -> +passive_connect_monitor(From, Node) -> ok = monitor_nodes(true,[{node_type,all}]), - case lists:member(Node,nodes([connected])) of - true -> - ok = monitor_nodes(false,[{node_type,all}]), - Parent ! {self(),true}; - _ -> - Ref = make_ref(), - Tref = erlang:send_after(connecttime(),self(),Ref), - receive - Ref -> - ok = monitor_nodes(false,[{node_type,all}]), - Parent ! {self(), false}; - {nodeup,Node,_} -> - ok = monitor_nodes(false,[{node_type,all}]), - _ = erlang:cancel_timer(Tref), - Parent ! {self(),true} - end - end. + Reply = case lists:member(Node,nodes([connected])) of + true -> + io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]), + true; + _ -> + receive + {nodeup,Node,_} -> + io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]), + true + after connecttime() -> + io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]), + false + end + end, + ok = monitor_nodes(false,[{node_type,all}]), + io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]), + {Pid, Tag} = From, + io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]), + erlang:send(Pid, {Tag, Reply}), + io:format("~p: passive_connect_monitor ~p\n", [self(), ?LINE]). + %% If the net_kernel isn't running we ignore all requests to the %% kernel, thus basically accepting them :-) @@ -394,40 +362,78 @@ init({Name, LongOrShortNames, TickT, CleanHalt}) -> end. +handle_connect([Conn], _, _, From, State) when Conn#connection.state =:= up -> + async_reply({reply, true, State}, From); +handle_connect([Conn], _, _, From, State) when Conn#connection.state =:= pending; + Conn#connection.state =:= up_pending -> + Waiting = Conn#connection.waiting, + ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), + {noreply, State}; +handle_connect(_, Type, Node, From , State) -> + case setup(Node,Type,From,State) of + {ok, SetupPid} -> + Owners = [{SetupPid, Node} | State#state.conn_owners], + {noreply,State#state{conn_owners=Owners}}; + _Error -> + ?connect_failure(Node, {setup_call, failed, _Error}), + async_reply({reply, false, State}, From) + end. + %% ------------------------------------------------------------ %% handle_call. %% ------------------------------------------------------------ %% -%% Set up a connection to Node. +%% Auto-connect to Node. %% The response is delayed until the connection is up and %% running. %% -handle_call({connect, _, Node}, From, State) when Node =:= node() -> +handle_call({auto_connect, _, Node, _}, From, State) when Node =:= node() -> + async_reply({reply, true, State}, From); +handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) -> + verbose({auto_connect, Type, Node, WaitForBarred}, 1, State), + + ConnLookup = ets:lookup(sys_dist, Node), + + case ConnLookup of + [#barred_connection{}] -> + case WaitForBarred of + false -> + async_reply({reply, false, State}, From); + true -> + spawn(?MODULE,passive_connect_monitor,[From,Node]), + {noreply, State} + end; + + _ -> + case application:get_env(kernel, dist_auto_connect) of + {ok, never} -> + ?connect_failure(Node,{dist_auto_connect,never}), + async_reply({reply, false, State}, From); + + %% This might happen due to connection close + %% not beeing propagated to user space yet. + %% Save the day by just not connecting... + {ok, once} when ConnLookup =/= [], + (hd(ConnLookup))#connection.state =:= up -> + ?connect_failure(Node,{barred_connection, + ets:lookup(sys_dist, Node)}), + async_reply({reply, false, State}, From); + _ -> + handle_connect(ConnLookup, Type, Node, From, State) + end + end; + +%% +%% Explicit connect +%% The response is delayed until the connection is up and running. +%% +handle_call({connect, _, Node, _, _}, From, State) when Node =:= node() -> async_reply({reply, true, State}, From); handle_call({connect, Type, Node}, From, State) -> verbose({connect, Type, Node}, 1, State), - case ets:lookup(sys_dist, Node) of - [Conn] when Conn#connection.state =:= up -> - async_reply({reply, true, State}, From); - [Conn] when Conn#connection.state =:= pending -> - Waiting = Conn#connection.waiting, - ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), - {noreply, State}; - [Conn] when Conn#connection.state =:= up_pending -> - Waiting = Conn#connection.waiting, - ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), - {noreply, State}; - _ -> - case setup(Node,Type,From,State) of - {ok, SetupPid} -> - Owners = [{SetupPid, Node} | State#state.conn_owners], - {noreply,State#state{conn_owners=Owners}}; - _Error -> - ?connect_failure(Node, {setup_call, failed, _Error}), - async_reply({reply, false, State}, From) - end - end; + ConnLookup = ets:lookup(sys_dist, Node), + handle_connect(ConnLookup, Type, Node, From, State); %% %% Close the connection to Node. -- cgit v1.2.3 From fe720f6b2051c9bf8ff303f857c3db0a84b1c050 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 18 Sep 2017 17:15:19 +0200 Subject: erts: Refactor connection_id in ErtsDistExternal Break out from 'flags' into new dedicated 'connection_id' just for simplicity. Also changed flags to low bits and that affected enif_binary_to_term. --- erts/emulator/beam/erl_nif.c | 11 ++++++----- erts/emulator/beam/external.c | 4 ++-- erts/emulator/beam/external.h | 22 +++++++++------------- 3 files changed, 17 insertions(+), 20 deletions(-) diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c index c21b139cfa..d1018bab26 100644 --- a/erts/emulator/beam/erl_nif.c +++ b/erts/emulator/beam/erl_nif.c @@ -1201,11 +1201,12 @@ size_t enif_binary_to_term(ErlNifEnv *dst_env, Sint size; ErtsHeapFactory factory; byte *bp = (byte*) data; + Uint32 flags = 0; - ERTS_CT_ASSERT(ERL_NIF_BIN2TERM_SAFE == ERTS_DIST_EXT_BTT_SAFE); - - if (opts & ~ERL_NIF_BIN2TERM_SAFE) { - return 0; + switch ((Uint32)opts) { + case 0: break; + case ERL_NIF_BIN2TERM_SAFE: flags = ERTS_DIST_EXT_BTT_SAFE; break; + default: return 0; } if ((size = erts_decode_ext_size(bp, data_sz)) < 0) return 0; @@ -1217,7 +1218,7 @@ size_t enif_binary_to_term(ErlNifEnv *dst_env, erts_factory_dummy_init(&factory); } - *term = erts_decode_ext(&factory, &bp, (Uint32)opts); + *term = erts_decode_ext(&factory, &bp, flags); if (is_non_value(*term)) { return 0; diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 9230e1d8ab..3b851087d1 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -654,7 +654,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; *connection_id = dep->connection_id; - edep->flags |= (dep->connection_id & ERTS_DIST_EXT_CON_ID_MASK); + edep->connection_id = dep->connection_id; if (ep[1] != DIST_HEADER) { if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) @@ -868,7 +868,7 @@ bad_dist_ext(ErtsDistExternal *edep) erts_dsprintf(dsbufp, ", %d=%T", i, edep->attab.atom[i]); } erts_send_warning_to_logger_nogl(dsbufp); - erts_kill_dist_connection(dep, ERTS_DIST_EXT_CON_ID(edep)); + erts_kill_dist_connection(dep, edep->connection_id); } } diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index fb7eec1a31..d6416edbc3 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -109,26 +109,22 @@ typedef struct { } ErtsAtomTranslationTable; /* - * These flags are tagged onto the high bits of a connection ID and stored in - * the ErtsDistExternal structure's flags field. They are used to indicate - * various bits of state necessary to decode binaries in a variety of - * scenarios. The mask ERTS_DIST_EXT_CON_ID_MASK is used later to separate the - * connection ID from the flags. Be careful to ensure that the mask does not - * overlap any of the bits used for flags, or ERTS will leak flags bits into - * connection IDs and leak connection ID bits into the flags. + * These flags are stored in the ErtsDistExternal structure's flags field. + * They are used to indicate various bits of state necessary to decode binaries + * in a variety of scenarios. */ -#define ERTS_DIST_EXT_DFLAG_HDR ((Uint32) 0x80000000) -#define ERTS_DIST_EXT_ATOM_TRANS_TAB ((Uint32) 0x40000000) -#define ERTS_DIST_EXT_BTT_SAFE ((Uint32) 0x20000000) -#define ERTS_DIST_EXT_CON_ID_MASK ((Uint32) 0x1fffffff) +#define ERTS_DIST_EXT_DFLAG_HDR ((Uint32) 0x1) +#define ERTS_DIST_EXT_ATOM_TRANS_TAB ((Uint32) 0x2) +#define ERTS_DIST_EXT_BTT_SAFE ((Uint32) 0x4) + +#define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff) -#define ERTS_DIST_EXT_CON_ID(DIST_EXTP) \ - ((DIST_EXTP)->flags & ERTS_DIST_EXT_CON_ID_MASK) typedef struct { DistEntry *dep; byte *extp; byte *ext_endp; Sint heap_size; + Uint32 connection_id; Uint32 flags; ErtsAtomTranslationTable attab; } ErtsDistExternal; -- cgit v1.2.3 From f89fb92384280e2939414287a2ecb8f86a199318 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 14 Jul 2017 19:34:54 +0200 Subject: erts: Introduce asynchronous auto-connect --- erts/emulator/beam/atom.names | 1 + erts/emulator/beam/bif.c | 121 ++++++++++------ erts/emulator/beam/bif.tab | 8 + erts/emulator/beam/dist.c | 274 ++++++++++++++++++++++++++--------- erts/emulator/beam/dist.h | 122 +++++++++++++--- erts/emulator/beam/erl_node_tables.c | 20 ++- erts/emulator/beam/erl_node_tables.h | 7 +- erts/emulator/beam/erl_process.c | 40 +++-- erts/emulator/beam/external.c | 41 +++++- erts/emulator/beam/external.h | 2 +- erts/emulator/beam/io.c | 3 +- erts/preloaded/src/erlang.erl | 15 ++ lib/kernel/src/net_kernel.erl | 210 ++++++++++++++++++++------- 13 files changed, 648 insertions(+), 216 deletions(-) diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index fc55b687d4..a534dd44fb 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -108,6 +108,7 @@ atom asynchronous atom atom atom atom_used atom attributes +atom auto_connect atom await_microstate_accounting_modifications atom await_port_send_result atom await_proc_exit diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 4b11884f38..34f5afae78 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -227,17 +227,40 @@ BIF_RETTYPE link_1(BIF_ALIST_1) goto res_no_proc; } - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_RLOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, BIF_P, + (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: - /* Let the dlink trap handle it */ - case ERTS_DSIG_PREP_NOT_CONNECTED: - erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_LINK); - BIF_TRAP1(dlink_trap, BIF_P, BIF_ARG_1); - + case ERTS_DSIG_PREP_NOT_CONNECTED: { + ErtsProcLocks locks = ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK; + erts_aint32_t state; + erts_proc_lock(BIF_P, (ERTS_PROC_LOCKS_ALL & ~locks)); + locks = ERTS_PROC_LOCKS_ALL; + erts_send_exit_signal(BIF_P, BIF_ARG_1, BIF_P, &locks, + am_noconnection, NIL, NULL, 0); + erts_proc_unlock(BIF_P, locks & ERTS_PROC_LOCKS_ALL_MINOR); + + /* + * Copy-paste from old dist_exit_3, not sure if we really + * need erts_handle_pending_exit when exit_2 does not. + */ + state = erts_atomic32_read_acqb(&BIF_P->state); + if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { +#ifdef ERTS_SMP + if (state & ERTS_PSFLG_PENDING_EXIT) + erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN); +#endif + ERTS_BIF_EXITED(BIF_P); + } + BIF_RET(am_true); + } + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: - /* We are connected. Setup link and send link signal */ - + /* + * We have (pending) connection. + * Setup link and enqueue link signal. + */ erts_de_links_lock(dep); erts_add_link(&ERTS_P_LINKS(BIF_P), LINK_PID, BIF_ARG_1); @@ -256,8 +279,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); default: - ASSERT(! "Invalid dsig prepare result"); - BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); + ERTS_ASSERT(! "Invalid dsig prepare result"); } } } @@ -292,7 +314,8 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) ERTS_LC_ASSERT((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK) == erts_proc_lc_my_proc_locks(c_p)); - code = erts_dsig_prepare(&dsd, dep, c_p, ERTS_DSP_RLOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, c_p, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_RLOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: @@ -313,6 +336,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) res = am_true; break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_de_links_lock(dep); @@ -347,8 +371,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) } break; default: - ASSERT(! "Invalid dsig prepare result"); - return am_internal_error; + ERTS_ASSERT(! "Invalid dsig prepare result"); } @@ -768,10 +791,18 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, int code; erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_RLOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, + p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, 0); switch (code) { + case ERTS_DSIG_PREP_PENDING: + /* + * Must wait for connection to know if node supports monitor. + * Damn these synchronous errors. + */ + erts_smp_de_runlock(dep); + /* fall through */ case ERTS_DSIG_PREP_NOT_ALIVE: - /* Let the dmonitor_p trap handle it */ case ERTS_DSIG_PREP_NOT_CONNECTED: erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); ERTS_BIF_PREP_TRAP2(ret, dmonitor_p_trap, p, bifarg1, bifarg2); @@ -818,9 +849,7 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, } break; default: - ASSERT(! "Invalid dsig prepare result"); - ERTS_BIF_PREP_ERROR(ret, p, EXC_INTERNAL_ERROR); - break; + ERTS_ASSERT(! "Invalid dsig prepare result"); } BIF_RET(ret); @@ -1158,7 +1187,8 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) BIF_RET(am_true); } - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: @@ -1173,6 +1203,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) BIF_TRAP1(dunlink_trap, BIF_P, BIF_ARG_1); #endif + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_remove_dist_link(&dld, BIF_P->common.id, BIF_ARG_1, dep); code = erts_dsig_send_unlink(&dsd, BIF_P->common.id, BIF_ARG_1); @@ -1545,22 +1576,24 @@ BIF_RETTYPE exit_2(BIF_ALIST_2) DistEntry *dep; dep = external_pid_dist_entry(BIF_ARG_1); + ERTS_ASSERT(dep); if(dep == erts_this_dist_entry) BIF_RET(am_true); - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: - BIF_TRAP2(dexit_trap, BIF_P, BIF_ARG_1, BIF_ARG_2); + BIF_RET(am_true); + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: code = erts_dsig_send_exit2(&dsd, BIF_P->common.id, BIF_ARG_1, BIF_ARG_2); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); default: - ASSERT(! "Invalid dsig prepare result"); - BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); + ERTS_ASSERT(! "Invalid dsig prepare result"); } } else if (is_not_internal_pid(BIF_ARG_1)) { @@ -1964,7 +1997,7 @@ ebif_bang_2(BIF_ALIST_2) * Send a message to Process, Port or Registered Process. * Returns non-negative reduction bump or negative result code. */ -#define SEND_TRAP (-1) +#define SEND_NOCONNECT (-1) #define SEND_YIELD (-2) #define SEND_YIELD_RETURN (-3) #define SEND_BADARG (-4) @@ -1980,20 +2013,22 @@ static Sint remote_send(Process *p, DistEntry *dep, { Sint res; int code; - ASSERT(is_atom(to) || is_external_pid(to)); ctx->dep = dep; - code = erts_dsig_prepare(&ctx->dsd, dep, p, ERTS_DSP_NO_LOCK, !ctx->suspend); + code = erts_dsig_prepare(&ctx->dsd, &dep, p, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, + !ctx->suspend, ctx->connect); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: - res = SEND_TRAP; + res = SEND_NOCONNECT; break; case ERTS_DSIG_PREP_WOULD_SUSPEND: ASSERT(!ctx->suspend); res = SEND_YIELD; break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: { if (is_atom(to)) @@ -2205,12 +2240,14 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) } return 0; } + ctx->dsd.node = tp[2]; ret = remote_send(p, dep, tp[1], to, msg, ctx); if (ret == SEND_YIELD_CONTINUE) { - if (dep) + if (dep) { erts_ref_dist_entry(dep); - ctx->dep_to_deref = dep; + ctx->deref_dep = 1; + } } return ret; } else { @@ -2251,7 +2288,6 @@ BIF_RETTYPE send_3(BIF_ALIST_3) Eterm msg = BIF_ARG_2; Eterm opts = BIF_ARG_3; - int connect = !0; Eterm l = opts; Sint result; @@ -2262,14 +2298,15 @@ BIF_RETTYPE send_3(BIF_ALIST_3) UseTmpHeap(sizeof(ErtsSendContext)/sizeof(Eterm), BIF_P); ctx->suspend = !0; - ctx->dep_to_deref = NULL; + ctx->connect = !0; + ctx->deref_dep = 0; ctx->return_term = am_ok; ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR); ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT; while (is_list(l)) { if (CAR(list_val(l)) == am_noconnect) { - connect = 0; + ctx->connect = 0; } else if (CAR(list_val(l)) == am_nosuspend) { ctx->suspend = 0; } else { @@ -2306,9 +2343,9 @@ BIF_RETTYPE send_3(BIF_ALIST_3) goto yield_return; ERTS_BIF_PREP_RET(retval, am_ok); break; - case SEND_TRAP: - if (connect) { - ERTS_BIF_PREP_TRAP3(retval, dsend3_trap, p, to, msg, opts); + case SEND_NOCONNECT: + if (ctx->connect) { + ERTS_BIF_PREP_RET(retval, am_ok); } else { ERTS_BIF_PREP_RET(retval, am_noconnect); } @@ -2412,7 +2449,8 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) ref = NIL; #endif ctx->suspend = !0; - ctx->dep_to_deref = NULL; + ctx->connect = !0; + ctx->deref_dep = 0; ctx->return_term = msg; ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR); ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT; @@ -2436,8 +2474,8 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) goto yield_return; ERTS_BIF_PREP_RET(retval, msg); break; - case SEND_TRAP: - ERTS_BIF_PREP_TRAP2(retval, dsend2_trap, p, to, msg); + case SEND_NOCONNECT: + ERTS_BIF_PREP_RET(retval, msg); break; case SEND_YIELD: ERTS_BIF_PREP_YIELD2(retval, bif_export[BIF_send_2], p, to, msg); @@ -4387,20 +4425,21 @@ BIF_RETTYPE group_leader_2(BIF_ALIST_2) if(dep == erts_this_dist_entry) BIF_ERROR(BIF_P, BADARG); - code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: BIF_RET(am_true); case ERTS_DSIG_PREP_NOT_CONNECTED: BIF_TRAP2(dgroup_leader_trap, BIF_P, BIF_ARG_1, BIF_ARG_2); + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: code = erts_dsig_send_group_leader(&dsd, BIF_ARG_1, BIF_ARG_2); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); BIF_RET(am_true); default: - ASSERT(! "Invalid dsig prepare result"); - BIF_ERROR(BIF_P, EXC_INTERNAL_ERROR); + ERTS_ASSERT(! "Invalid dsig prepare result"); } } else if (is_internal_pid(BIF_ARG_2)) { diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index f7b4451890..66469a011d 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -691,3 +691,11 @@ bif erts_internal:maps_to_list/2 # bif erlang:iolist_to_iovec/1 + +# +# New in 21.0 +# + +bif erlang:new_connection_id/1 +bif erlang:abort_connection_id/2 + diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index bc168fc58d..0fa399cf53 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -103,8 +103,6 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) -#define PASS_THROUGH 'p' /* This code should go */ - int erts_is_alive; /* System must be blocked on change */ int erts_dist_buf_busy_limit; @@ -684,31 +682,11 @@ size_obuf(ErtsDistOutputBuf *obuf) return bin->orig_size; } -static void clear_dist_entry(DistEntry *dep) +static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) { - Sint obufsize = 0; - ErtsAtomCache *cache; - ErtsProcList *suspendees; ErtsDistOutputBuf *obuf; - erts_de_rwlock(dep); - erts_atomic_set_nob(&dep->input_handler, - (erts_aint_t) NIL); - cache = dep->cache; - dep->cache = NULL; - -#ifdef DEBUG - erts_de_links_lock(dep); - ASSERT(!dep->nlinks); - ASSERT(!dep->node_links); - ASSERT(!dep->monitors); - erts_de_links_unlock(dep); -#endif - - erts_mtx_lock(&dep->qlock); - - erts_atomic64_set_nob(&dep->in, 0); - erts_atomic64_set_nob(&dep->out, 0); + ERTS_LC_ASSERT(erts_lc_mtx_is_locked(&dep->qlock)); if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; @@ -728,17 +706,12 @@ static void clear_dist_entry(DistEntry *dep) dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; - dep->status = 0; - suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); - - erts_mtx_unlock(&dep->qlock); - erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); - dep->send = NULL; - erts_de_rwunlock(dep); - - erts_resume_processes(suspendees); + return obuf; +} - delete_cache(cache); +static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf) +{ + Sint obufsize = 0; while (obuf) { ErtsDistOutputBuf *fobuf; @@ -750,13 +723,54 @@ static void clear_dist_entry(DistEntry *dep) if (obufsize) { erts_mtx_lock(&dep->qlock); - ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); + ASSERT(erts_atomic_read_nob(&dep->qsize) >= obufsize); erts_atomic_add_nob(&dep->qsize, (erts_aint_t) -obufsize); erts_mtx_unlock(&dep->qlock); } } +static void clear_dist_entry(DistEntry *dep) +{ + ErtsAtomCache *cache; + ErtsProcList *suspendees; + ErtsDistOutputBuf *obuf; + + erts_de_rwlock(dep); + erts_atomic_set_nob(&dep->input_handler, + (erts_aint_t) NIL); + cache = dep->cache; + dep->cache = NULL; + +#ifdef DEBUG + erts_de_links_lock(dep); + ASSERT(!dep->nlinks); + ASSERT(!dep->node_links); + ASSERT(!dep->monitors); + erts_de_links_unlock(dep); +#endif + + erts_mtx_lock(&dep->qlock); + + erts_atomic64_set_nob(&dep->in, 0); + erts_atomic64_set_nob(&dep->out, 0); + + obuf = clear_de_out_queues(dep); + dep->status = 0; + suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_de_rwunlock(dep); + + erts_resume_processes(suspendees); + + delete_cache(cache); + + free_de_out_queues(dep, obuf); +} + int erts_dsend_context_dtor(Binary* ctx_bin) { ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin); @@ -772,8 +786,8 @@ int erts_dsend_context_dtor(Binary* ctx_bin) if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) { free_dist_obuf(ctx->dss.obuf); } - if (ctx->dep_to_deref) - erts_deref_dist_entry(ctx->dep_to_deref); + if (ctx->deref_dep) + erts_deref_dist_entry(ctx->dep); return 1; } @@ -1324,7 +1338,7 @@ int erts_net_message(Port *prt, /* This is tricky (we MUST force a distributed send) */ ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_exit(&dsd, to, from, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); @@ -1416,7 +1430,7 @@ int erts_net_message(Port *prt, if (!rp) { ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, am_noproc); @@ -1883,7 +1897,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } else { ctx->acmp = NULL; - ctx->pass_through_size = 1; + ctx->pass_through_size = 3; /* SVERK rename */ } #ifdef ERTS_DIST_MSG_DBG @@ -1961,9 +1975,9 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->obuf->next = NULL; erts_de_rlock(dep); cid = dep->cid; - if (cid != dsdp->cid - || dep->connection_id != dsdp->connection_id - || dep->status & ERTS_DE_SFLG_EXITING) { + if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED)) + || dep->status & ERTS_DE_SFLG_EXITING + || dep->connection_id != dsdp->connection_id) { /* Not the same connection as when we started; drop message... */ erts_de_runlock(dep); free_dist_obuf(ctx->obuf); @@ -2037,8 +2051,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } erts_mtx_unlock(&dep->qlock); - if (is_internal_port(dep->cid)) - erts_schedule_dist_command(NULL, dep); + if (!(dep->status & ERTS_DE_SFLG_PENDING)) { + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + } + else { + notify_proc = NIL; + } erts_de_runlock(dep); if (is_internal_pid(notify_proc)) notify_dist_data(ctx->c_p, notify_proc); @@ -2306,9 +2325,6 @@ erts_dist_command(Port *prt, int reds_limit) ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, dep->cache, flags); - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; preempt = reds > reds_limit; @@ -2346,21 +2362,18 @@ erts_dist_command(Port *prt, int reds_limit) int de_busy; int preempt = 0; while (oq.first && !preempt) { - ErtsDistOutputBuf *fob; - Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--oq.first->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&oq.first->data[0] <= oq.first->extp - && oq.first->extp < oq.first->ext_endp); - size = (*send)(prt, oq.first); + ErtsDistOutputBuf *fob; + Uint size; + oq.first->extp + = erts_encode_ext_dist_header_finalize(oq.first->extp, + dep->cache, + flags); + reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + ASSERT(&oq.first->data[0] <= oq.first->extp + && oq.first->extp < oq.first->ext_endp); + size = (*send)(prt, oq.first); erts_atomic64_inc_nob(&dep->out); - esdp->io.out += (Uint64) size; + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); @@ -2482,11 +2495,13 @@ erts_dist_command(Port *prt, int reds_limit) foq.first = NULL; foq.last = NULL; +/* SVERK Hmmm.... #ifdef DEBUG erts_mtx_lock(&dep->qlock); ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); erts_mtx_unlock(&dep->qlock); #endif +*/ } else { if (oq.first) { @@ -3218,6 +3233,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) ErtsProcLocks proc_unlock = 0; Process *proc; Port *pp = NULL; + Eterm notify_proc; + erts_aint32_t qflgs; /* * Check and pick out arguments @@ -3245,15 +3262,14 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) if (!is_atom(ic) || !is_atom(oc)) goto badarg; - /* DFLAG_EXTENDED_REFERENCES is compulsory from R9 and forward */ - if (!(DFLAG_EXTENDED_REFERENCES & flags)) { + if (~flags & DFLAG_DIST_MANDATORY) { erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf(); erts_dsprintf(dsbufp, "%T", BIF_P->common.id); if (BIF_P->common.u.alive.reg) erts_dsprintf(dsbufp, " (%T)", BIF_P->common.u.alive.reg->name); erts_dsprintf(dsbufp, " attempted to enable connection to node %T " - "which is not able to handle extended references.\n", + "which does not support all mandatory capabilities.\n", BIF_ARG_1); erts_send_error_to_logger(BIF_P->group_leader, dsbufp); goto badarg; @@ -3376,7 +3392,8 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) dep->creation = 0; #ifdef DEBUG - ASSERT(erts_atomic_read_nob(&dep->qsize) == 0); + ASSERT(erts_atomic_read_nob(&dep->qsize) == 0 + || (dep->status & ERTS_DE_SFLG_PENDING)); #endif if (flags & DFLAG_DIST_HDR_ATOM_CACHE) @@ -3384,7 +3401,26 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); + notify_proc = NIL; + if (erts_atomic_read_nob(&dep->qsize)) { + if (is_internal_port(dep->cid)) { + erts_schedule_dist_command(NULL, dep); + } + else { + qflgs = erts_atomic32_read_nob(&dep->qflgs); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + qflgs = erts_atomic32_read_band_mb(&dep->qflgs, + ~ERTS_DE_QFLG_REQ_INFO); + if (qflgs & ERTS_DE_QFLG_REQ_INFO) { + notify_proc = dep->cid; + ASSERT(is_internal_pid(notify_proc)); + } + } + } + } erts_de_rwunlock(dep); + if (is_internal_pid(notify_proc)) + notify_dist_data(BIF_P, notify_proc); ERTS_BIF_PREP_RET(ret, erts_make_dhandle(BIF_P, dep)); @@ -3426,6 +3462,112 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto done; } +BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) +{ + DistEntry* dep; + Uint32 conn_id; + + if (is_not_atom(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); + } + dep = erts_find_or_insert_dist_entry(BIF_ARG_1); + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + + erts_de_rwlock(dep); + + if (ERTS_DE_IS_CONNECTED(dep) || dep->status & ERTS_DE_SFLG_PENDING) + conn_id = dep->connection_id; + else if (dep->status == 0) { + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; + dep->connection_id++; + dep->connection_id &= ERTS_DIST_CON_ID_MASK; + conn_id = dep->connection_id; + } + else { + ASSERT(!"SVERK: What to do?"); + conn_id = dep->connection_id; + } + erts_de_rwunlock(dep); + BIF_RET(make_small(conn_id)); +} + +BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +{ + DistEntry* dep; + + if (is_not_atom(BIF_ARG_1) || is_not_small(BIF_ARG_2)) { + BIF_ERROR(BIF_P, BADARG); + } + dep = erts_find_dist_entry(BIF_ARG_1); + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + + if (!dep) { + BIF_RET(am_false); + } + + erts_de_rwlock(dep); + + if (dep->status == ERTS_DE_SFLG_PENDING + && dep->connection_id == unsigned_val(BIF_ARG_2)) { + + NetExitsContext nec = {dep}; + ErtsLink *nlinks; + ErtsLink *node_links; + ErtsMonitor *monitors; + ErtsAtomCache *cache; + ErtsDistOutputBuf *obuf; + ErtsProcList *resume_procs; + Sint reds = 0; + + ASSERT(is_nil(dep->cid)); + + erts_de_links_lock(dep); + monitors = dep->monitors; + nlinks = dep->nlinks; + node_links = dep->node_links; + dep->monitors = NULL; + dep->nlinks = NULL; + dep->node_links = NULL; + erts_de_links_unlock(dep); + + cache = dep->cache; + dep->cache = NULL; + dep->status = 0; + dep->flags = 0; + erts_mtx_lock(&dep->qlock); + obuf = dep->out_queue.first; + dep->out_queue.first = NULL; + dep->out_queue.last = NULL; + ASSERT(!dep->tmp_out_queue.first); + ASSERT(!dep->finalized_out_queue.first); + resume_procs = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); + erts_mtx_unlock(&dep->qlock); + erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); + dep->send = NULL; + erts_de_rwunlock(dep); + + erts_sweep_monitors(monitors, &doit_monitor_net_exits, &nec); + erts_sweep_links(nlinks, &doit_link_net_exits, &nec); + erts_sweep_links(node_links, &doit_node_link_net_exits, &nec); + + if (resume_procs) { + int resumed = erts_resume_processes(resume_procs); + reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + } + + delete_cache(cache); + + free_de_out_queues(dep, obuf); + + BIF_RET2(am_true, reds); + } + + erts_de_rwunlock(dep); + + BIF_RET(am_false); +} + /**********************************************************************/ /* dist_exit(Local, Term, Remote) -> Bool */ diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index d4765c50b8..26432b21e9 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -45,6 +45,13 @@ #define DFLAG_MAP_TAG 0x20000 #define DFLAG_BIG_CREATION 0x40000 #define DFLAG_SEND_SENDER 0x80000 +#define DFLAG_PENDING_CONNECTION 0x100000 + +/* Mandatory flags for distribution (sync with dist_util.erl) */ +#define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \ + | DFLAG_EXTENDED_PIDS_PORTS \ + | DFLAG_UTF8_ATOMS \ + | DFLAG_NEW_FUN_TAGS) /* All flags that should be enabled when term_to_binary/1 is used. */ #define TERM_TO_BINARY_DFLAGS (DFLAG_EXTENDED_REFERENCES \ @@ -98,6 +105,7 @@ typedef enum { typedef struct { Process *proc; DistEntry *dep; + Eterm node; /* used if dep == NULL */ Eterm cid; Eterm connection_id; int no_suspend; @@ -117,14 +125,10 @@ extern int erts_is_alive; /* * erts_dsig_prepare() prepares a send of a distributed signal. - * One of the values defined below are returned. If the returned - * value is another than ERTS_DSIG_PREP_CONNECTED, the - * distributed signal cannot be sent before appropriate actions - * have been taken. Appropriate actions would typically be setting - * up the connection. + * One of the values defined below are returned. */ -/* Connected; signal can be sent. */ +/* Connected; signals can be enqueued and sent. */ #define ERTS_DSIG_PREP_CONNECTED 0 /* Not connected; connection needs to be set up. */ #define ERTS_DSIG_PREP_NOT_CONNECTED 1 @@ -132,11 +136,15 @@ extern int erts_is_alive; #define ERTS_DSIG_PREP_WOULD_SUSPEND 2 /* System not alive (distributed) */ #define ERTS_DSIG_PREP_NOT_ALIVE 3 +/* Pending connection; signals can be enqueued */ +#define ERTS_DSIG_PREP_PENDING 4 ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *, - DistEntry *, + DistEntry **, Process *, + ErtsProcLocks, ErtsDSigPrepLock, + int, int); ERTS_GLB_INLINE @@ -146,29 +154,100 @@ void erts_schedule_dist_command(Port *, DistEntry *); ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *dsdp, - DistEntry *dep, + DistEntry **depp, Process *proc, + ErtsProcLocks proc_locks, ErtsDSigPrepLock dspl, - int no_suspend) + int no_suspend, + int connect) { - int failure; + DistEntry* dep = *depp; + int res; + if (!erts_is_alive) return ERTS_DSIG_PREP_NOT_ALIVE; - if (!dep) - return ERTS_DSIG_PREP_NOT_CONNECTED; + if (!dep) { + if (!connect) + return ERTS_DSIG_PREP_NOT_CONNECTED; + + dep = erts_find_or_insert_dist_entry(dsdp->node); + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + } + +#ifdef ERTS_ENABLE_LOCK_CHECK + if (connect) { + erts_proc_lc_might_unlock(proc, proc_locks); + } +#endif + +retry: if (dspl == ERTS_DSP_RWLOCK) erts_de_rwlock(dep); else erts_de_rlock(dep); - if (ERTS_DE_IS_NOT_CONNECTED(dep)) { - failure = ERTS_DSIG_PREP_NOT_CONNECTED; + + if (ERTS_DE_IS_CONNECTED(dep)) { + res = ERTS_DSIG_PREP_CONNECTED; + } + else if (dep->status & ERTS_DE_SFLG_PENDING) { + res = ERTS_DSIG_PREP_PENDING; + } + else if (dep->status & ERTS_DE_SFLG_EXITING) { + /* SVERK is this ok, or should we trigger another connection setup */ + res = ERTS_DSIG_PREP_NOT_CONNECTED; + goto fail; + } + else if (connect) { + ASSERT(dep->status == 0); + if (dspl != ERTS_DSP_RWLOCK) { + erts_de_runlock(dep); + erts_de_rwlock(dep); + } + if (dep->status == 0) { + Process* net_kernel; + ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *mp; + Eterm msg, conn_id; + + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; + dep->connection_id++; + dep->connection_id &= ERTS_DIST_CON_ID_MASK; + conn_id = make_small(dep->connection_id); + erts_de_rwunlock(dep); + + net_kernel = erts_whereis_process(proc, proc_locks, + am_net_kernel, nk_locks, 0); + if (!net_kernel) { + if (!*depp) { + erts_deref_dist_entry(dep); + } + return ERTS_DSIG_PREP_NOT_ALIVE; + } + + /* Send {auto_connect, Node, ConnId} to net_kernel */ + mp = erts_alloc_message_heap(net_kernel, &nk_locks, 4, &hp, &ohp); + msg = TUPLE3(hp, am_auto_connect, dep->sysname, conn_id); + erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); + erts_proc_unlock(net_kernel, nk_locks); + } + else + erts_de_rwunlock(dep); + goto retry; + } + else { + ASSERT(dep->status == 0); + res = ERTS_DSIG_PREP_NOT_CONNECTED; goto fail; } + if (no_suspend) { - if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { - failure = ERTS_DSIG_PREP_WOULD_SUSPEND; + if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { + res = ERTS_DSIG_PREP_WOULD_SUSPEND; goto fail; - } + } } dsdp->proc = proc; dsdp->dep = dep; @@ -177,15 +256,15 @@ erts_dsig_prepare(ErtsDSigData *dsdp, dsdp->no_suspend = no_suspend; if (dspl == ERTS_DSP_NO_LOCK) erts_de_runlock(dep); - return ERTS_DSIG_PREP_CONNECTED; + *depp = dep; + return res; fail: if (dspl == ERTS_DSP_RWLOCK) erts_de_rwunlock(dep); else erts_de_runlock(dep); - return failure; - + return res; } ERTS_GLB_INLINE @@ -346,11 +425,12 @@ struct erts_dsig_send_context { typedef struct { int suspend; + int connect; Eterm ctl_heap[6]; ErtsDSigData dsd; - DistEntry* dep_to_deref; DistEntry *dep; + int deref_dep; struct erts_dsig_send_context dss; Eterm return_term; diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 9931686cbe..dd607f438e 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -597,6 +597,8 @@ erts_set_dist_entry_not_connected(DistEntry *dep) void erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) { + erts_aint32_t set_qflgs; + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); erts_rwmtx_rwlock(&erts_dist_table_rwmtx); @@ -619,22 +621,26 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) ASSERT(erts_no_of_not_connected_dist_entries > 0); erts_no_of_not_connected_dist_entries--; + if (dep->status & ERTS_DE_SFLG_PENDING) { + dep->status &= ~ERTS_DE_SFLG_PENDING; + } else { + dep->connection_id++; + dep->connection_id &= ERTS_DIST_CON_ID_MASK; + } dep->status |= ERTS_DE_SFLG_CONNECTED; - dep->flags = flags; + dep->flags = flags & ~DFLAG_PENDING_CONNECTION; dep->cid = cid; erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) cid); - dep->connection_id++; - dep->connection_id &= ERTS_DIST_EXT_CON_ID_MASK; dep->prev = NULL; erts_atomic64_set_nob(&dep->in, 0); erts_atomic64_set_nob(&dep->out, 0); - erts_atomic32_set_nob(&dep->qflgs, - (is_internal_port(cid) - ? ERTS_DE_QFLG_PORT_CTRL - : ERTS_DE_QFLG_PROC_CTRL)); + set_qflgs = (is_internal_port(cid) ? + ERTS_DE_QFLG_PORT_CTRL : ERTS_DE_QFLG_PROC_CTRL); + erts_atomic32_read_bor_nob(&dep->qflgs, set_qflgs); + if(flags & DFLAG_PUBLISHED) { dep->next = erts_visible_dist_entries; if(erts_visible_dist_entries) { diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 2cfd18f22f..d012f4b2cf 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -57,8 +57,9 @@ #define ERST_INTERNAL_CHANNEL_NO 0 -#define ERTS_DE_SFLG_CONNECTED (((Uint32) 1) << 0) -#define ERTS_DE_SFLG_EXITING (((Uint32) 1) << 1) +#define ERTS_DE_SFLG_PENDING (((Uint32) 1) << 0) +#define ERTS_DE_SFLG_CONNECTED (((Uint32) 1) << 1) +#define ERTS_DE_SFLG_EXITING (((Uint32) 1) << 2) #define ERTS_DE_QFLG_BUSY (((erts_aint32_t) 1) << 0) #define ERTS_DE_QFLG_EXIT (((erts_aint32_t) 1) << 1) @@ -106,8 +107,6 @@ struct ErtsProcList_; * unlock mutexes with higher numbers before mutexes with higher numbers. */ -struct erl_link; - typedef struct dist_entry_ { HashBucket hash_bucket; /* Hash bucket */ struct dist_entry_ *next; /* Next entry in dist_table (not sorted) */ diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 3c0a126fe2..2d13cd92b2 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -12659,9 +12659,11 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, dep, NULL, - ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { + int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED || + code == ERTS_DSIG_PREP_PENDING) { + code = erts_dsig_send_demonitor(&dsd, rmon->u.pid, mon->name, @@ -12705,9 +12707,11 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, dep, NULL, - ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { + int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED || + code == ERTS_DSIG_PREP_PENDING) { + code = erts_dsig_send_demonitor(&dsd, rmon->u.pid, mon->u.pid, @@ -12764,8 +12768,8 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, dep, NULL, - ERTS_DSP_NO_LOCK, 0); + int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, mon->u.pid, @@ -12887,14 +12891,18 @@ static void doit_exit_link(ErtsLink *lnk, void *vpcontext) int code; ErtsDistLinkData dld; erts_remove_dist_link(&dld, p->common.id, item, dep); - erts_proc_lock(p, ERTS_PROC_LOCK_MAIN); - code = erts_dsig_prepare(&dsd, dep, p, ERTS_DSP_NO_LOCK, 0); - if (code == ERTS_DSIG_PREP_CONNECTED) { - code = erts_dsig_send_exit_tt(&dsd, p->common.id, item, - reason, SEQ_TRACE_TOKEN(p)); - ASSERT(code == ERTS_DSIG_SEND_OK); - } - erts_proc_unlock(p, ERTS_PROC_LOCK_MAIN); + if (dld.d_lnk) { + erts_proc_lock(p, ERTS_PROC_LOCK_MAIN); + code = erts_dsig_prepare(&dsd, &dep, p, 0, ERTS_DSP_NO_LOCK, 0, 0); + if (code == ERTS_DSIG_PREP_CONNECTED || + code == ERTS_DSIG_PREP_PENDING) { + + code = erts_dsig_send_exit_tt(&dsd, p->common.id, item, + reason, SEQ_TRACE_TOKEN(p)); + ASSERT(code == ERTS_DSIG_SEND_OK); + } + erts_proc_unlock(p, ERTS_PROC_LOCK_MAIN); + } erts_destroy_dist_link(&dld); } } diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 3b851087d1..c49e2f617a 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -349,6 +349,8 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) } } +#define PASS_THROUGH 'p' /* This code should go */ + byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint32 dflags) { byte *ip; @@ -358,9 +360,33 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint int long_atoms; register byte *ep = ext; ASSERT(dflags & DFLAG_UTF8_ATOMS); - ASSERT(ep[0] == VERSION_MAGIC); - if (ep[1] != DIST_HEADER) - return ext; + + if (ep[0] != VERSION_MAGIC) { + ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT); + if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) { + /* + * Encoded without atom cache (toward pending connection) + * but receiver wants dist header. Let's prepend an empty one. + */ + *--ep = 0; /* NumberOfAtomCacheRefs */ + *--ep = DIST_HEADER; + *--ep = VERSION_MAGIC; + } + else { + /* Node without atom cache, 'pass through' needed */ + + ASSERT(!"SVERK: Must insert VERSION_MAGIC's"); + *--ep = PASS_THROUGH; + } + return ep; + } + else if (ep[1] != DIST_HEADER) { + ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT); + ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)); + /* Node without atom cache, 'pass through' needed */ + *--ep = PASS_THROUGH; + return ep; + } dist_hdr_flags = ep[2]; long_atoms = ERTS_DIST_HDR_LONG_ATOMS_FLG & ((int) dist_hdr_flags); @@ -511,7 +537,7 @@ int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx return -1; } else { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(ctx->flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_PENDING_CONNECTION))) #endif sz++ /* VERSION_MAGIC */; @@ -543,7 +569,7 @@ int erts_encode_dist_ext(Eterm term, byte **ext, Uint32 flags, ErtsAtomCacheMap { if (!ctx || !ctx->wstack.wstart) { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_PENDING_CONNECTION))) #endif *(*ext)++ = VERSION_MAGIC; } @@ -644,12 +670,11 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, erts_de_rlock(dep); - if ((dep->status & (ERTS_DE_SFLG_EXITING|ERTS_DE_SFLG_CONNECTED)) - != ERTS_DE_SFLG_CONNECTED) { + if (dep->status != ERTS_DE_SFLG_CONNECTED && + dep->status != ERTS_DE_SFLG_PENDING) { erts_de_runlock(dep); return ERTS_PREP_DIST_EXT_CLOSED; } - if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index d6416edbc3..49950c4aad 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -117,7 +117,7 @@ typedef struct { #define ERTS_DIST_EXT_ATOM_TRANS_TAB ((Uint32) 0x2) #define ERTS_DIST_EXT_BTT_SAFE ((Uint32) 0x4) -#define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff) +#define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff) /* also in net_kernel.erl */ typedef struct { DistEntry *dep; diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 85013af3ad..6cd1aa5e79 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -3829,11 +3829,12 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc) ErtsDistLinkData dld; ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, dep, NULL, ERTS_DSP_NO_LOCK, 0); + code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: erts_remove_dist_link(&dld, port_id, lnk->pid, dep); erts_destroy_dist_link(&dld); diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 80bceae506..28625c400f 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -165,6 +165,8 @@ -export([registered/0, resume_process/1, round/1, self/0]). -export([seq_trace/2, seq_trace_print/1, seq_trace_print/2, setnode/2]). -export([setnode/3, size/1, spawn/3, spawn_link/3, split_binary/2]). +-export([new_connection_id/1]). +-export([abort_connection_id/2]). -export([suspend_process/2, system_monitor/0]). -export([system_monitor/1, system_monitor/2, system_profile/0]). -export([system_profile/2, throw/1, time/0, trace/3, trace_delivered/1]). @@ -1670,6 +1672,19 @@ setnode(_P1, _P2) -> setnode(_P1, _P2, _P3) -> erlang:nif_error(undefined). +%% new_connection_id/1 +-spec erlang:new_connection_id(Node) -> integer() when + Node :: atom(). +new_connection_id(_Node) -> + erlang:nif_error(undefined). + +%% abort_connection_id/2 +-spec erlang:abort_connection_id(Node, ConnId) -> integer() when + Node :: atom(), + ConnId :: integer(). +abort_connection_id(_Node, _ConnId) -> + erlang:nif_error(undefined). + %% size/1 %% Shadowed by erl_bif_types: erlang:size/1 -spec size(Item) -> non_neg_integer() when diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index c68036a291..f929e4bf11 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -122,6 +122,7 @@ -record(connection, { node, %% remote node name + conn_id, %% Connection identity state, %% pending | up | up_pending owner, %% owner pid pending_owner, %% possible new owner @@ -362,54 +363,33 @@ init({Name, LongOrShortNames, TickT, CleanHalt}) -> end. -handle_connect([Conn], _, _, From, State) when Conn#connection.state =:= up -> - async_reply({reply, true, State}, From); -handle_connect([Conn], _, _, From, State) when Conn#connection.state =:= pending; - Conn#connection.state =:= up_pending -> - Waiting = Conn#connection.waiting, - ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), - {noreply, State}; -handle_connect(_, Type, Node, From , State) -> - case setup(Node,Type,From,State) of - {ok, SetupPid} -> - Owners = [{SetupPid, Node} | State#state.conn_owners], - {noreply,State#state{conn_owners=Owners}}; - _Error -> - ?connect_failure(Node, {setup_call, failed, _Error}), - async_reply({reply, false, State}, From) - end. - -%% ------------------------------------------------------------ -%% handle_call. -%% ------------------------------------------------------------ - -%% -%% Auto-connect to Node. -%% The response is delayed until the connection is up and -%% running. -%% -handle_call({auto_connect, _, Node, _}, From, State) when Node =:= node() -> - async_reply({reply, true, State}, From); -handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) -> - verbose({auto_connect, Type, Node, WaitForBarred}, 1, State), - +handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State) -> ConnLookup = ets:lookup(sys_dist, Node), case ConnLookup of [#barred_connection{}] -> case WaitForBarred of false -> - async_reply({reply, false, State}, From); + {reply, false, State}; true -> spawn(?MODULE,passive_connect_monitor,[From,Node]), {noreply, State} end; + [#connection{conn_id=ConnId, state = up}] -> + {reply, true, State}; + [#connection{conn_id=ConnId, waiting=Waiting}=Conn] -> + case From of + noreply -> ok; + _ -> ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}) + end, + {noreply, State}; + _ -> case application:get_env(kernel, dist_auto_connect) of {ok, never} -> ?connect_failure(Node,{dist_auto_connect,never}), - async_reply({reply, false, State}, From); + {reply, false, State}; %% This might happen due to connection close %% not beeing propagated to user space yet. @@ -418,11 +398,78 @@ handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) -> (hd(ConnLookup))#connection.state =:= up -> ?connect_failure(Node,{barred_connection, ets:lookup(sys_dist, Node)}), - async_reply({reply, false, State}, From); + {reply, false, State}; _ -> - handle_connect(ConnLookup, Type, Node, From, State) + case setup(ConnLookup, Node,ConnId,Type,From,State) of + {ok, SetupPid} -> + Owners = [{SetupPid, Node} | State#state.conn_owners], + {noreply,State#state{conn_owners=Owners}}; + _Error -> + ?connect_failure(Node, {setup_call, failed, _Error}), + {reply, false, State} + end end - end; + end. + + +handle_connect([#connection{conn_id = ConnId, state = up}], _, _, ConnId, _From, State) -> + {reply, true, State}; +handle_connect([#connection{conn_id = ConnId}=Conn], _, _, ConnId, From, State) + when Conn#connection.state =:= pending; + Conn#connection.state =:= up_pending -> + Waiting = Conn#connection.waiting, + ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), + {noreply, State}; +handle_connect([#barred_connection{}], Type, Node, ConnId, From , State) -> + %% Barred connection only affects auto_connect, ignore it. + handle_connect([], Type, Node, ConnId, From , State); +handle_connect(ConnLookup, Type, Node, ConnId, From , State) -> + case setup(ConnLookup, Node,ConnId,Type,From,State) of + {ok, SetupPid} -> + Owners = [{SetupPid, Node} | State#state.conn_owners], + {noreply,State#state{conn_owners=Owners}}; + _Error -> + ?connect_failure(Node, {setup_call, failed, _Error}), + {reply, false, State} + end. + +-define(ERTS_DIST_CON_ID_MASK, 16#ffffff). % also in external.h + +verify_new_conn_id([], ConnId) + when (ConnId band (bnot ?ERTS_DIST_CON_ID_MASK)) =:= 0 -> + true; +verify_new_conn_id([#connection{conn_id = Old}], New) + when New =:= ((Old+1) band ?ERTS_DIST_CON_ID_MASK) -> + true; +verify_new_conn_id(_, _) -> + false. + + + +%% ------------------------------------------------------------ +%% handle_call. +%% ------------------------------------------------------------ + +%% +%% Auto-connect to Node. +%% The response is delayed until the connection is up and running. +%% +handle_call({auto_connect, _, Node, _}, From, State) when Node =:= node() -> + async_reply({reply, true, State}, From); +handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) -> + verbose({auto_connect, Type, Node, WaitForBarred}, 1, State), + + R = case (catch erlang:new_connection_id(Node)) of + ConnId when is_integer(ConnId) -> + handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State); + + _Error -> + error_logger:error_msg("~n** Cannot get connection id for node ~w~n", + [Node]), + {reply, false, State} + end, + + return_call(R, From); %% %% Explicit connect @@ -433,7 +480,17 @@ handle_call({connect, _, Node, _, _}, From, State) when Node =:= node() -> handle_call({connect, Type, Node}, From, State) -> verbose({connect, Type, Node}, 1, State), ConnLookup = ets:lookup(sys_dist, Node), - handle_connect(ConnLookup, Type, Node, From, State); + R = case (catch erlang:new_connection_id(Node)) of + ConnId when is_integer(ConnId) -> + handle_connect(ConnLookup, Type, Node, ConnId, From, State); + + _Error -> + error_logger:error_msg("~n** Cannot get connection id for node ~w~n", + [Node]), + {reply, false, State} + end, + return_call(R, From); + %% %% Close the connection to Node. @@ -639,6 +696,25 @@ terminate(_Reason, State) -> %% handle_info. %% ------------------------------------------------------------ +%% +%% Asynchronous auto connect request +%% +handle_info({auto_connect,Node,ConnId}, State) -> + verbose({auto_connect, Node, ConnId}, 1, State), + NewState = + case handle_auto_connect(normal, Node, ConnId, false, noreply, State) of + {noreply, S} -> %% Pending connection + S; + + {reply, true, S} -> %% Already connected + S; + + {reply, false, S} -> %% Connection refused + erlang:abort_connection_id(Node, ConnId), + S + end, + {noreply, NewState}; + %% %% accept a new connection. %% @@ -719,7 +795,12 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) -> AcceptPid ! {self(), {accept_pending, already_pending}}, {noreply, State}; _ -> + ConnId = case (catch erlang:new_connection_id(Node)) of + CI when is_integer(CI) -> CI + %% SVERK What to do? + end, ets:insert(sys_dist, #connection{node = Node, + conn_id = ConnId, state = pending, owner = AcceptPid, address = Address, @@ -912,6 +993,7 @@ pending_nodedown(Conn, Node, Type, State) -> % Don't bar connections that have never been alive %mark_sys_dist_nodedown(Node), % - instead just delete the node: + erlang:abort_connection_id(Node, Conn#connection.conn_id), ets:delete(sys_dist, Node), reply_waiting(Node,Conn#connection.waiting, false), case Type of @@ -934,15 +1016,16 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) -> State#state{conn_owners = [{AcceptPid,Node}|Owners], pend_owners = Pend}. -up_nodedown(_Conn, Node, _Reason, Type, State) -> - mark_sys_dist_nodedown(Node), +up_nodedown(Conn, Node, _Reason, Type, State) -> + mark_sys_dist_nodedown(Conn, Node), case Type of normal -> ?nodedown(Node, State); _ -> ok end, State. -mark_sys_dist_nodedown(Node) -> +mark_sys_dist_nodedown(Conn, Node) -> + erlang:abort_connection_id(Node, Conn#connection.conn_id), case application:get_env(kernel, dist_auto_connect) of {ok, once} -> ets:insert(sys_dist, #barred_connection{node = Node}); @@ -1185,15 +1268,8 @@ spawn_func(_,{From,Tag},M,F,A,Gleader) -> %% Set up connection to a new node. %% ----------------------------------------------------------- -setup(Node,Type,From,State) -> - Allowed = State#state.allowed, - case lists:member(Node, Allowed) of - false when Allowed =/= [] -> - error_msg("** Connection attempt with " - "disallowed node ~w ** ~n", [Node]), - {error, bad_node}; - _ -> - case select_mod(Node, State#state.listen) of +setup(ConnLookup, Node,ConnId,Type,From,State) -> + case setup_check(ConnLookup, Node, ConnId, State) of {ok, L} -> Mod = L#listen.module, LAddr = L#listen.address, @@ -1206,18 +1282,45 @@ setup(Node,Type,From,State) -> Addr = LAddr#net_address { address = undefined, host = undefined }, + Waiting = case From of + noreply -> []; + _ -> [From] + end, ets:insert(sys_dist, #connection{node = Node, + conn_id = ConnId, state = pending, owner = Pid, - waiting = [From], + waiting = Waiting, address = Addr, type = normal}), {ok, Pid}; Error -> Error - end end. +setup_check(ConnLookup, Node, ConnId, State) -> + Allowed = State#state.allowed, + case lists:member(Node, Allowed) of + false when Allowed =/= [] -> + error_msg("** Connection attempt with " + "disallowed node ~w ** ~n", [Node]), + {error, bad_node}; + _ -> + case verify_new_conn_id(ConnLookup, ConnId) of + false -> + error_msg("** Connection attempt to ~w with " + "bad connection id ~w ** ~n", [Node, ConnId]), + {error, bad_conn_id}; + true -> + case select_mod(Node, State#state.listen) of + {ok, _L}=OK -> OK; + Error -> Error + end + end + end. + + + %% %% Find a module that is willing to handle connection setup to Node %% @@ -1658,6 +1761,11 @@ verbose(_, _, _) -> getnode(P) when is_pid(P) -> node(P); getnode(P) -> P. +return_call({noreply, _State}=R, _From) -> + R; +return_call(R, From) -> + async_reply(R, From). + async_reply({reply, Msg, State}, From) -> async_gen_server_reply(From, Msg), {noreply, State}. -- cgit v1.2.3 From 3dda5298322a76f2787c6c4dd1cce78416c20dc3 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 16 Aug 2017 16:56:12 +0200 Subject: erts: Async auto-connect for monitor_node Removed distribution_SUITE:applied_monitor_node as it seems to test apply of trapping BIF and monitor_node does not trap anymore. --- erts/emulator/beam/dist.c | 124 ++++++++++++++++++++---------- erts/emulator/test/distribution_SUITE.erl | 24 +----- erts/emulator/test/erl_link_SUITE.erl | 13 ++-- erts/preloaded/src/erlang.erl | 10 +-- 4 files changed, 98 insertions(+), 73 deletions(-) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 0fa399cf53..724cb6b24d 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3789,11 +3789,17 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) DistEntry *dep; ErtsLink *lnk; Eterm l; + int async_connect = 1; for (l = Options; l != NIL && is_list(l); l = CDR(list_val(l))) { Eterm t = CAR(list_val(l)); - /* allow_passive_connect the only available option right now */ - if (t != am_allow_passive_connect) { + if (t == am_allow_passive_connect) { + /* + * Handle this horrible feature by falling back on old synchronous + * auto-connect (if needed) + */ + async_connect = 0; + } else { BIF_ERROR(p, BADARG); } } @@ -3807,47 +3813,85 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) && (Node != erts_this_node->sysname))) { BIF_ERROR(p, BADARG); } - dep = erts_sysname_to_connected_dist_entry(Node); - if (!dep) { - do_trap: - BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); - } - if (dep == erts_this_dist_entry) - goto done; - - erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - erts_de_rlock(dep); - if (ERTS_DE_IS_NOT_CONNECTED(dep)) { - erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); - erts_de_runlock(dep); - goto do_trap; - } - erts_de_links_lock(dep); - erts_de_runlock(dep); if (Bool == am_true) { - ASSERT(dep->cid != NIL); - lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, - p->common.id); - ++ERTS_LINK_REFC(lnk); - lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); - ++ERTS_LINK_REFC(lnk); - } - else { - lnk = erts_lookup_link(dep->node_links, p->common.id); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&(dep->node_links), - p->common.id)); - } - } - lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); - if (lnk != NULL) { - if ((--ERTS_LINK_REFC(lnk)) == 0) { - erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), - Node)); + ErtsDSigData dsd; + dsd.node = Node; + dep = erts_find_or_insert_dist_entry(Node); + if (dep == erts_this_dist_entry) + goto done; + + erts_proc_lock(p, ERTS_PROC_LOCK_LINK); + + switch (erts_dsig_prepare(&dsd, &dep, p, + (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), + ERTS_DSP_RLOCK, 0, async_connect)) { + case ERTS_DSIG_PREP_NOT_ALIVE: + case ERTS_DSIG_PREP_NOT_CONNECTED: + /* Trap to either send 'nodedown' or do passive connection attempt */ + trap: + erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); + erts_deref_dist_entry(dep); + BIF_TRAP3(dmonitor_node_trap, p, Node, Bool, Options); + case ERTS_DSIG_PREP_PENDING: + if (!async_connect) { + /* + * Pending connection may fail, so we must trap + * to ensure passive connection attempt + */ + erts_de_runlock(dep); + goto trap; } - } + /*fall through*/ + case ERTS_DSIG_PREP_CONNECTED: + erts_de_links_lock(dep); + erts_de_runlock(dep); + lnk = erts_add_or_lookup_link(&(dep->node_links), LINK_NODE, + p->common.id); + ++ERTS_LINK_REFC(lnk); + lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); + ++ERTS_LINK_REFC(lnk); + break; + default: + ERTS_ASSERT(! "Invalid dsig prepare result"); + } + } + else { /* Bool == false */ + dep = erts_sysname_to_connected_dist_entry(Node); + if (!dep) { + /* + * Before OTP-21 this case triggered auto-connect + * and a 'nodedown' message if that failed. + * Now it's a simple no-op which feels more reasonable. + */ + BIF_RET(am_true); + } + if (dep == erts_this_dist_entry) + goto done; + + erts_proc_lock(p, ERTS_PROC_LOCK_LINK); + erts_de_rlock(dep); + if (!(dep->status & (ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED))) { + erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); + erts_de_runlock(dep); + goto done; + } + erts_de_links_lock(dep); + erts_de_runlock(dep); + lnk = erts_lookup_link(dep->node_links, p->common.id); + if (lnk != NULL) { + if ((--ERTS_LINK_REFC(lnk)) == 0) { + erts_destroy_link(erts_remove_link(&(dep->node_links), + p->common.id)); + } + } + lnk = erts_lookup_link(ERTS_P_LINKS(p), Node); + if (lnk != NULL) { + if ((--ERTS_LINK_REFC(lnk)) == 0) { + erts_destroy_link(erts_remove_link(&ERTS_P_LINKS(p), + Node)); + } + } } erts_de_links_unlock(dep); diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index c7d63f9feb..98c39dc6dd 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -41,7 +41,7 @@ local_send_small/1, local_send_big/1, local_send_legal/1, link_to_busy/1, exit_to_busy/1, lost_exit/1, link_to_dead/1, link_to_dead_new_node/1, - applied_monitor_node/1, ref_port_roundtrip/1, nil_roundtrip/1, + ref_port_roundtrip/1, nil_roundtrip/1, trap_bif_1/1, trap_bif_2/1, trap_bif_3/1, stop_dist/1, dist_auto_connect_never/1, dist_auto_connect_once/1, @@ -75,7 +75,7 @@ suite() -> all() -> [ping, {group, bulk_send}, {group, local_send}, link_to_busy, exit_to_busy, lost_exit, link_to_dead, - link_to_dead_new_node, applied_monitor_node, + link_to_dead_new_node, ref_port_roundtrip, nil_roundtrip, stop_dist, {group, trap_bif}, {group, dist_auto_connect}, dist_parallel_send, atom_roundtrip, unicode_atom_roundtrip, @@ -639,26 +639,6 @@ link_to_dead_new_node(Config) when is_list(Config) -> end, ok. -%% Test that monitor_node/2 works when applied. -applied_monitor_node(Config) when is_list(Config) -> - NonExisting = list_to_atom("__non_existing__@" ++ hostname()), - - %% Tail-recursive call to apply (since the node is non-existing, - %% there will be a trap). - - true = tail_apply(erlang, monitor_node, [NonExisting, true]), - [{nodedown, NonExisting}] = test_server:messages_get(), - - %% Ordinary call (with trap). - - true = apply(erlang, monitor_node, [NonExisting, true]), - [{nodedown, NonExisting}] = test_server:messages_get(), - - ok. - -tail_apply(M, F, A) -> - apply(M, F, A). - %% Test that sending a port or reference to another node and back again %% doesn't correct them in any way. ref_port_roundtrip(Config) when is_list(Config) -> diff --git a/erts/emulator/test/erl_link_SUITE.erl b/erts/emulator/test/erl_link_SUITE.erl index d8c5b663e3..a66ca7a57d 100644 --- a/erts/emulator/test/erl_link_SUITE.erl +++ b/erts/emulator/test/erl_link_SUITE.erl @@ -200,13 +200,16 @@ monitor_nodes(Config) when is_list(Config) -> monitor_node(A, true), check_monitor_node(self(), A, 1), check_monitor_node(self(), B, 3), + ok = receive {nodedown, C} -> ok after 1000 -> timeout end, + %%OTP-21: monitor_node(_,false) does not trigger auto-connect anymore + %% and therefore no nodedown if it fails. + %%ok = receive {nodedown, C} -> ok after 1000 -> timeout end, + ok = receive {nodedown, C} -> ok after 1000 -> timeout end, + ok = receive {nodedown, D} -> ok after 1000 -> timeout end, + ok = receive {nodedown, D} -> ok after 1000 -> timeout end, check_monitor_node(self(), C, 0), check_monitor_node(self(), D, 0), - receive {nodedown, C} -> ok end, - receive {nodedown, C} -> ok end, - receive {nodedown, C} -> ok end, - receive {nodedown, D} -> ok end, - receive {nodedown, D} -> ok end, + stop_node(A), receive {nodedown, A} -> ok end, check_monitor_node(self(), A, 0), diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 28625c400f..9630c0c934 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -3304,12 +3304,10 @@ dunlink(Pid) -> false -> true end. -dmonitor_node(Node, Flag, []) -> - case net_kernel:connect(Node) of - true -> erlang:monitor_node(Node, Flag, []); - false -> erlang:self() ! {nodedown, Node}, true - end; - +dmonitor_node(Node, _Flag, []) -> + %% Only called when auto-connect attempt failed early in VM + erlang:self() ! {nodedown, Node}, + true; dmonitor_node(Node, Flag, Opts) -> case lists:member(allow_passive_connect, Opts) of true -> -- cgit v1.2.3 From 3f5f22656a33aec1ba4ed01249732e32d08a8c50 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 18 Aug 2017 16:56:21 +0200 Subject: erts: Async auto-connect for group_leader/2 --- erts/emulator/beam/bif.c | 6 ++-- erts/emulator/test/distribution_SUITE.erl | 50 +++++++++++++++++++++++++++++++ 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 34f5afae78..2583a5f8d6 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -4422,16 +4422,16 @@ BIF_RETTYPE group_leader_2(BIF_ALIST_2) int code; ErtsDSigData dsd; dep = external_pid_dist_entry(BIF_ARG_2); + ERTS_ASSERT(dep); if(dep == erts_this_dist_entry) BIF_ERROR(BIF_P, BADARG); code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, - ERTS_DSP_NO_LOCK, 0, 0); + ERTS_DSP_NO_LOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: - BIF_RET(am_true); case ERTS_DSIG_PREP_NOT_CONNECTED: - BIF_TRAP2(dgroup_leader_trap, BIF_P, BIF_ARG_1, BIF_ARG_2); + BIF_RET(am_true); case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: code = erts_dsig_send_group_leader(&dsd, BIF_ARG_1, BIF_ARG_2); diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index 98c39dc6dd..fa683f363f 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -35,8 +35,12 @@ -include_lib("common_test/include/ct.hrl"). +%-define(Line, erlang:display({line,?LINE}),). +-define(Line,). + -export([all/0, suite/0, groups/0, ping/1, bulk_send_small/1, + group_leader/1, bulk_send_big/1, bulk_send_bigbig/1, local_send_small/1, local_send_big/1, local_send_legal/1, link_to_busy/1, exit_to_busy/1, @@ -61,6 +65,7 @@ %% Internal exports. -export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0, + group_leader_1/1, roundtrip/1, bounce/1, do_dist_auto_connect/1, inet_rpc_server/1, dist_parallel_sender/3, dist_parallel_receiver/0, dist_evil_parallel_receiver/0]). @@ -74,6 +79,7 @@ suite() -> all() -> [ping, {group, bulk_send}, {group, local_send}, + group_leader, link_to_busy, exit_to_busy, lost_exit, link_to_dead, link_to_dead_new_node, ref_port_roundtrip, nil_roundtrip, stop_dist, @@ -124,6 +130,50 @@ ping(Config) when is_list(Config) -> ok. +%% Test erlang:group_leader(_, ExternalPid), i.e. DOP_GROUP_LEADER +group_leader(Config) when is_list(Config) -> + ?Line Sock = start_relay_node(group_leader_1, []), + ?Line Sock2 = start_relay_node(group_leader_2, []), + try + ?Line Node2 = inet_rpc_nodename(Sock2), + ?Line {ok, ok} = do_inet_rpc(Sock, ?MODULE, group_leader_1, [Node2]) + after + ?Line stop_relay_node(Sock), + ?Line stop_relay_node(Sock2) + end, + ok. + +group_leader_1(Node2) -> + ?Line ExtPid = spawn(Node2, fun F() -> + receive {From, group_leader} -> + From ! {self(), group_leader, group_leader()} + end, + F() + end), + ?Line GL1 = self(), + ?Line group_leader(GL1, ExtPid), + ?Line ExtPid ! {self(), group_leader}, + ?Line {ExtPid, group_leader, GL1} = receive_one(), + + %% Kill connection and repeat test when group_leader/2 triggers auto-connect + ?Line net_kernel:monitor_nodes(true), + ?Line net_kernel:disconnect(Node2), + ?Line {nodedown, Node2} = receive_one(), + ?Line GL2 = spawn(fun() -> dummy end), + ?Line group_leader(GL2, ExtPid), + ?Line {nodeup, Node2} = receive_one(), + ?Line ExtPid ! {self(), group_leader}, + ?Line {ExtPid, group_leader, GL2} = receive_one(), + ok. + +receive_one() -> + receive M -> M after 1000 -> timeout end. + + + + + + bulk_send_small(Config) when is_list(Config) -> bulk_send(64, 32). -- cgit v1.2.3 From 17e198d6ee60f7dec9abfed272cf4226aea44535 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 18 Aug 2017 19:56:02 +0200 Subject: erts: Async auto-connect for monitor/2 --- erts/emulator/beam/bif.c | 23 ++++++----------------- erts/preloaded/src/erlang.erl | 21 ++++----------------- 2 files changed, 10 insertions(+), 34 deletions(-) diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 2583a5f8d6..2af61e6ebc 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -790,31 +790,20 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, BIF_RETTYPE ret; int code; + ASSERT(dep); erts_proc_lock(p, ERTS_PROC_LOCK_LINK); code = erts_dsig_prepare(&dsd, &dep, p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), - ERTS_DSP_RLOCK, 0, 0); + ERTS_DSP_RLOCK, 0, 1); switch (code) { - case ERTS_DSIG_PREP_PENDING: - /* - * Must wait for connection to know if node supports monitor. - * Damn these synchronous errors. - */ - erts_smp_de_runlock(dep); - /* fall through */ case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); ERTS_BIF_PREP_TRAP2(ret, dmonitor_p_trap, p, bifarg1, bifarg2); break; + case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: - if (!(dep->flags & DFLAG_DIST_MONITOR) - || (byname && !(dep->flags & DFLAG_DIST_MONITOR_NAME))) { - erts_de_runlock(dep); - erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); - ERTS_BIF_PREP_ERROR(ret, p, BADARG); - } - else { + { Eterm p_trgt, p_name, d_name, mon_ref; mon_ref = erts_make_ref(p); @@ -917,17 +906,17 @@ local_port: if (!erts_is_alive && remote_node != am_Noname) { goto badarg; /* Remote monitor from (this) undistributed node */ } - dep = erts_sysname_to_connected_dist_entry(remote_node); + dep = erts_find_or_insert_dist_entry(remote_node); if (dep == erts_this_dist_entry) { ret = local_name_monitor(BIF_P, BIF_ARG_1, name); } else { ret = remote_monitor(BIF_P, BIF_ARG_1, BIF_ARG_2, dep, name, 1); } + erts_deref_dist_entry(dep); } else { badarg: ERTS_BIF_PREP_ERROR(ret, BIF_P, BADARG); } - return ret; } diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 9630c0c934..d63a413f21 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -3367,23 +3367,10 @@ dsend({Name, Node}, Msg, Opts) -> -spec erlang:dmonitor_p('process', pid() | {atom(),atom()}) -> reference(). dmonitor_p(process, ProcSpec) -> - %% ProcSpec = pid() | {atom(),atom()} - %% ProcSpec CANNOT be an atom because a locally registered process - %% is never handled here. - Node = case ProcSpec of - {S,N} when erlang:is_atom(S), - erlang:is_atom(N), - N =/= erlang:node() -> N; - _ when erlang:is_pid(ProcSpec) -> erlang:node(ProcSpec) - end, - case net_kernel:connect(Node) of - true -> - erlang:monitor(process, ProcSpec); - false -> - Ref = erlang:make_ref(), - erlang:self() ! {'DOWN', Ref, process, ProcSpec, noconnection}, - Ref - end. + %% Only called when auto-connect attempt failed early in VM + Ref = erlang:make_ref(), + erlang:self() ! {'DOWN', Ref, process, ProcSpec, noconnection}, + Ref. %% %% Trap function used when modified timing has been enabled. -- cgit v1.2.3 From e8df66a934045a1b0dc1edf0a695b6dbcf4cca4b Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 24 Jul 2017 16:05:36 +0200 Subject: Remove obsolete erlang:dsend --- erts/emulator/beam/atom.names | 1 - erts/emulator/beam/dist.c | 11 +---------- erts/emulator/beam/dist.h | 2 -- erts/preloaded/src/erlang.erl | 36 ++---------------------------------- lib/tools/emacs/erlang.el | 1 - 5 files changed, 3 insertions(+), 48 deletions(-) diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index a534dd44fb..dd3e11dc26 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -231,7 +231,6 @@ atom dollar_endonly atom dotall atom driver atom driver_options -atom dsend atom dsend_continue_trap atom dunlink atom duplicate_bag diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 724cb6b24d..7c44545de3 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -108,9 +108,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ -Export* dsend2_trap = NULL; -Export* dsend3_trap = NULL; -/*Export* dsend_nosuspend_trap = NULL;*/ Export* dlink_trap = NULL; Export* dunlink_trap = NULL; Export* dmonitor_node_trap = NULL; @@ -636,9 +633,6 @@ void init_dist(void) erts_atomic_init_nob(&no_caches, 0); /* Lookup/Install all references to trap functions */ - dsend2_trap = trap_function(am_dsend,2); - dsend3_trap = trap_function(am_dsend,3); - /* dsend_nosuspend_trap = trap_function(am_dsend_nosuspend,2);*/ dlink_trap = trap_function(am_dlink,1); dunlink_trap = trap_function(am_dunlink,1); dmonitor_node_trap = trap_function(am_dmonitor_node,3); @@ -3148,10 +3142,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; /* Check that all trap functions are defined !! */ - if (dsend2_trap->addressv[0] == NULL || - dsend3_trap->addressv[0] == NULL || - /* dsend_nosuspend_trap->address == NULL ||*/ - dlink_trap->addressv[0] == NULL || + if (dlink_trap->addressv[0] == NULL || dunlink_trap->addressv[0] == NULL || dmonitor_node_trap->addressv[0] == NULL || dgroup_leader_trap->addressv[0] == NULL || diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 26432b21e9..8bb915c76b 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -86,8 +86,6 @@ #define DOP_SEND_SENDER_TT 23 /* distribution trap functions */ -extern Export* dsend2_trap; -extern Export* dsend3_trap; extern Export* dlink_trap; extern Export* dunlink_trap; extern Export* dmonitor_node_trap; diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index d63a413f21..56a7d03473 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -31,7 +31,8 @@ -export([localtime_to_universaltime/1]). -export([suspend_process/1]). -export([min/2, max/2]). --export([dlink/1, dunlink/1, dsend/2, dsend/3, dgroup_leader/2, +-export([dlink/1, dunlink/1]). +-export([dgroup_leader/2, dexit/2, dmonitor_node/3, dmonitor_p/2]). -export([delay_trap/2]). -export([set_cookie/2, get_cookie/0]). @@ -3331,39 +3332,6 @@ dexit(Pid, Reason) -> false -> true end. -dsend(Pid, Msg) when erlang:is_pid(Pid) -> - case net_kernel:connect(erlang:node(Pid)) of - true -> erlang:send(Pid, Msg); - false -> Msg - end; -dsend(Port, Msg) when erlang:is_port(Port) -> - case net_kernel:connect(erlang:node(Port)) of - true -> erlang:send(Port, Msg); - false -> Msg - end; -dsend({Name, Node}, Msg) -> - case net_kernel:connect(Node) of - true -> erlang:send({Name,Node}, Msg); - false -> Msg; - ignored -> Msg % Not distributed. - end. - -dsend(Pid, Msg, Opts) when erlang:is_pid(Pid) -> - case net_kernel:connect(erlang:node(Pid)) of - true -> erlang:send(Pid, Msg, Opts); - false -> ok - end; -dsend(Port, Msg, Opts) when erlang:is_port(Port) -> - case net_kernel:connect(erlang:node(Port)) of - true -> erlang:send(Port, Msg, Opts); - false -> ok - end; -dsend({Name, Node}, Msg, Opts) -> - case net_kernel:connect(Node) of - true -> erlang:send({Name,Node}, Msg, Opts); - false -> ok; - ignored -> ok % Not distributed. - end. -spec erlang:dmonitor_p('process', pid() | {atom(),atom()}) -> reference(). dmonitor_p(process, ProcSpec) -> diff --git a/lib/tools/emacs/erlang.el b/lib/tools/emacs/erlang.el index 429188b028..791f0db4fe 100644 --- a/lib/tools/emacs/erlang.el +++ b/lib/tools/emacs/erlang.el @@ -909,7 +909,6 @@ resulting regexp is surrounded by \\_< and \\_>." "dlink" "dmonitor_node" "dmonitor_p" - "dsend" "dt_append_vm_tag_data" "dt_get_tag" "dt_get_tag_data" -- cgit v1.2.3 From 5ad822ccfd841400bc44cb53acc6a0889ca3f128 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 15 Aug 2017 16:26:34 +0200 Subject: Remove obsolete erlang:dlink/1, dunlink/1 and dist_exit/3 --- erts/emulator/beam/atom.names | 2 - erts/emulator/beam/bif.c | 9 ----- erts/emulator/beam/bif.tab | 2 - erts/emulator/beam/dist.c | 87 +------------------------------------------ erts/emulator/beam/dist.h | 2 - erts/preloaded/src/erlang.erl | 26 +------------ lib/tools/emacs/erlang.el | 3 -- 7 files changed, 2 insertions(+), 129 deletions(-) diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index dd3e11dc26..3c06c26ba0 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -222,7 +222,6 @@ atom dist_ctrl_put_data atom dist_data atom Div='/' atom div -atom dlink atom dmonitor_node atom dmonitor_p atom DollarDollar='$$' @@ -232,7 +231,6 @@ atom dotall atom driver atom driver_options atom dsend_continue_trap -atom dunlink atom duplicate_bag atom duplicated atom dupnames diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 2af61e6ebc..8eb4d9e85b 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -1181,16 +1181,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: -#if 1 BIF_RET(am_true); -#else - /* - * This is how we used to do it, but the link is obviously not - * active, so I see no point in setting up a connection. - * /Rickard - */ - BIF_TRAP1(dunlink_trap, BIF_P, BIF_ARG_1); -#endif case ERTS_DSIG_PREP_PENDING: case ERTS_DSIG_PREP_CONNECTED: diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 66469a011d..5a591caac9 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -153,14 +153,12 @@ bif erlang:whereis/1 bif erlang:spawn_opt/1 bif erlang:setnode/2 bif erlang:setnode/3 -bif erlang:dist_exit/3 bif erlang:dist_get_stat/1 bif erlang:dist_ctrl_input_handler/2 bif erlang:dist_ctrl_put_data/2 bif erlang:dist_ctrl_get_data/1 bif erlang:dist_ctrl_get_data_notification/1 - # Static native functions in erts_internal bif erts_internal:port_info/1 bif erts_internal:port_info/2 diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 7c44545de3..6e08c72289 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -108,8 +108,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ -Export* dlink_trap = NULL; -Export* dunlink_trap = NULL; Export* dmonitor_node_trap = NULL; Export* dgroup_leader_trap = NULL; Export* dexit_trap = NULL; @@ -633,8 +631,6 @@ void init_dist(void) erts_atomic_init_nob(&no_caches, 0); /* Lookup/Install all references to trap functions */ - dlink_trap = trap_function(am_dlink,1); - dunlink_trap = trap_function(am_dunlink,1); dmonitor_node_trap = trap_function(am_dmonitor_node,3); dgroup_leader_trap = trap_function(am_dgroup_leader,2); dexit_trap = trap_function(am_dexit, 2); @@ -3100,7 +3096,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ monitor_node -- turn on/off node monitoring node controller only: - dist_exit/3 -- send exit signals from remote to local process dist_link/2 -- link a remote process to a local dist_unlink/2 -- unlink a remote from a local ****************************************************************************/ @@ -3111,9 +3106,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ ** Set the node name of current node fail if node already is set. ** setnode(name@host, Creation) ** loads functions pointer to trap_functions from module erlang. - ** erlang:dsend/2 - ** erlang:dlink/1 - ** erlang:dunlink/1 ** erlang:dmonitor_node/3 ** erlang:dgroup_leader/2 ** erlang:dexit/2 @@ -3142,9 +3134,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) goto error; /* Check that all trap functions are defined !! */ - if (dlink_trap->addressv[0] == NULL || - dunlink_trap->addressv[0] == NULL || - dmonitor_node_trap->addressv[0] == NULL || + if (dmonitor_node_trap->addressv[0] == NULL || dgroup_leader_trap->addressv[0] == NULL || dmonitor_p_trap->addressv[0] == NULL || dexit_trap->addressv[0] == NULL) { @@ -3559,81 +3549,6 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) BIF_RET(am_false); } - -/**********************************************************************/ -/* dist_exit(Local, Term, Remote) -> Bool */ - -BIF_RETTYPE dist_exit_3(BIF_ALIST_3) -{ - Eterm local; - Eterm remote; - DistEntry *rdep; - - local = BIF_ARG_1; - remote = BIF_ARG_3; - - /* Check that remote is a remote process */ - if (is_not_external_pid(remote)) - goto error; - - rdep = external_dist_entry(remote); - - if(rdep == erts_this_dist_entry) - goto error; - - /* Check that local is local */ - if (is_internal_pid(local)) { - Process *lp; - ErtsProcLocks lp_locks; - if (BIF_P->common.id == local) { - lp_locks = ERTS_PROC_LOCKS_ALL; - lp = BIF_P; - erts_proc_lock(BIF_P, ERTS_PROC_LOCKS_ALL_MINOR); - } - else { - lp_locks = ERTS_PROC_LOCKS_XSIG_SEND; - lp = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, - local, lp_locks); - if (!lp) { - BIF_RET(am_true); /* ignore */ - } - } - - (void) erts_send_exit_signal(BIF_P, - remote, - lp, - &lp_locks, - BIF_ARG_2, - NIL, - NULL, - 0); - if (lp == BIF_P) - lp_locks &= ~ERTS_PROC_LOCK_MAIN; - erts_proc_unlock(lp, lp_locks); - if (lp == BIF_P) { - erts_aint32_t state = erts_atomic32_read_acqb(&BIF_P->state); - /* - * We may have exited current process and may have to take action. - */ - if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) { - if (state & ERTS_PSFLG_PENDING_EXIT) - erts_handle_pending_exit(BIF_P, ERTS_PROC_LOCK_MAIN); - ERTS_BIF_EXITED(BIF_P); - } - } - } - else if (is_external_pid(local) - && external_dist_entry(local) == erts_this_dist_entry) { - BIF_RET(am_true); /* ignore */ - } - else - goto error; - BIF_RET(am_true); - - error: - BIF_ERROR(BIF_P, BADARG); -} - /**********************************************************************/ /* node(Object) -> Node */ diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 8bb915c76b..50e72ac42c 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -86,8 +86,6 @@ #define DOP_SEND_SENDER_TT 23 /* distribution trap functions */ -extern Export* dlink_trap; -extern Export* dunlink_trap; extern Export* dmonitor_node_trap; extern Export* dgroup_leader_trap; extern Export* dexit_trap; diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 56a7d03473..43ac8a6f26 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -31,7 +31,6 @@ -export([localtime_to_universaltime/1]). -export([suspend_process/1]). -export([min/2, max/2]). --export([dlink/1, dunlink/1]). -export([dgroup_leader/2, dexit/2, dmonitor_node/3, dmonitor_p/2]). -export([delay_trap/2]). @@ -125,7 +124,7 @@ -export([crc32/2, crc32_combine/3, date/0, decode_packet/3]). -export([delete_element/2]). -export([delete_module/1, demonitor/1, demonitor/2, display/1]). --export([display_nl/0, display_string/1, dist_exit/3, erase/0, erase/1]). +-export([display_nl/0, display_string/1, erase/0, erase/1]). -export([error/1, error/2, exit/1, exit/2, external_size/1]). -export([external_size/2, finish_after_on_load/2, finish_loading/1, float/1]). -export([float_to_binary/1, float_to_binary/2, @@ -707,14 +706,6 @@ display_nl() -> display_string(_P1) -> erlang:nif_error(undefined). -%% dist_exit/3 --spec erlang:dist_exit(P1, P2, P3) -> true when - P1 :: pid(), - P2 :: kill | noconnection | normal, - P3 :: pid() | port(). -dist_exit(_P1, _P2, _P3) -> - erlang:nif_error(undefined). - %% dt_append_vm_tag_data/1 -spec erlang:dt_append_vm_tag_data(IoData) -> IoDataRet when IoData :: iodata(), @@ -3290,21 +3281,6 @@ dist_get_stat(_DHandle) -> %% reactivate the command. %% --spec erlang:dlink(pid() | port()) -> 'true'. -dlink(Pid) -> - case net_kernel:connect(erlang:node(Pid)) of - true -> erlang:link(Pid); - false -> erlang:dist_exit(erlang:self(), noconnection, Pid), true - end. - -%% Can this ever happen? --spec erlang:dunlink(identifier()) -> 'true'. -dunlink(Pid) -> - case net_kernel:connect(erlang:node(Pid)) of - true -> erlang:unlink(Pid); - false -> true - end. - dmonitor_node(Node, _Flag, []) -> %% Only called when auto-connect attempt failed early in VM erlang:self() ! {nodedown, Node}, diff --git a/lib/tools/emacs/erlang.el b/lib/tools/emacs/erlang.el index 791f0db4fe..68a4bffca0 100644 --- a/lib/tools/emacs/erlang.el +++ b/lib/tools/emacs/erlang.el @@ -905,8 +905,6 @@ resulting regexp is surrounded by \\_< and \\_>." "dist_ctrl_get_data_notification" "dist_ctrl_input_handler" "dist_ctrl_put_data" - "dist_exit" - "dlink" "dmonitor_node" "dmonitor_p" "dt_append_vm_tag_data" @@ -916,7 +914,6 @@ resulting regexp is surrounded by \\_< and \\_>." "dt_put_tag" "dt_restore_tag" "dt_spread_tag" - "dunlink" "convert_time_unit" "external_size" "finish_after_on_load" -- cgit v1.2.3 From da2935ce340cef5db1b5f589778eb20044796610 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 15 Aug 2017 17:12:02 +0200 Subject: Remove obsolete erlang:dexit/2 --- erts/emulator/beam/atom.names | 1 - erts/emulator/beam/dist.c | 8 +------- erts/emulator/beam/dist.h | 1 - erts/preloaded/src/erlang.erl | 8 +------- lib/tools/emacs/erlang.el | 1 - 5 files changed, 2 insertions(+), 17 deletions(-) diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 3c06c26ba0..d755b5393b 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -199,7 +199,6 @@ atom debug_flags atom decimals atom default atom delay_trap -atom dexit atom depth atom dgroup_leader atom dictionary diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 6e08c72289..e36ea9ba94 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -110,7 +110,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ Export* dmonitor_node_trap = NULL; Export* dgroup_leader_trap = NULL; -Export* dexit_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ @@ -633,7 +632,6 @@ void init_dist(void) /* Lookup/Install all references to trap functions */ dmonitor_node_trap = trap_function(am_dmonitor_node,3); dgroup_leader_trap = trap_function(am_dgroup_leader,2); - dexit_trap = trap_function(am_dexit, 2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, am_dist_ctrl_put_data, @@ -3108,9 +3106,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ ** loads functions pointer to trap_functions from module erlang. ** erlang:dmonitor_node/3 ** erlang:dgroup_leader/2 - ** erlang:dexit/2 - ** -- are these needed ? - ** dexit/1 ***********************************************************************/ BIF_RETTYPE setnode_2(BIF_ALIST_2) @@ -3136,8 +3131,7 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) /* Check that all trap functions are defined !! */ if (dmonitor_node_trap->addressv[0] == NULL || dgroup_leader_trap->addressv[0] == NULL || - dmonitor_p_trap->addressv[0] == NULL || - dexit_trap->addressv[0] == NULL) { + dmonitor_p_trap->addressv[0] == NULL) { goto error; } diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 50e72ac42c..339e4839ea 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -88,7 +88,6 @@ /* distribution trap functions */ extern Export* dmonitor_node_trap; extern Export* dgroup_leader_trap; -extern Export* dexit_trap; extern Export* dmonitor_p_trap; typedef enum { diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 43ac8a6f26..80601ab067 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -32,7 +32,7 @@ -export([suspend_process/1]). -export([min/2, max/2]). -export([dgroup_leader/2, - dexit/2, dmonitor_node/3, dmonitor_p/2]). + dmonitor_node/3, dmonitor_p/2]). -export([delay_trap/2]). -export([set_cookie/2, get_cookie/0]). -export([nodes/0]). @@ -3302,12 +3302,6 @@ dgroup_leader(Leader, Pid) -> false -> true %% bad arg ? end. -dexit(Pid, Reason) -> - case net_kernel:connect(erlang:node(Pid)) of - true -> erlang:exit(Pid, Reason); - false -> true - end. - -spec erlang:dmonitor_p('process', pid() | {atom(),atom()}) -> reference(). dmonitor_p(process, ProcSpec) -> diff --git a/lib/tools/emacs/erlang.el b/lib/tools/emacs/erlang.el index 68a4bffca0..98b7577f27 100644 --- a/lib/tools/emacs/erlang.el +++ b/lib/tools/emacs/erlang.el @@ -895,7 +895,6 @@ resulting regexp is surrounded by \\_< and \\_>." "decode_packet" "delay_trap" "delete_element" - "dexit" "dgroup_leader" "display" "display_nl" -- cgit v1.2.3 From 389e11b8b8a476ca73ca03a39ad7ec298dc99e83 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 18 Aug 2017 17:08:27 +0200 Subject: Remove obsolete erlang:dgroup_leader --- erts/emulator/beam/atom.names | 1 - erts/emulator/beam/dist.c | 6 ------ erts/emulator/beam/dist.h | 1 - erts/preloaded/src/erlang.erl | 10 +--------- lib/tools/emacs/erlang.el | 1 - 5 files changed, 1 insertion(+), 18 deletions(-) diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index d755b5393b..75a70c3716 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -200,7 +200,6 @@ atom decimals atom default atom delay_trap atom depth -atom dgroup_leader atom dictionary atom dirty_bif_exception atom dirty_bif_result diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index e36ea9ba94..cd4125e404 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -109,7 +109,6 @@ int erts_dist_buf_busy_limit; /* distribution trap functions */ Export* dmonitor_node_trap = NULL; -Export* dgroup_leader_trap = NULL; Export* dmonitor_p_trap = NULL; /* local variables */ @@ -631,7 +630,6 @@ void init_dist(void) /* Lookup/Install all references to trap functions */ dmonitor_node_trap = trap_function(am_dmonitor_node,3); - dgroup_leader_trap = trap_function(am_dgroup_leader,2); dmonitor_p_trap = trap_function(am_dmonitor_p, 2); dist_ctrl_put_data_trap = erts_export_put(am_erts_internal, am_dist_ctrl_put_data, @@ -3103,9 +3101,6 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ /********************************************************************** ** Set the node name of current node fail if node already is set. ** setnode(name@host, Creation) - ** loads functions pointer to trap_functions from module erlang. - ** erlang:dmonitor_node/3 - ** erlang:dgroup_leader/2 ***********************************************************************/ BIF_RETTYPE setnode_2(BIF_ALIST_2) @@ -3130,7 +3125,6 @@ BIF_RETTYPE setnode_2(BIF_ALIST_2) /* Check that all trap functions are defined !! */ if (dmonitor_node_trap->addressv[0] == NULL || - dgroup_leader_trap->addressv[0] == NULL || dmonitor_p_trap->addressv[0] == NULL) { goto error; } diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 339e4839ea..49f3eac340 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -87,7 +87,6 @@ /* distribution trap functions */ extern Export* dmonitor_node_trap; -extern Export* dgroup_leader_trap; extern Export* dmonitor_p_trap; typedef enum { diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 80601ab067..0aa8dae3e5 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -31,8 +31,7 @@ -export([localtime_to_universaltime/1]). -export([suspend_process/1]). -export([min/2, max/2]). --export([dgroup_leader/2, - dmonitor_node/3, dmonitor_p/2]). +-export([dmonitor_node/3, dmonitor_p/2]). -export([delay_trap/2]). -export([set_cookie/2, get_cookie/0]). -export([nodes/0]). @@ -3296,13 +3295,6 @@ dmonitor_node(Node, Flag, Opts) -> dmonitor_node(Node,Flag,[]) end. -dgroup_leader(Leader, Pid) -> - case net_kernel:connect(erlang:node(Pid)) of - true -> erlang:group_leader(Leader, Pid); - false -> true %% bad arg ? - end. - - -spec erlang:dmonitor_p('process', pid() | {atom(),atom()}) -> reference(). dmonitor_p(process, ProcSpec) -> %% Only called when auto-connect attempt failed early in VM diff --git a/lib/tools/emacs/erlang.el b/lib/tools/emacs/erlang.el index 98b7577f27..d9efadf64a 100644 --- a/lib/tools/emacs/erlang.el +++ b/lib/tools/emacs/erlang.el @@ -895,7 +895,6 @@ resulting regexp is surrounded by \\_< and \\_>." "decode_packet" "delay_trap" "delete_element" - "dgroup_leader" "display" "display_nl" "display_string" -- cgit v1.2.3 From 9271fd39c48a121b6be79e7a44a524c883fafc41 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 8 Sep 2017 17:11:20 +0200 Subject: erts: Let send(_,_,[noconnect]) enqueue msg on pending connection. The least bad behavior I think: * We cannot return 'noconnect' as caller might already have enqueued monitor/link that never triggers. * We cannot block waiting for connection as that can ruin latency when 'noconnect' is used to avoid blocking auto-connect (see gen_server and gen_statem). But there might be users getting more cases of bad latency waiting for a pendig connection, instead of a fast 'noconnect'. --- erts/emulator/beam/bif.c | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 8eb4d9e85b..30278cbe36 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -2192,11 +2192,10 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) if (is_not_atom(tp[1]) || is_not_atom(tp[2])) return SEND_BADARG; - /* sysname_to_connected_dist_entry will return NULL if there - is no dist_entry or the dist_entry has no port, + /* erts_find_dist_entry will return NULL if there is no dist_entry but remote_send() will handle that. */ - dep = erts_sysname_to_connected_dist_entry(tp[2]); + dep = erts_find_dist_entry(tp[2]); if (dep == erts_this_dist_entry) { Eterm id; -- cgit v1.2.3 From 5857245406584b8bfcbdf0be450ad494a992d5c8 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 8 Sep 2017 17:11:43 +0200 Subject: erts: Fix auto-connect toward erl_interface/jinterface --- erts/emulator/beam/dist.c | 63 +++++++++++++++++------------------- erts/emulator/beam/dist.h | 2 +- erts/emulator/beam/erl_node_tables.h | 1 + erts/emulator/beam/external.c | 44 +++++++++++++++++++------ erts/emulator/beam/external.h | 2 +- 5 files changed, 66 insertions(+), 46 deletions(-) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index cd4125e404..6d2e907f18 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -1879,33 +1879,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) { ctx->acmp = erts_get_atom_cache_map(ctx->c_p); - ctx->pass_through_size = 0; + ctx->max_finalize_prepend = 0; } else { ctx->acmp = NULL; - ctx->pass_through_size = 3; /* SVERK rename */ + ctx->max_finalize_prepend = 3; } #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl); - if (is_value(ctx->msg)) - erts_fprintf(stderr, " MSG: %T\n", ctx->msg); + erts_fprintf(stderr, ">> CTL: %T\n", ctx->ctl); + if (is_value(ctx->msg)) + erts_fprintf(stderr, " MSG: %T\n", ctx->msg); #endif - ctx->data_size = ctx->pass_through_size; + ctx->data_size = ctx->max_finalize_prepend; erts_reset_atom_cache_map(ctx->acmp); erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size); - if (is_value(ctx->msg)) { - ctx->u.sc.wstack.wstart = NULL; - ctx->u.sc.flags = ctx->flags; - ctx->u.sc.level = 0; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; - } - break; + if (is_non_value(ctx->msg)) { + ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; + break; + } + ctx->u.sc.wstack.wstart = NULL; + ctx->u.sc.flags = ctx->flags; + ctx->u.sc.level = 0; + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; case ERTS_DSIG_SEND_PHASE_MSG_SIZE: if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) { retval = ERTS_DSIG_SEND_CONTINUE; @@ -1920,23 +1919,24 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->data_size += ctx->dhdr_ext_size; ctx->obuf = alloc_dist_obuf(ctx->data_size); - ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->pass_through_size + ctx->dhdr_ext_size; + ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size; /* Encode internal version of dist header */ ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp); /* Encode control message */ erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL); - if (is_value(ctx->msg)) { - ctx->u.ec.flags = ctx->flags; - ctx->u.ec.level = 0; - ctx->u.ec.wstack.wstart = NULL; - ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; - } else { - ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; - } - break; + if (is_non_value(ctx->msg)) { + ctx->obuf->msg_start = NULL; + ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; + break; + } + ctx->u.ec.flags = ctx->flags; + ctx->u.ec.level = 0; + ctx->u.ec.wstack.wstart = NULL; + ctx->obuf->msg_start = ctx->obuf->ext_endp; - case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: + ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; + case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) { retval = ERTS_DSIG_SEND_CONTINUE; goto done; @@ -1949,7 +1949,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) int resume = 0; ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp); - ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->pass_through_size); + ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend); ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size); ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp; @@ -2308,9 +2308,7 @@ erts_dist_command(Port *prt, int reds_limit) ob = oq.first; ASSERT(ob); do { - ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, - dep->cache, - flags); + erts_encode_ext_dist_header_finalize(ob, dep->cache, flags); ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; preempt = reds > reds_limit; @@ -2350,10 +2348,7 @@ erts_dist_command(Port *prt, int reds_limit) while (oq.first && !preempt) { ErtsDistOutputBuf *fob; Uint size; - oq.first->extp - = erts_encode_ext_dist_header_finalize(oq.first->extp, - dep->cache, - flags); + erts_encode_ext_dist_header_finalize(oq.first, dep->cache, flags); reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; ASSERT(&oq.first->data[0] <= oq.first->extp && oq.first->extp < oq.first->ext_endp); diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 49f3eac340..505d510473 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -405,7 +405,7 @@ struct erts_dsig_send_context { Eterm ctl; Eterm msg; int force_busy; - Uint32 pass_through_size; + Uint32 max_finalize_prepend; Uint data_size, dhdr_ext_size; ErtsAtomCacheMap *acmp; ErtsDistOutputBuf *obuf; diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index d012f4b2cf..af42814b8b 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -87,6 +87,7 @@ struct ErtsDistOutputBuf_ { ErtsDistOutputBuf *next; byte *extp; byte *ext_endp; + byte *msg_start; byte data[1]; }; diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index c49e2f617a..d8f52ceb57 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -351,41 +351,63 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) #define PASS_THROUGH 'p' /* This code should go */ -byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint32 dflags) +void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, + ErtsAtomCache *cache, + Uint32 dflags) { byte *ip; byte instr_buf[(2+4)*ERTS_ATOM_CACHE_SIZE]; int ci, sz; byte dist_hdr_flags; int long_atoms; - register byte *ep = ext; + register byte *ep = ob->extp; ASSERT(dflags & DFLAG_UTF8_ATOMS); + /* + * The buffer can have three different layouts at this point depending on + * what was known when encoded: + * + * Pending connection: CtrlTerm [, MsgTerm] + * With atom cache : VERSION_MAGIC, DIST_HEADER, ..., CtrlTerm [, MsgTerm] + * No atom cache : VERSION_MAGIC, CtrlTerm [, VERSION_MAGIC, MsgTerm] + */ + if (ep[0] != VERSION_MAGIC) { + /* + * Was encoded without atom cache toward pending connection. + */ ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT); if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) { /* - * Encoded without atom cache (toward pending connection) - * but receiver wants dist header. Let's prepend an empty one. + * Receiver wants dist header. Let's prepend an empty one. */ *--ep = 0; /* NumberOfAtomCacheRefs */ *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; } else { - /* Node without atom cache, 'pass through' needed */ - - ASSERT(!"SVERK: Must insert VERSION_MAGIC's"); + /* + * Primitive receiver without atom cache (erl_interface/jinterface). + * Must prepend VERSION_MAGIC to both ctrl and message + * And add PASSTHROUGH. + */ + if (ob->msg_start) { + ASSERT(ep < ob->msg_start && ob->msg_start < ob->ext_endp); + sys_memmove(ep-1, ep, (ob->msg_start - ep)); + ob->msg_start[-1] = VERSION_MAGIC; + --ep; + } + *--ep = VERSION_MAGIC; *--ep = PASS_THROUGH; } - return ep; + goto done; } else if (ep[1] != DIST_HEADER) { ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT); ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)); /* Node without atom cache, 'pass through' needed */ *--ep = PASS_THROUGH; - return ep; + goto done; } dist_hdr_flags = ep[2]; @@ -510,7 +532,9 @@ byte *erts_encode_ext_dist_header_finalize(byte *ext, ErtsAtomCache *cache, Uint put_int8(ci, ep); *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; - return ep; +done: + ASSERT(ep >= ob->data); + ob->extp = ep; } int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index 49950c4aad..11044b17ef 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -154,7 +154,7 @@ void erts_finalize_atom_cache_map(ErtsAtomCacheMap *, Uint32); Uint erts_encode_ext_dist_header_size(ErtsAtomCacheMap *); byte *erts_encode_ext_dist_header_setup(byte *, ErtsAtomCacheMap *); -byte *erts_encode_ext_dist_header_finalize(byte *, ErtsAtomCache *, Uint32); +void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, ErtsAtomCache *, Uint32); struct erts_dsig_send_context; int erts_encode_dist_ext_size(Eterm, Uint32, ErtsAtomCacheMap*, Uint* szp); int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx, Uint* szp); -- cgit v1.2.3 From d736e87ff94ab8191f33dca55516e6c1d440b915 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 24 Aug 2017 17:20:21 +0200 Subject: Add optimistic DFLAG_DIST_HOPEFULLY for pending connections to avoid tuple fallbacks for export funs and bitstrings. ToDo: Re-encode if receiver turn out to be erl_interface/jinterface. --- erts/emulator/beam/dist.c | 2 +- erts/emulator/beam/dist.h | 9 ++++-- erts/emulator/beam/erl_node_tables.c | 2 +- erts/emulator/beam/external.c | 4 +-- erts/emulator/test/distribution_SUITE.erl | 53 +++++++++++++++++++++++++++++-- 5 files changed, 62 insertions(+), 8 deletions(-) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 6d2e907f18..7877865ad4 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3443,7 +3443,7 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) conn_id = dep->connection_id; else if (dep->status == 0) { dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; + dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); dep->connection_id++; dep->connection_id &= ERTS_DIST_CON_ID_MASK; conn_id = dep->connection_id; diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 505d510473..f9a2037687 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -45,7 +45,7 @@ #define DFLAG_MAP_TAG 0x20000 #define DFLAG_BIG_CREATION 0x40000 #define DFLAG_SEND_SENDER 0x80000 -#define DFLAG_PENDING_CONNECTION 0x100000 +#define DFLAG_NO_MAGIC 0x100000 /* Mandatory flags for distribution (sync with dist_util.erl) */ #define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \ @@ -53,6 +53,11 @@ | DFLAG_UTF8_ATOMS \ | DFLAG_NEW_FUN_TAGS) +/* Additional optimistic flags when encoding toward pending connection */ +#define DFLAG_DIST_HOPEFULLY (DFLAG_NO_MAGIC \ + | DFLAG_EXPORT_PTR_TAG \ + | DFLAG_BIT_BINARIES) + /* All flags that should be enabled when term_to_binary/1 is used. */ #define TERM_TO_BINARY_DFLAGS (DFLAG_EXTENDED_REFERENCES \ | DFLAG_NEW_FUN_TAGS \ @@ -206,7 +211,7 @@ retry: Eterm msg, conn_id; dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = DFLAG_DIST_MANDATORY | DFLAG_PENDING_CONNECTION; + dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); dep->connection_id++; dep->connection_id &= ERTS_DIST_CON_ID_MASK; conn_id = make_small(dep->connection_id); diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index dd607f438e..2a33766ac8 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -628,7 +628,7 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) dep->connection_id &= ERTS_DIST_CON_ID_MASK; } dep->status |= ERTS_DE_SFLG_CONNECTED; - dep->flags = flags & ~DFLAG_PENDING_CONNECTION; + dep->flags = flags & ~DFLAG_NO_MAGIC; dep->cid = cid; erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) cid); diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index d8f52ceb57..bfac48580c 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -561,7 +561,7 @@ int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx return -1; } else { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(ctx->flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_PENDING_CONNECTION))) + if (!(ctx->flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_NO_MAGIC))) #endif sz++ /* VERSION_MAGIC */; @@ -593,7 +593,7 @@ int erts_encode_dist_ext(Eterm term, byte **ext, Uint32 flags, ErtsAtomCacheMap { if (!ctx || !ctx->wstack.wstart) { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_PENDING_CONNECTION))) + if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_NO_MAGIC))) #endif *(*ext)++ = VERSION_MAGIC; } diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index fa683f363f..f1831b7c45 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -41,6 +41,7 @@ -export([all/0, suite/0, groups/0, ping/1, bulk_send_small/1, group_leader/1, + optimistic_dflags/1, bulk_send_big/1, bulk_send_bigbig/1, local_send_small/1, local_send_big/1, local_send_legal/1, link_to_busy/1, exit_to_busy/1, @@ -66,6 +67,7 @@ %% Internal exports. -export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0, group_leader_1/1, + optimistic_dflags_echo/0, optimistic_dflags_sender/1, roundtrip/1, bounce/1, do_dist_auto_connect/1, inet_rpc_server/1, dist_parallel_sender/3, dist_parallel_receiver/0, dist_evil_parallel_receiver/0]). @@ -80,6 +82,7 @@ suite() -> all() -> [ping, {group, bulk_send}, {group, local_send}, group_leader, + optimistic_dflags, link_to_busy, exit_to_busy, lost_exit, link_to_dead, link_to_dead_new_node, ref_port_roundtrip, nil_roundtrip, stop_dist, @@ -166,12 +169,58 @@ group_leader_1(Node2) -> ?Line {ExtPid, group_leader, GL2} = receive_one(), ok. -receive_one() -> - receive M -> M after 1000 -> timeout end. +%% Test optimistic distribution flags toward pending connections (DFLAG_DIST_HOPEFULLY) +optimistic_dflags(Config) when is_list(Config) -> + ?Line Sender = start_relay_node(optimistic_dflags_sender, []), + ?Line Echo = start_relay_node(optimistic_dflags_echo, []), + try + ?Line {ok, ok} = do_inet_rpc(Echo, ?MODULE, optimistic_dflags_echo, []), + + ?Line EchoNode = inet_rpc_nodename(Echo), + ?Line {ok, ok} = do_inet_rpc(Sender, ?MODULE, optimistic_dflags_sender, [EchoNode]) + after + ?Line stop_relay_node(Sender), + ?Line stop_relay_node(Echo) + end, + ok. + +optimistic_dflags_echo() -> + P = spawn(fun F() -> + receive {From, Term} -> + From ! {self(), Term} + end, + F() + end), + register(optimistic_dflags_echo, P), + optimistic_dflags_echo ! {self(), hello}, + {P, hello} = receive_one(), + ok. + +optimistic_dflags_sender(EchoNode) -> + ?Line net_kernel:monitor_nodes(true), + optimistic_dflags_do(EchoNode, <<1:1>>), + optimistic_dflags_do(EchoNode, fun lists:map/2), + ok. +optimistic_dflags_do(EchoNode, Term) -> + ?Line {optimistic_dflags_echo, EchoNode} ! {self(), Term}, + ?Line {nodeup, EchoNode} = receive_one(), + ?Line {EchoPid, Term} = receive_one(), + %% repeat with pid destination + ?Line net_kernel:disconnect(EchoNode), + ?Line {nodedown, EchoNode} = receive_one(), + ?Line EchoPid ! {self(), Term}, + ?Line {nodeup, EchoNode} = receive_one(), + ?Line {EchoPid, Term} = receive_one(), + + ?Line net_kernel:disconnect(EchoNode), + ?Line {nodedown, EchoNode} = receive_one(), + ok. +receive_one() -> + receive M -> M after 1000 -> timeout end. bulk_send_small(Config) when is_list(Config) -> -- cgit v1.2.3 From 966b0f9b4095984abd2c5196d40c4e15d5d1a06c Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 24 Aug 2017 20:34:16 +0200 Subject: erl_interface: Refactor ei_accept_SUITE in order to call ei_publish() separately. --- lib/erl_interface/test/ei_accept_SUITE.erl | 14 ++++++++--- .../test/ei_accept_SUITE_data/ei_accept_test.c | 28 ++++++++++++++++++++-- 2 files changed, 37 insertions(+), 5 deletions(-) diff --git a/lib/erl_interface/test/ei_accept_SUITE.erl b/lib/erl_interface/test/ei_accept_SUITE.erl index e06ee762d7..8881eb85bd 100644 --- a/lib/erl_interface/test/ei_accept_SUITE.erl +++ b/lib/erl_interface/test/ei_accept_SUITE.erl @@ -59,7 +59,8 @@ ei_accept(Config) when is_list(Config) -> spawn(F), Port = 6543, - {ok, Fd, _Node} = ei_accept(P, Port), + {ok, ListenFd} = ei_publish(P, Port), + {ok, Fd, _Node} = ei_accept(P, ListenFd), TermReceived= ei_receive(P, Fd), io:format("Sent ~p received ~p ~n", [TermToSend, TermReceived]), TermToSend= TermReceived, @@ -137,8 +138,15 @@ ei_connect_init(P, Num, Cookie, Creation) -> {term,Int} when is_integer(Int) -> Int end. -ei_accept(P, PortNo) -> - send_command(P, ei_accept, [PortNo]), +ei_publish(P, PortNo) -> + send_command(P, ei_publish, [PortNo]), + case get_term(P) of + {term,{ListenFd, EpmdFd, _}} when ListenFd >= 0, EpmdFd >= 0 -> {ok, ListenFd}; + {term,{_, _, Errno}} -> {error,Errno} + end. + +ei_accept(P, ListenFd) -> + send_command(P, ei_accept, [ListenFd]), case get_term(P) of {term,{Fd, _, Node}} when Fd >= 0 -> {ok, Fd, Node}; {term,{_Fd, Errno, _Node}} -> {error,Errno} diff --git a/lib/erl_interface/test/ei_accept_SUITE_data/ei_accept_test.c b/lib/erl_interface/test/ei_accept_SUITE_data/ei_accept_test.c index 7b81ee5491..2355192cc2 100644 --- a/lib/erl_interface/test/ei_accept_SUITE_data/ei_accept_test.c +++ b/lib/erl_interface/test/ei_accept_SUITE_data/ei_accept_test.c @@ -43,6 +43,7 @@ #include "ei_runner.h" static void cmd_ei_connect_init(char* buf, int len); +static void cmd_ei_publish(char* buf, int len); static void cmd_ei_accept(char* buf, int len); static void cmd_ei_receive(char* buf, int len); static void cmd_ei_unpublish(char* buf, int len); @@ -58,6 +59,7 @@ static struct { void (*func)(char* buf, int len); } commands[] = { "ei_connect_init", 3, cmd_ei_connect_init, + "ei_publish", 1, cmd_ei_publish, "ei_accept", 1, cmd_ei_accept, "ei_receive", 1, cmd_ei_receive, "ei_unpublish", 0, cmd_ei_unpublish @@ -149,11 +151,10 @@ static int my_listen(int port) return listen_fd; } -static void cmd_ei_accept(char* buf, int len) +static void cmd_ei_publish(char* buf, int len) { int index = 0; int listen, r; - ErlConnect conn; long port; ei_x_buff x; int i; @@ -170,6 +171,29 @@ static void cmd_ei_accept(char* buf, int len) #ifdef VXWORKS save_fd(i); #endif + /* send listen-fd, result and errno */ + ei_x_new_with_version(&x); + ei_x_encode_tuple_header(&x, 3); + ei_x_encode_long(&x, listen); + ei_x_encode_long(&x, i); + ei_x_encode_long(&x, erl_errno); + send_bin_term(&x); + ei_x_free(&x); +} + +static void cmd_ei_accept(char* buf, int len) +{ + int index = 0; + int r; + ErlConnect conn; + long listen; + ei_x_buff x; + int i; + + /* get port */ + if (ei_decode_long(buf, &index, &listen) < 0) + fail("expected int (listen fd)"); + r = ei_accept(&ec, listen, &conn); #ifdef VXWORKS save_fd(r); -- cgit v1.2.3 From 38c70e35b6da2571c7bc90442108fe5d5ddbfad1 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 24 Aug 2017 21:53:56 +0200 Subject: erl_interface: Add tuple fallback tests DFLAG_EXPORT_PTR_TAG and DFLAG_BIT_BINARIES --- lib/erl_interface/test/ei_accept_SUITE.erl | 45 ++++++++++++++---------------- 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/lib/erl_interface/test/ei_accept_SUITE.erl b/lib/erl_interface/test/ei_accept_SUITE.erl index 8881eb85bd..16eb0901ef 100644 --- a/lib/erl_interface/test/ei_accept_SUITE.erl +++ b/lib/erl_interface/test/ei_accept_SUITE.erl @@ -44,34 +44,31 @@ ei_accept(Config) when is_list(Config) -> io:format("Myname ~p ~n", [Myname]), EINode = list_to_atom("c42@"++Myname), io:format("EINode ~p ~n", [EINode]), + + %% We take this opportunity to also test export-funs and bit-strings + %% with (ugly) tuple fallbacks. + %% Test both toward pending connection and established connection. + RealTerms = [<<1:1>>, fun lists:map/2], + Fallbacks = [{<<128>>,1}, {lists,map}], + Self = self(), - TermToSend= {call, Self, "Test"}, - F= fun() -> - case waitfornode("c42",20) of - true -> - {any, EINode} ! TermToSend, - Self ! sent_ok; - false -> - Self ! never_published - end, - ok - end, - - spawn(F), + Funny = fun() -> hello end, + TermToSend = {call, Self, "Test", Funny, RealTerms}, + TermToGet = {call, Self, "Test", Funny, Fallbacks}, Port = 6543, {ok, ListenFd} = ei_publish(P, Port), + {any, EINode} ! TermToSend, {ok, Fd, _Node} = ei_accept(P, ListenFd), - TermReceived= ei_receive(P, Fd), - io:format("Sent ~p received ~p ~n", [TermToSend, TermReceived]), - TermToSend= TermReceived, - receive - sent_ok -> - ok; - Unknown -> - io:format("~p ~n", [Unknown]) - after 1000 -> - io:format("timeout ~n") - end, + Got1 = ei_receive(P, Fd), + + %% Send again, now without auto-connect + {any, EINode} ! TermToSend, + Got2 = ei_receive(P, Fd), + + io:format("Sent ~p~nExp. ~p~nGot1 ~p~nGot2 ~p~n", [TermToSend, TermToGet, Got1, Got2]), + TermToGet = Got1, + TermToGet = Got2, + runner:finish(P), ok. -- cgit v1.2.3 From 42dadd12ac6b6977513a6edd73ff6137488c4a4a Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 6 Sep 2017 20:38:52 +0200 Subject: erts: Ensure enc_term_int() always do progress even when reds <= 1 Removed micro optimization for first fun variable to make things simpler. --- erts/emulator/beam/external.c | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index bfac48580c..8408d7dd12 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -2554,8 +2554,6 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, break; } - L_jump_start: - if (ctx && --r <= 0) { *reds = 0; ctx->obj = obj; @@ -2563,6 +2561,8 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, WSTACK_SAVE(s, &ctx->wstack); return -1; } + + L_jump_start: switch(tag_val_def(obj)) { case NIL_DEF: *ep++ = NIL_EXT; @@ -2945,13 +2945,9 @@ enc_term_int(TTBEncodeContext* ctx, ErtsAtomCacheMap *acmp, Eterm obj, byte* ep, ep = enc_term(acmp, make_small(funp->fe->old_uniq), ep, dflags, off_heap); ep = enc_pid(acmp, funp->creator, ep, dflags); - for (ei = funp->num_free-1; ei > 0; ei--) { + for (ei = funp->num_free-1; ei >= 0; ei--) { WSTACK_PUSH2(s, ENC_TERM, (UWord) funp->env[ei]); } - if (funp->num_free != 0) { - obj = funp->env[0]; - goto L_jump_start; - } } break; } -- cgit v1.2.3 From e4d28f70133af9b3b8c320fe5e70e2086830874d Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Fri, 25 Aug 2017 18:46:58 +0200 Subject: erts: Transcode tuple fallbacks When finalizing outgoing distribution messages we transcode them into using tuple fallbacks if the receiver does not support bitstrings and export-funs. This can only happen if the message was first encoded toward a pending connection when the receiver was unknown. It's an optimistic approach optmimized for modern beam nodes, that expect real bitstrings and funs (since orig_bytes[0]; #ifdef DEBUG obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; + obuf->alloc_endp = obuf->data + size; ASSERT(bin == ErtsDistOutputBuf2Binary(obuf)); #endif return obuf; @@ -692,6 +693,7 @@ static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) dep->tmp_out_queue.last = NULL; dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; + return obuf; } @@ -755,6 +757,8 @@ static void clear_dist_entry(DistEntry *dep) delete_cache(cache); free_de_out_queues(dep, obuf); + if (dep->transcode_ctx) + transcode_free_ctx(dep); } int erts_dsend_context_dtor(Binary* ctx_bin) @@ -2208,7 +2212,6 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) #endif #define ERTS_PORT_REDS_DIST_CMD_START 5 -#define ERTS_PORT_REDS_DIST_CMD_FINALIZE 3 #define ERTS_PORT_REDS_DIST_CMD_EXIT 200 #define ERTS_PORT_REDS_DIST_CMD_RESUMED 5 #define ERTS_PORT_REDS_DIST_CMD_DATA(SZ) \ @@ -2217,9 +2220,9 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) : ((((Sint) (SZ)) >> 10) & ((Sint) ERTS_PORT_REDS_MASK__))) int -erts_dist_command(Port *prt, int reds_limit) +erts_dist_command(Port *prt, int initial_reds) { - Sint reds = ERTS_PORT_REDS_DIST_CMD_START; + Sint reds = initial_reds - ERTS_PORT_REDS_DIST_CMD_START; Uint32 status; Uint32 flags; Sint qsize, obufsize = 0; @@ -2241,9 +2244,12 @@ erts_dist_command(Port *prt, int reds_limit) if (status & ERTS_DE_SFLG_EXITING) { erts_deliver_port_exit(prt, prt->common.id, am_killed, 0, 1); - return reds + ERTS_PORT_REDS_DIST_CMD_EXIT; + reds -= ERTS_PORT_REDS_DIST_CMD_EXIT; + return initial_reds - reds; } + ASSERT(!(status & ERTS_DE_SFLG_PENDING)); + ASSERT(send); /* @@ -2268,7 +2274,7 @@ erts_dist_command(Port *prt, int reds_limit) sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - if (reds > reds_limit) + if (reds < 0) goto preempted; if (!(sched_flags & ERTS_PTS_FLG_BUSY_PORT) && foq.first) { @@ -2283,13 +2289,13 @@ erts_dist_command(Port *prt, int reds_limit) erts_fprintf(stderr, ">> "); bw(foq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = foq.first; - obufsize += size_obuf(fob); - foq.first = foq.first->next; - free_dist_obuf(fob); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = foq.first; + obufsize += size_obuf(fob); + foq.first = foq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + preempt = reds < 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) break; } while (foq.first && !preempt); @@ -2302,44 +2308,42 @@ erts_dist_command(Port *prt, int reds_limit) if (sched_flags & ERTS_PTS_FLG_BUSY_PORT) { if (oq.first) { ErtsDistOutputBuf *ob; - int preempt; + ErtsDistOutputBuf *last_finalized = NULL; finalize_only: - preempt = 0; ob = oq.first; ASSERT(ob); do { - erts_encode_ext_dist_header_finalize(ob, dep->cache, flags); - ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - preempt = reds > reds_limit; - if (preempt) - break; - ob = ob->next; + reds = erts_encode_ext_dist_header_finalize(ob, dep, flags, reds); + if (reds >= 0) { + last_finalized = ob; + ob = ob->next; + } } while (ob); - /* - * At least one buffer was finalized; if we got preempted, - * ob points to the last buffer that we finalized. - */ - if (foq.last) - foq.last->next = oq.first; - else - foq.first = oq.first; - if (!preempt) { - /* All buffers finalized */ - foq.last = oq.last; - oq.first = oq.last = NULL; - } - else { - /* Not all buffers finalized; split oq. */ - foq.last = ob; - oq.first = ob->next; - if (oq.first) - ob->next = NULL; - else - oq.last = NULL; - } - if (preempt) - goto preempted; + if (last_finalized) { + /* + * At least one buffer was finalized; if we got preempted, + * ob points to the next buffer to continue finalize. + */ + if (foq.last) + foq.last->next = oq.first; + else + foq.first = oq.first; + foq.last = last_finalized; + if (!ob) { + /* All buffers finalized */ + ASSERT(foq.last == oq.last); + ASSERT(foq.last->next == NULL); + oq.first = oq.last = NULL; + } + else { + /* Not all buffers finalized; split oq. */ + ASSERT(foq.last->next == ob); + foq.last->next = NULL; + oq.first = ob; + } + } + if (reds <= 0) + goto preempted; } } else { @@ -2348,8 +2352,11 @@ erts_dist_command(Port *prt, int reds_limit) while (oq.first && !preempt) { ErtsDistOutputBuf *fob; Uint size; - erts_encode_ext_dist_header_finalize(oq.first, dep->cache, flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; + reds = erts_encode_ext_dist_header_finalize(oq.first, dep, flags, reds); + if (reds < 0) { + preempt = 1; + break; + } ASSERT(&oq.first->data[0] <= oq.first->extp && oq.first->extp < oq.first->ext_endp); size = (*send)(prt, oq.first); @@ -2359,13 +2366,13 @@ erts_dist_command(Port *prt, int reds_limit) erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); #endif - reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - fob = oq.first; - obufsize += size_obuf(fob); - oq.first = oq.first->next; - free_dist_obuf(fob); + reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); + fob = oq.first; + obufsize += size_obuf(fob); + oq.first = oq.first->next; + free_dist_obuf(fob); sched_flags = erts_atomic32_read_nob(&prt->sched.flags); - preempt = reds > reds_limit || (sched_flags & ERTS_PTS_FLG_EXIT); + preempt = reds <= 0 || (sched_flags & ERTS_PTS_FLG_EXIT); if ((sched_flags & ERTS_PTS_FLG_BUSY_PORT) && oq.first && !preempt) goto finalize_only; } @@ -2405,7 +2412,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_mtx_unlock(&dep->qlock); resumed = erts_resume_processes(suspendees); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } else erts_mtx_unlock(&dep->qlock); @@ -2428,8 +2435,7 @@ erts_dist_command(Port *prt, int reds_limit) erts_mtx_unlock(&dep->qlock); } - ASSERT(foq.first || !foq.last); - ASSERT(!foq.first || foq.last); + ASSERT(!!foq.first == !!foq.last); ASSERT(!dep->finalized_out_queue.first); ASSERT(!dep->finalized_out_queue.last); @@ -2439,10 +2445,10 @@ erts_dist_command(Port *prt, int reds_limit) } /* Avoid wrapping reduction counter... */ - if (reds > INT_MAX/2) - reds = INT_MAX/2; + if (reds < INT_MIN/2) + reds = INT_MIN/2; - return reds; + return initial_reds - reds; preempted: /* @@ -2450,8 +2456,7 @@ erts_dist_command(Port *prt, int reds_limit) * since last call to driver. */ - ASSERT(oq.first || !oq.last); - ASSERT(!oq.first || oq.last); + ASSERT(!!oq.first == !!oq.last); if (sched_flags & ERTS_PTS_FLG_EXIT) { /* @@ -2475,14 +2480,11 @@ erts_dist_command(Port *prt, int reds_limit) foq.first = NULL; foq.last = NULL; - -/* SVERK Hmmm.... #ifdef DEBUG erts_mtx_lock(&dep->qlock); ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); erts_mtx_unlock(&dep->qlock); #endif -*/ } else { if (oq.first) { @@ -2772,7 +2774,8 @@ BIF_RETTYPE dist_ctrl_get_data_1(BIF_ALIST_1) { DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P); - int reds = 1; + const Sint initial_reds = ERTS_BIF_REDS_LEFT(BIF_P); + Sint reds = initial_reds; ErtsDistOutputBuf *obuf; Eterm *hp; ProcBin *pb; @@ -2803,6 +2806,7 @@ dist_ctrl_get_data_1(BIF_ALIST_1) { if (!dep->tmp_out_queue.first) { ASSERT(!dep->tmp_out_queue.last); + ASSERT(!dep->transcode_ctx); qsize = erts_atomic_read_acqb(&dep->qsize); if (qsize > 0) { erts_mtx_lock(&dep->qlock); @@ -2820,21 +2824,18 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_de_runlock(dep); BIF_RET(am_none); } - else { - obuf = dep->tmp_out_queue.first; - dep->tmp_out_queue.first = obuf->next; - if (!obuf->next) - dep->tmp_out_queue.last = NULL; + + obuf = dep->tmp_out_queue.first; + reds = erts_encode_ext_dist_header_finalize(obuf, dep, dep->flags, reds); + if (reds < 0) { + erts_de_runlock(dep); + ERTS_BIF_YIELD1(bif_export[BIF_dist_ctrl_get_data_1], + BIF_P, BIF_ARG_1); } - obuf->extp = erts_encode_ext_dist_header_finalize(obuf->extp, - dep->cache, - dep->flags); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--obuf->extp = PASS_THROUGH; /* 'pass through' needed */ - ASSERT(&obuf->data[0] <= obuf->extp - && obuf->extp < obuf->ext_endp); + dep->tmp_out_queue.first = obuf->next; + if (!obuf->next) + dep->tmp_out_queue.last = NULL; } erts_atomic64_inc_nob(&dep->out); @@ -2862,11 +2863,11 @@ dist_ctrl_get_data_1(BIF_ALIST_1) erts_mtx_unlock(&dep->qlock); if (resume_procs) { int resumed = erts_resume_processes(resume_procs); - reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; + reds -= resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; } } - BIF_RET2(make_binary(pb), reds); + BIF_RET2(make_binary(pb), (initial_reds - reds)); } void diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 2960272eab..c6cc5c78b3 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -261,6 +261,7 @@ type MINDIRECTION FIXED_SIZE SYSTEM magic_indirection type BINARY_FIND SHORT_LIVED PROCESSES binary_find type OPEN_PORT_ENV TEMPORARY SYSTEM open_port_env type CRASH_DUMP STANDARD SYSTEM crash_dump +type DIST_TRANSCODE SHORT_LIVED SYSTEM dist_transcode_context type THR_Q_EL STANDARD SYSTEM thr_q_element type THR_Q_EL_SL FIXED_SIZE SYSTEM sl_thr_q_element diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 2a33766ac8..00e8306510 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -204,6 +204,7 @@ dist_table_alloc(void *dep_tmpl) erts_port_task_handle_init(&dep->dist_cmd); dep->send = NULL; dep->cache = NULL; + dep->transcode_ctx = NULL; /* Link in */ diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index af42814b8b..afa83f8b46 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -83,6 +83,7 @@ typedef struct ErtsDistOutputBuf_ ErtsDistOutputBuf; struct ErtsDistOutputBuf_ { #ifdef DEBUG Uint dbg_pattern; + byte *alloc_endp; #endif ErtsDistOutputBuf *next; byte *extp; @@ -156,6 +157,8 @@ typedef struct dist_entry_ { struct cache* cache; /* The atom cache */ ErtsThrPrgrLaterOp later_op; + + struct transcode_context* transcode_ctx; } DistEntry; typedef struct erl_node_ { diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 8408d7dd12..f2e6399ad7 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -122,6 +122,9 @@ static int encode_size_struct_int(struct TTBSizeContext_*, ErtsAtomCacheMap *acm static Export binary_to_term_trap_export; static BIF_RETTYPE binary_to_term_trap_1(BIF_ALIST_1); +static Sint transcode_dist_obuf(ErtsDistOutputBuf*, DistEntry*, Uint32 dflags, Sint reds); + + void erts_init_external(void) { erts_init_trap_export(&term_to_binary_trap_export, @@ -349,11 +352,13 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) } } -#define PASS_THROUGH 'p' /* This code should go */ -void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, - ErtsAtomCache *cache, - Uint32 dflags) +#define PASS_THROUGH 'p' + +Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, + DistEntry* dep, + Uint32 dflags, + Sint reds) { byte *ip; byte instr_buf[(2+4)*ERTS_ATOM_CACHE_SIZE]; @@ -364,7 +369,7 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, ASSERT(dflags & DFLAG_UTF8_ATOMS); /* - * The buffer can have three different layouts at this point depending on + * The buffer can have different layouts at this point depending on * what was known when encoded: * * Pending connection: CtrlTerm [, MsgTerm] @@ -372,34 +377,28 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, * No atom cache : VERSION_MAGIC, CtrlTerm [, VERSION_MAGIC, MsgTerm] */ - if (ep[0] != VERSION_MAGIC) { + if (ep[0] != VERSION_MAGIC || dep->transcode_ctx) { /* * Was encoded without atom cache toward pending connection. */ ASSERT(ep[0] == SMALL_TUPLE_EXT || ep[0] == LARGE_TUPLE_EXT); + + if (~dflags & (DFLAG_BIT_BINARIES | DFLAG_EXPORT_PTR_TAG + | DFLAG_DIST_HDR_ATOM_CACHE)) { + reds = transcode_dist_obuf(ob, dep, dflags, reds); + if (reds < 0) + return reds; + ep = ob->extp; + } if (dflags & DFLAG_DIST_HDR_ATOM_CACHE) { /* - * Receiver wants dist header. Let's prepend an empty one. + * Encoding was done without atom caching but receiver expects + * a dist header, so we prepend an empty one. */ *--ep = 0; /* NumberOfAtomCacheRefs */ *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; } - else { - /* - * Primitive receiver without atom cache (erl_interface/jinterface). - * Must prepend VERSION_MAGIC to both ctrl and message - * And add PASSTHROUGH. - */ - if (ob->msg_start) { - ASSERT(ep < ob->msg_start && ob->msg_start < ob->ext_endp); - sys_memmove(ep-1, ep, (ob->msg_start - ep)); - ob->msg_start[-1] = VERSION_MAGIC; - --ep; - } - *--ep = VERSION_MAGIC; - *--ep = PASS_THROUGH; - } goto done; } else if (ep[1] != DIST_HEADER) { @@ -436,6 +435,7 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, / sizeof(Uint32))+1]; register Uint32 flgs; int iix, flgs_bytes, flgs_buf_ix, used_half_bytes; + ErtsAtomCache* cache = dep->cache; #ifdef DEBUG int tot_used_half_bytes; #endif @@ -527,14 +527,16 @@ void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, break; } } + reds -= 3; /*was ERTS_PORT_REDS_DIST_CMD_FINALIZE*/ } --ep; put_int8(ci, ep); *--ep = DIST_HEADER; *--ep = VERSION_MAGIC; done: - ASSERT(ep >= ob->data); ob->extp = ep; + ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + return reds < 0 ? 0 : reds; } int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, @@ -545,7 +547,7 @@ int erts_encode_dist_ext_size(Eterm term, Uint32 flags, ErtsAtomCacheMap *acmp, return -1; } else { #ifndef ERTS_DEBUG_USE_DIST_SEP - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) + if (!(flags & (DFLAG_DIST_HDR_ATOM_CACHE | DFLAG_NO_MAGIC))) #endif sz++ /* VERSION_MAGIC */; @@ -3285,6 +3287,7 @@ dec_term_atom_common: n--; if (ctx) { if (reds < n) { + ASSERT(reds > 0); ctx->state = B2TDecodeList; ctx->u.dc.remaining_n = n - reds; n = reds; @@ -4388,7 +4391,7 @@ decoded_size(byte *ep, byte* endp, int internal_tags, B2TContext* ctx) } } else - reds = 0; /* not used but compiler warns anyway */ + ERTS_UNDEF(reds, 0); heap_size = 0; terms = 1; @@ -4698,3 +4701,177 @@ error: #undef SKIP2 #undef CHKSIZE } + + +struct transcode_context { + enum { + TRANSCODE_DEC_MSG_SIZE, + TRANSCODE_DEC_MSG, + TRANSCODE_ENC_CTL, + TRANSCODE_ENC_MSG + }state; + Eterm ctl_term; + Eterm* ctl_heap; + ErtsHeapFactory ctl_factory; + Eterm* msg_heap; + B2TContext b2t; + TTBEncodeContext ttb; +#ifdef DEBUG + ErtsDistOutputBuf* dbg_ob; +#endif +}; + +void transcode_free_ctx(DistEntry* dep) +{ + struct transcode_context* ctx = dep->transcode_ctx; + + erts_factory_close(&ctx->ctl_factory); + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx->ctl_heap); + + if (ctx->msg_heap) { + erts_factory_close(&ctx->b2t.u.dc.factory); + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx->msg_heap); + } + erts_free(ERTS_ALC_T_DIST_TRANSCODE, ctx); + dep->transcode_ctx = NULL; +} + +Sint transcode_dist_obuf(ErtsDistOutputBuf* ob, + DistEntry* dep, + Uint32 dflags, + Sint reds) +{ + Sint hsz; + byte* decp; + const int have_msg = !!ob->msg_start; + int i; + struct transcode_context* ctx = dep->transcode_ctx; + + if (!ctx) { /* first call for 'ob' */ + + if (~dflags & (DFLAG_BIT_BINARIES | DFLAG_EXPORT_PTR_TAG)) { + /* + * Receiver does not support bitstrings and/or export funs. + * We need to transcode control and message terms to use tuple fallbacks. + */ + ctx = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, sizeof(struct transcode_context)); + dep->transcode_ctx = ctx; + #ifdef DEBUG + ctx->dbg_ob = ob; + #endif + + hsz = decoded_size(ob->extp, ob->ext_endp, 0, NULL); + ctx->ctl_heap = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, hsz*sizeof(Eterm)); + erts_factory_tmp_init(&ctx->ctl_factory, ctx->ctl_heap, hsz, ERTS_ALC_T_DIST_TRANSCODE); + ctx->msg_heap = NULL; + + decp = dec_term(NULL, &ctx->ctl_factory, ob->extp, &ctx->ctl_term, NULL); + if (have_msg) { + ASSERT(decp == ob->msg_start); (void)decp; + ctx->b2t.u.sc.ep = NULL; + ctx->b2t.state = B2TSize; + ctx->b2t.aligned_alloc = NULL; + ctx->b2t.b2ts.exttmp = 0; + ctx->state = TRANSCODE_DEC_MSG_SIZE; + } + else { + ASSERT(decp == ob->ext_endp); + ctx->state = TRANSCODE_ENC_CTL; + } + } + else { + /* + * No need for full transcoding, but primitive receiver (erl_/jinterface) + * expects VERSION_MAGIC before both control and message terms. + */ + if (ob->msg_start) { + Sint ctl_bytes = ob->msg_start - ob->extp; + ASSERT(ob->extp < ob->msg_start && ob->msg_start < ob->ext_endp); + /* Move control term back 1 byte to make room */ + sys_memmove(ob->extp-1, ob->extp, ctl_bytes); + *--(ob->msg_start) = VERSION_MAGIC; + --(ob->extp); + reds -= ctl_bytes / (B2T_BYTES_PER_REDUCTION * B2T_MEMCPY_FACTOR); + } + *--(ob->extp) = VERSION_MAGIC; + goto done; + } + } + else { + ASSERT(ctx->dbg_ob == ob); + } + ctx->b2t.reds = reds * B2T_BYTES_PER_REDUCTION; + + switch (ctx->state) { + case TRANSCODE_DEC_MSG_SIZE: + hsz = decoded_size(ob->msg_start, ob->ext_endp, 0, &ctx->b2t); + if (ctx->b2t.state == B2TSize) { + return -1; + } + ASSERT(ctx->b2t.state == B2TDecodeInit); + ctx->msg_heap = erts_alloc(ERTS_ALC_T_DIST_TRANSCODE, hsz*sizeof(Eterm)); + ctx->b2t.u.dc.ep = ob->msg_start; + ctx->b2t.u.dc.res = (Eterm) NULL; + ctx->b2t.u.dc.next = &ctx->b2t.u.dc.res; + erts_factory_tmp_init(&ctx->b2t.u.dc.factory, + ctx->msg_heap, hsz, ERTS_ALC_T_DIST_TRANSCODE); + ctx->b2t.u.dc.flat_maps.wstart = NULL; + ctx->b2t.u.dc.hamt_array.pstart = NULL; + ctx->b2t.state = B2TDecode; + + ctx->state = TRANSCODE_DEC_MSG; + case TRANSCODE_DEC_MSG: + if (ctx->b2t.reds <= 0) + ctx->b2t.reds = 1; + decp = dec_term(NULL, NULL, NULL, NULL, &ctx->b2t); + if (ctx->b2t.state < B2TDone) { + return -1; + } + ASSERT(ctx->b2t.state == B2TDone); + ASSERT(decp && decp <= ob->ext_endp); + reds = ctx->b2t.reds / B2T_BYTES_PER_REDUCTION; + b2t_destroy_context(&ctx->b2t); + + ctx->state = TRANSCODE_ENC_CTL; + case TRANSCODE_ENC_CTL: + if (!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)) { + ASSERT(!(dflags & DFLAG_NO_MAGIC)); + ob->extp -= 2; /* VERSION_MAGIC x 2 */ + } + ob->ext_endp = ob->extp; + i = erts_encode_dist_ext(ctx->ctl_term, &ob->ext_endp, dflags, + NULL, NULL, NULL); + ASSERT(i == 0); (void)i; + ASSERT(ob->ext_endp <= ob->alloc_endp); + + if (!have_msg) { + break; + } + ob->msg_start = ob->ext_endp; + ctx->ttb.wstack.wstart = NULL; + ctx->ttb.flags = dflags; + ctx->ttb.level = 0; + + ctx->state = TRANSCODE_ENC_MSG; + case TRANSCODE_ENC_MSG: + reds *= TERM_TO_BINARY_LOOP_FACTOR; + if (erts_encode_dist_ext(ctx->b2t.u.dc.res, &ob->ext_endp, dflags, NULL, + &ctx->ttb, &reds)) { + return -1; + } + reds /= TERM_TO_BINARY_LOOP_FACTOR; + + ASSERT(ob->ext_endp <= ob->alloc_endp); + + } + transcode_free_ctx(dep); + +done: + if (!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)) + *--(ob->extp) = PASS_THROUGH; + + if (reds < 0) + reds = 0; + + return reds; +} diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index 11044b17ef..f9f8abcc27 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -154,7 +154,7 @@ void erts_finalize_atom_cache_map(ErtsAtomCacheMap *, Uint32); Uint erts_encode_ext_dist_header_size(ErtsAtomCacheMap *); byte *erts_encode_ext_dist_header_setup(byte *, ErtsAtomCacheMap *); -void erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, ErtsAtomCache *, Uint32); +Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, DistEntry *, Uint32 dflags, Sint reds); struct erts_dsig_send_context; int erts_encode_dist_ext_size(Eterm, Uint32, ErtsAtomCacheMap*, Uint* szp); int erts_encode_dist_ext_size_int(Eterm term, struct erts_dsig_send_context* ctx, Uint* szp); @@ -195,7 +195,7 @@ void erts_binary2term_abort(ErtsBinary2TermState *); Eterm erts_binary2term_create(ErtsBinary2TermState *, ErtsHeapFactory*); int erts_debug_max_atom_out_cache_index(void); int erts_debug_atom_to_out_cache_index(Eterm); - +void transcode_free_ctx(DistEntry* dep); #if ERTS_GLB_INLINE_INCL_FUNC_DEF -- cgit v1.2.3 From 56ab31904f7157d7b4282c2afd474c7bed7b9c75 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 19 Sep 2017 17:44:47 +0200 Subject: Remove faulty assert Send may have failed, port exit with dist_entry cleaned up and then new pending connection with queued messages. --- erts/emulator/beam/dist.c | 5 ----- 1 file changed, 5 deletions(-) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index a5df92bf9a..474b0e8364 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -2480,11 +2480,6 @@ erts_dist_command(Port *prt, int initial_reds) foq.first = NULL; foq.last = NULL; -#ifdef DEBUG - erts_mtx_lock(&dep->qlock); - ASSERT(erts_atomic_read_nob(&dep->qsize) == obufsize); - erts_mtx_unlock(&dep->qlock); -#endif } else { if (oq.first) { -- cgit v1.2.3 From 08040e5c5cd329395d9d756f1fdf8d66ffbbe705 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Thu, 21 Sep 2017 19:50:22 +0200 Subject: Allow DistEntries in ETS --- erts/emulator/beam/erl_node_tables.c | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 00e8306510..17d410e42b 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -1089,6 +1089,7 @@ typedef struct { typedef struct dist_referrer_ { struct dist_referrer_ *next; int heap_ref; + int ets_ref; int node_ref; int ctrl_ref; int system_ref; @@ -1207,10 +1208,11 @@ insert_dist_referrer(ReferredDist *referred_dist, else { Uint *hp = &drp->id_heap[0]; ASSERT(is_tuple(id)); - drp->id = copy_struct(id, size_object(id), &hp, NULL); + drp->id = copy_struct(id, size_object(id), &hp, NULL); } drp->creation = creation; drp->heap_ref = 0; + drp->ets_ref = 0; drp->node_ref = 0; drp->ctrl_ref = 0; drp->system_ref = 0; @@ -1220,6 +1222,7 @@ insert_dist_referrer(ReferredDist *referred_dist, case NODE_REF: drp->node_ref++; break; case CTRL_REF: drp->ctrl_ref++; break; case HEAP_REF: drp->heap_ref++; break; + case ETS_REF: drp->ets_ref++; break; case SYSTEM_REF: drp->system_ref++; break; default: ASSERT(0); } @@ -1897,6 +1900,10 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) tup = MK_2TUP(AM_heap, MK_UINT(drp->heap_ref)); drl = MK_CONS(tup, drl); } + if(drp->ets_ref) { + tup = MK_2TUP(AM_ets, MK_UINT(drp->ets_ref)); + drl = MK_CONS(tup, drl); + } if(drp->system_ref) { tup = MK_2TUP(AM_system, MK_UINT(drp->system_ref)); drl = MK_CONS(tup, drl); @@ -1913,12 +1920,17 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) else if (is_tuple(drp->id)) { Eterm *t; ASSERT(drp->system_ref && !drp->node_ref - && !drp->ctrl_ref && !drp->heap_ref); + && !drp->ctrl_ref && !drp->heap_ref && !drp->ets_ref); t = tuple_val(drp->id); ASSERT(2 == arityval(t[0])); tup = MK_2TUP(t[1], t[2]); } - else { + else if (drp->ets_ref) { + ASSERT(!drp->heap_ref && !drp->node_ref && + !drp->ctrl_ref && !drp->system_ref); + tup = MK_2TUP(AM_ets, drp->id); + } + else { ASSERT(!drp->ctrl_ref && drp->node_ref); ASSERT(is_atom(drp->id)); tup = MK_2TUP(drp->id, MK_UINT(drp->creation)); -- cgit v1.2.3 From 771abbd23709b5a03416278595588931889ab7c5 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 20 Sep 2017 15:06:07 +0200 Subject: erts: Keep magic ref to DistEntry in net_kernel to make sure it's kept alive. --- erts/emulator/beam/dist.c | 22 +++++++++++++++++----- erts/emulator/beam/dist.h | 24 ++++++++++++++++-------- erts/emulator/beam/erl_node_tables.c | 30 ++++++++++++++---------------- erts/emulator/beam/erl_node_tables.h | 1 + lib/kernel/src/net_kernel.erl | 17 +++++++++-------- 5 files changed, 57 insertions(+), 37 deletions(-) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 474b0e8364..c16579037c 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3426,6 +3426,8 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) { DistEntry* dep; Uint32 conn_id; + Eterm* hp; + Eterm dhandle; if (is_not_atom(BIF_ARG_1)) { BIF_ERROR(BIF_P, BADARG); @@ -3449,17 +3451,25 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) conn_id = dep->connection_id; } erts_de_rwunlock(dep); - BIF_RET(make_small(conn_id)); + hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); + dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep); + erts_deref_dist_entry(dep); + BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); } BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) { DistEntry* dep; + Eterm* tp; - if (is_not_atom(BIF_ARG_1) || is_not_small(BIF_ARG_2)) { + if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { BIF_ERROR(BIF_P, BADARG); } - dep = erts_find_dist_entry(BIF_ARG_1); + tp = tuple_val(BIF_ARG_2); + dep = erts_dhandle_to_dist_entry(tp[2]); + if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); + } ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ if (!dep) { @@ -3469,7 +3479,7 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) erts_de_rwlock(dep); if (dep->status == ERTS_DE_SFLG_PENDING - && dep->connection_id == unsigned_val(BIF_ARG_2)) { + && dep->connection_id == unsigned_val(tp[1])) { NetExitsContext nec = {dep}; ErtsLink *nlinks; @@ -3736,10 +3746,12 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) ++ERTS_LINK_REFC(lnk); lnk = erts_add_or_lookup_link(&ERTS_P_LINKS(p), LINK_NODE, Node); ++ERTS_LINK_REFC(lnk); + erts_de_links_unlock(dep); break; default: ERTS_ASSERT(! "Invalid dsig prepare result"); } + erts_deref_dist_entry(dep); } else { /* Bool == false */ dep = erts_sysname_to_connected_dist_entry(Node); @@ -3777,9 +3789,9 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) Node)); } } + erts_de_links_unlock(dep); } - erts_de_links_unlock(dep); erts_proc_unlock(p, ERTS_PROC_LOCK_LINK); done: diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index f9a2037687..2ead588ee9 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -161,6 +161,7 @@ erts_dsig_prepare(ErtsDSigData *dsdp, int connect) { DistEntry* dep = *depp; + int deref_dep = 0; int res; if (!erts_is_alive) @@ -171,6 +172,7 @@ erts_dsig_prepare(ErtsDSigData *dsdp, dep = erts_find_or_insert_dist_entry(dsdp->node); ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + deref_dep = 1; } #ifdef ERTS_ENABLE_LOCK_CHECK @@ -208,7 +210,7 @@ retry: Eterm *hp; ErlOffHeap *ohp; ErtsMessage *mp; - Eterm msg, conn_id; + Eterm msg, conn_id, dhandle; dep->status = ERTS_DE_SFLG_PENDING; dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); @@ -220,16 +222,18 @@ retry: net_kernel = erts_whereis_process(proc, proc_locks, am_net_kernel, nk_locks, 0); if (!net_kernel) { - if (!*depp) { - erts_deref_dist_entry(dep); - } + if (deref_dep) + erts_deref_dist_entry(dep); return ERTS_DSIG_PREP_NOT_ALIVE; } - /* Send {auto_connect, Node, ConnId} to net_kernel */ - mp = erts_alloc_message_heap(net_kernel, &nk_locks, 4, &hp, &ohp); - msg = TUPLE3(hp, am_auto_connect, dep->sysname, conn_id); - erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); + /* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */ + mp = erts_alloc_message_heap(net_kernel, &nk_locks, + 5 + ERTS_MAGIC_REF_THING_SIZE, + &hp, &ohp); + dhandle = erts_build_dhandle(&hp, ohp, dep); + msg = TUPLE4(hp, am_auto_connect, dep->sysname, conn_id, dhandle); + erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); erts_proc_unlock(net_kernel, nk_locks); } else @@ -255,6 +259,8 @@ retry: dsdp->no_suspend = no_suspend; if (dspl == ERTS_DSP_NO_LOCK) erts_de_runlock(dep); + if (deref_dep) + erts_deref_dist_entry(dep); *depp = dep; return res; @@ -263,6 +269,8 @@ retry: erts_de_rwunlock(dep); else erts_de_runlock(dep); + if (deref_dep) + erts_deref_dist_entry(dep); return res; } diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 17d410e42b..7c75ec60e9 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -141,9 +141,7 @@ dist_table_cmp(void *dep1, void *dep2) static void* dist_table_alloc(void *dep_tmpl) { -#ifdef DEBUG erts_aint_t refc; -#endif Eterm sysname; Binary *bin; DistEntry *dep; @@ -160,13 +158,8 @@ dist_table_alloc(void *dep_tmpl) dist_entries++; -#ifdef DEBUG - refc = -#else - (void) -#endif - de_refc_dec_read(dep, -1); - ASSERT(refc == -1); + refc = de_refc_dec_read(dep, -1); + ASSERT(refc == -1); (void)refc; dep->prev = NULL; erts_rwmtx_init_opt(&dep->rwmtx, &rwmtx_opt, "dist_entry", sysname, @@ -227,6 +220,8 @@ dist_table_free(void *vdep) { DistEntry *dep = (DistEntry *) vdep; + ASSERT(de_refc_read(dep, -1) == -1); + ASSERT(dep->status == 0); ASSERT(is_nil(dep->cid)); ASSERT(dep->nlinks == NULL); ASSERT(dep->node_links == NULL); @@ -387,16 +382,19 @@ erts_dhandle_to_dist_entry(Eterm dhandle) } Eterm -erts_make_dhandle(Process *c_p, DistEntry *dep) +erts_build_dhandle(Eterm **hpp, ErlOffHeap* ohp, DistEntry *dep) { - Binary *bin; - Eterm *hp; - - bin = ErtsDistEntry2Bin(dep); + Binary *bin = ErtsDistEntry2Bin(dep); ASSERT(bin); ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == erts_dist_entry_destructor); - hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE); - return erts_mk_magic_ref(&hp, &c_p->off_heap, bin); + return erts_mk_magic_ref(hpp, ohp, bin); +} + +Eterm +erts_make_dhandle(Process *c_p, DistEntry *dep) +{ + Eterm *hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE); + return erts_build_dhandle(&hp, &c_p->off_heap, dep); } static void start_timer_delete_dist_entry(void *vdep); diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index afa83f8b46..83d0678bbc 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -210,6 +210,7 @@ int erts_lc_is_de_rlocked(DistEntry *); #endif int erts_dist_entry_destructor(Binary *bin); DistEntry *erts_dhandle_to_dist_entry(Eterm dhandle); +Eterm erts_build_dhandle(Eterm **hpp, ErlOffHeap*, DistEntry*); Eterm erts_make_dhandle(Process *c_p, DistEntry *dep); void erts_ref_dist_entry(DistEntry *dep); void erts_deref_dist_entry(DistEntry *dep); diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index f929e4bf11..eee915b15b 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -435,10 +435,10 @@ handle_connect(ConnLookup, Type, Node, ConnId, From , State) -> -define(ERTS_DIST_CON_ID_MASK, 16#ffffff). % also in external.h -verify_new_conn_id([], ConnId) - when (ConnId band (bnot ?ERTS_DIST_CON_ID_MASK)) =:= 0 -> +verify_new_conn_id([], {Nr,_DHandle}) + when (Nr band (bnot ?ERTS_DIST_CON_ID_MASK)) =:= 0 -> true; -verify_new_conn_id([#connection{conn_id = Old}], New) +verify_new_conn_id([#connection{conn_id = {Old,_}}], {New,_}) when New =:= ((Old+1) band ?ERTS_DIST_CON_ID_MASK) -> true; verify_new_conn_id(_, _) -> @@ -460,7 +460,7 @@ handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) -> verbose({auto_connect, Type, Node, WaitForBarred}, 1, State), R = case (catch erlang:new_connection_id(Node)) of - ConnId when is_integer(ConnId) -> + {Nr,_DHandle}=ConnId when is_integer(Nr) -> handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State); _Error -> @@ -481,7 +481,7 @@ handle_call({connect, Type, Node}, From, State) -> verbose({connect, Type, Node}, 1, State), ConnLookup = ets:lookup(sys_dist, Node), R = case (catch erlang:new_connection_id(Node)) of - ConnId when is_integer(ConnId) -> + {Nr,_DHandle}=ConnId when is_integer(Nr) -> handle_connect(ConnLookup, Type, Node, ConnId, From, State); _Error -> @@ -699,8 +699,9 @@ terminate(_Reason, State) -> %% %% Asynchronous auto connect request %% -handle_info({auto_connect,Node,ConnId}, State) -> - verbose({auto_connect, Node, ConnId}, 1, State), +handle_info({auto_connect,Node, Nr, DHandle}, State) -> + verbose({auto_connect, Node, Nr, DHandle}, 1, State), + ConnId = {Nr, DHandle}, NewState = case handle_auto_connect(normal, Node, ConnId, false, noreply, State) of {noreply, S} -> %% Pending connection @@ -796,7 +797,7 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) -> {noreply, State}; _ -> ConnId = case (catch erlang:new_connection_id(Node)) of - CI when is_integer(CI) -> CI + {Nr,_DHandle}=CI when is_integer(Nr) -> CI %% SVERK What to do? end, ets:insert(sys_dist, #connection{node = Node, -- cgit v1.2.3 From bc1bb9f2a9e33ae4d1e91632852ecdddfa58a4d0 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 25 Sep 2017 19:25:27 +0200 Subject: fix erlang specs and preloaded --- erts/preloaded/ebin/erlang.beam | Bin 107332 -> 104644 bytes erts/preloaded/src/erlang.erl | 9 +++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam index 181a718287..18137fd53c 100644 Binary files a/erts/preloaded/ebin/erlang.beam and b/erts/preloaded/ebin/erlang.beam differ diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 0aa8dae3e5..2dc2c48f0c 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -1664,15 +1664,16 @@ setnode(_P1, _P2, _P3) -> erlang:nif_error(undefined). %% new_connection_id/1 --spec erlang:new_connection_id(Node) -> integer() when - Node :: atom(). +-spec erlang:new_connection_id(Node) -> ConnId when + Node :: atom(), + ConnId :: {integer(), dist_handle()}. new_connection_id(_Node) -> erlang:nif_error(undefined). %% abort_connection_id/2 --spec erlang:abort_connection_id(Node, ConnId) -> integer() when +-spec erlang:abort_connection_id(Node, ConnId) -> boolean() when Node :: atom(), - ConnId :: integer(). + ConnId :: {integer(), dist_handle()}. abort_connection_id(_Node, _ConnId) -> erlang:nif_error(undefined). -- cgit v1.2.3 From 23ba06691aa64d7f05781bc9b2ea448c3710a812 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Mon, 25 Sep 2017 20:23:06 +0200 Subject: Remove unused ERTS_DSP_RWLOCK --- erts/emulator/beam/dist.h | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 2ead588ee9..b9b39de3b6 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -96,8 +96,7 @@ extern Export* dmonitor_p_trap; typedef enum { ERTS_DSP_NO_LOCK, - ERTS_DSP_RLOCK, - ERTS_DSP_RWLOCK + ERTS_DSP_RLOCK } ErtsDSigPrepLock; @@ -182,10 +181,7 @@ erts_dsig_prepare(ErtsDSigData *dsdp, #endif retry: - if (dspl == ERTS_DSP_RWLOCK) - erts_de_rwlock(dep); - else - erts_de_rlock(dep); + erts_de_rlock(dep); if (ERTS_DE_IS_CONNECTED(dep)) { res = ERTS_DSIG_PREP_CONNECTED; @@ -200,10 +196,8 @@ retry: } else if (connect) { ASSERT(dep->status == 0); - if (dspl != ERTS_DSP_RWLOCK) { - erts_de_runlock(dep); - erts_de_rwlock(dep); - } + erts_de_runlock(dep); + erts_de_rwlock(dep); if (dep->status == 0) { Process* net_kernel; ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; @@ -265,10 +259,7 @@ retry: return res; fail: - if (dspl == ERTS_DSP_RWLOCK) - erts_de_rwunlock(dep); - else - erts_de_runlock(dep); + erts_de_runlock(dep); if (deref_dep) erts_deref_dist_entry(dep); return res; -- cgit v1.2.3 From 25f9d0469e2b1da0aec73b55e54561be8b573634 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Tue, 26 Sep 2017 16:01:29 +0200 Subject: Refactor auto_connect into an outline function and abort_pending_connection into own utility function. --- erts/emulator/beam/dist.c | 92 ++++++++++++++++++++++++++++++++++------------- erts/emulator/beam/dist.h | 42 ++++------------------ 2 files changed, 75 insertions(+), 59 deletions(-) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index c16579037c..3e8c4c65b5 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -120,6 +120,7 @@ static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); +static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; @@ -3457,30 +3458,16 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); } -BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) { - DistEntry* dep; - Eterm* tp; - - if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { - BIF_ERROR(BIF_P, BADARG); - } - tp = tuple_val(BIF_ARG_2); - dep = erts_dhandle_to_dist_entry(tp[2]); - if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { - BIF_ERROR(BIF_P, BADARG); - } - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ - - if (!dep) { - BIF_RET(am_false); - } + Sint reds = 0; erts_de_rwlock(dep); - if (dep->status == ERTS_DE_SFLG_PENDING - && dep->connection_id == unsigned_val(tp[1])) { - + if (dep->status != ERTS_DE_SFLG_PENDING || dep->connection_id != conn_id) { + erts_de_rwunlock(dep); + } + else { NetExitsContext nec = {dep}; ErtsLink *nlinks; ErtsLink *node_links; @@ -3488,7 +3475,6 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) ErtsAtomCache *cache; ErtsDistOutputBuf *obuf; ErtsProcList *resume_procs; - Sint reds = 0; ASSERT(is_nil(dep->cid)); @@ -3529,13 +3515,71 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) delete_cache(cache); free_de_out_queues(dep, obuf); + } + return reds; +} - BIF_RET2(am_true, reds); +BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +{ + DistEntry* dep; + Eterm* tp; + + if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) { + BIF_ERROR(BIF_P, BADARG); + } + tp = tuple_val(BIF_ARG_2); + dep = erts_dhandle_to_dist_entry(tp[2]); + if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { + BIF_ERROR(BIF_P, BADARG); } + ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ - erts_de_rwunlock(dep); + if (dep) { + Sint reds = abort_pending_connection(dep, unsigned_val(tp[1])); + BUMP_REDS(BIF_P, reds); + } + BIF_RET(am_true); +} - BIF_RET(am_false); +int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) +{ + erts_de_rwlock(dep); + if (dep->status != 0) { + erts_de_rwunlock(dep); + } + else { + Process* net_kernel; + ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; + Eterm *hp; + ErlOffHeap *ohp; + ErtsMessage *mp; + Eterm msg, dhandle; + Uint32 conn_id; + + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); + dep->connection_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + conn_id = dep->connection_id; + erts_de_rwunlock(dep); + + net_kernel = erts_whereis_process(proc, proc_locks, + am_net_kernel, nk_locks, 0); + if (!net_kernel) { + return 0; + } + + /* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */ + mp = erts_alloc_message_heap(net_kernel, &nk_locks, + 5 + ERTS_MAGIC_REF_THING_SIZE, + &hp, &ohp); + dhandle = erts_build_dhandle(&hp, ohp, dep); + msg = TUPLE4(hp, am_auto_connect, dep->sysname, make_small(conn_id), + dhandle); + erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); + erts_proc_unlock(net_kernel, nk_locks); + } + + return 1; } /**********************************************************************/ diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index b9b39de3b6..2685a55b97 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -148,6 +148,8 @@ ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *, ERTS_GLB_INLINE void erts_schedule_dist_command(Port *, DistEntry *); +int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks); + #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE int @@ -197,41 +199,11 @@ retry: else if (connect) { ASSERT(dep->status == 0); erts_de_runlock(dep); - erts_de_rwlock(dep); - if (dep->status == 0) { - Process* net_kernel; - ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ; - Eterm *hp; - ErlOffHeap *ohp; - ErtsMessage *mp; - Eterm msg, conn_id, dhandle; - - dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); - dep->connection_id++; - dep->connection_id &= ERTS_DIST_CON_ID_MASK; - conn_id = make_small(dep->connection_id); - erts_de_rwunlock(dep); - - net_kernel = erts_whereis_process(proc, proc_locks, - am_net_kernel, nk_locks, 0); - if (!net_kernel) { - if (deref_dep) - erts_deref_dist_entry(dep); - return ERTS_DSIG_PREP_NOT_ALIVE; - } - - /* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */ - mp = erts_alloc_message_heap(net_kernel, &nk_locks, - 5 + ERTS_MAGIC_REF_THING_SIZE, - &hp, &ohp); - dhandle = erts_build_dhandle(&hp, ohp, dep); - msg = TUPLE4(hp, am_auto_connect, dep->sysname, conn_id, dhandle); - erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id); - erts_proc_unlock(net_kernel, nk_locks); - } - else - erts_de_rwunlock(dep); + if (!erts_auto_connect(dep, proc, proc_locks)) { + if (deref_dep) + erts_deref_dist_entry(dep); + return ERTS_DSIG_PREP_NOT_ALIVE; + } goto retry; } else { -- cgit v1.2.3 From 6b48ef0226084a7f471d391abd4c1c399068840a Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 27 Sep 2017 19:02:10 +0200 Subject: erts: Put pending DistrEntry in separate list --- erts/emulator/beam/dist.c | 29 +++++--- erts/emulator/beam/erl_node_tables.c | 128 +++++++++++++++++++++++++---------- erts/emulator/beam/erl_node_tables.h | 3 + 3 files changed, 113 insertions(+), 47 deletions(-) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 3e8c4c65b5..44f56ebe28 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3062,6 +3062,10 @@ int distribution_info(fmtfn_t to, void *arg) /* Called by break handler */ info_dist_entry(to, arg, dep, 0, 1); } + for (dep = erts_pending_dist_entries; dep; dep = dep->next) { + info_dist_entry(to, arg, dep, 0, 0); + } + for (dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { info_dist_entry(to, arg, dep, 0, 0); @@ -3441,10 +3445,7 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) if (ERTS_DE_IS_CONNECTED(dep) || dep->status & ERTS_DE_SFLG_PENDING) conn_id = dep->connection_id; else if (dep->status == 0) { - dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); - dep->connection_id++; - dep->connection_id &= ERTS_DIST_CON_ID_MASK; + erts_set_dist_entry_pending(dep); conn_id = dep->connection_id; } else { @@ -3489,8 +3490,6 @@ static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) cache = dep->cache; dep->cache = NULL; - dep->status = 0; - dep->flags = 0; erts_mtx_lock(&dep->qlock); obuf = dep->out_queue.first; dep->out_queue.first = NULL; @@ -3501,6 +3500,9 @@ static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) erts_mtx_unlock(&dep->qlock); erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0); dep->send = NULL; + + erts_set_dist_entry_not_connected(dep); + erts_de_rwunlock(dep); erts_sweep_monitors(monitors, &doit_monitor_net_exits, &nec); @@ -3556,9 +3558,7 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) Eterm msg, dhandle; Uint32 conn_id; - dep->status = ERTS_DE_SFLG_PENDING; - dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); - dep->connection_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + erts_set_dist_entry_pending(dep); conn_id = dep->connection_id; erts_de_rwunlock(dep); @@ -3655,9 +3655,11 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) ASSERT(erts_no_of_not_connected_dist_entries > 0); ASSERT(erts_no_of_hidden_dist_entries >= 0); + ASSERT(erts_no_of_pending_dist_entries >= 0); ASSERT(erts_no_of_visible_dist_entries >= 0); if(not_connected) - length += (erts_no_of_not_connected_dist_entries - 1); + length += ((erts_no_of_not_connected_dist_entries - 1) + + erts_no_of_pending_dist_entries); if(hidden) length += erts_no_of_hidden_dist_entries; if(visible) @@ -3677,13 +3679,18 @@ BIF_RETTYPE nodes_1(BIF_ALIST_1) #ifdef DEBUG endp = hp + length*2; #endif - if(not_connected) + if(not_connected) { for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { if (dep != erts_this_dist_entry) { result = CONS(hp, dep->sysname, result); hp += 2; } + } + for(dep = erts_pending_dist_entries; dep; dep = dep->next) { + result = CONS(hp, dep->sysname, result); + hp += 2; } + } if(hidden) for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { result = CONS(hp, dep->sysname, result); diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index 7c75ec60e9..eaf133f5c0 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -39,9 +39,11 @@ erts_rwmtx_t erts_node_table_rwmtx; DistEntry *erts_hidden_dist_entries; DistEntry *erts_visible_dist_entries; +DistEntry *erts_pending_dist_entries; DistEntry *erts_not_connected_dist_entries; /* including erts_this_dist_entry */ Sint erts_no_of_hidden_dist_entries; Sint erts_no_of_visible_dist_entries; +Sint erts_no_of_pending_dist_entries; Sint erts_no_of_not_connected_dist_entries; /* including erts_this_dist_entry */ DistEntry *erts_this_dist_entry; @@ -522,12 +524,17 @@ erts_dist_table_size(void) i++; ASSERT(i == erts_no_of_hidden_dist_entries); i = 0; + for(dep = erts_pending_dist_entries; dep; dep = dep->next) + i++; + ASSERT(i == erts_no_of_pending_dist_entries); + i = 0; for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) i++; ASSERT(i == erts_no_of_not_connected_dist_entries); ASSERT(dist_entries == (erts_no_of_visible_dist_entries + erts_no_of_hidden_dist_entries + + erts_no_of_pending_dist_entries + erts_no_of_not_connected_dist_entries)); #endif @@ -542,43 +549,46 @@ erts_dist_table_size(void) void erts_set_dist_entry_not_connected(DistEntry *dep) { + DistEntry** head; + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); erts_rwmtx_rwlock(&erts_dist_table_rwmtx); ASSERT(dep != erts_this_dist_entry); - ASSERT(is_internal_port(dep->cid) || is_internal_pid(dep->cid)); - if(dep->flags & DFLAG_PUBLISHED) { - if(dep->prev) { - ASSERT(is_in_de_list(dep, erts_visible_dist_entries)); - dep->prev->next = dep->next; - } - else { - ASSERT(erts_visible_dist_entries == dep); - erts_visible_dist_entries = dep->next; - } - - ASSERT(erts_no_of_visible_dist_entries > 0); - erts_no_of_visible_dist_entries--; + if (dep->status & ERTS_DE_SFLG_PENDING) { + ASSERT(is_nil(dep->cid)); + ASSERT(erts_no_of_pending_dist_entries > 0); + erts_no_of_pending_dist_entries--; + head = &erts_pending_dist_entries; } else { - if(dep->prev) { - ASSERT(is_in_de_list(dep, erts_hidden_dist_entries)); - dep->prev->next = dep->next; - } - else { - ASSERT(erts_hidden_dist_entries == dep); - erts_hidden_dist_entries = dep->next; - } - - ASSERT(erts_no_of_hidden_dist_entries > 0); - erts_no_of_hidden_dist_entries--; + ASSERT(dep->status != 0); + ASSERT(is_internal_port(dep->cid) || is_internal_pid(dep->cid)); + if (dep->flags & DFLAG_PUBLISHED) { + ASSERT(erts_no_of_visible_dist_entries > 0); + erts_no_of_visible_dist_entries--; + head = &erts_visible_dist_entries; + } + else { + ASSERT(erts_no_of_hidden_dist_entries > 0); + erts_no_of_hidden_dist_entries--; + head = &erts_hidden_dist_entries; + } } + if(dep->prev) { + ASSERT(is_in_de_list(dep, *head)); + dep->prev->next = dep->next; + } + else { + ASSERT(*head == dep); + *head = dep->next; + } if(dep->next) dep->next->prev = dep->prev; - dep->status &= ~ERTS_DE_SFLG_CONNECTED; + dep->status &= ~(ERTS_DE_SFLG_PENDING | ERTS_DE_SFLG_CONNECTED); dep->flags = 0; dep->prev = NULL; dep->cid = NIL; @@ -593,6 +603,45 @@ erts_set_dist_entry_not_connected(DistEntry *dep) erts_rwmtx_rwunlock(&erts_dist_table_rwmtx); } +void +erts_set_dist_entry_pending(DistEntry *dep) +{ + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); + erts_rwmtx_rwlock(&erts_dist_table_rwmtx); + + ASSERT(dep != erts_this_dist_entry); + ASSERT(dep->status == 0); + ASSERT(is_nil(dep->cid)); + + if(dep->prev) { + ASSERT(is_in_de_list(dep, erts_not_connected_dist_entries)); + dep->prev->next = dep->next; + } + else { + ASSERT(dep == erts_not_connected_dist_entries); + erts_not_connected_dist_entries = dep->next; + } + + if(dep->next) + dep->next->prev = dep->prev; + + erts_no_of_not_connected_dist_entries--; + + dep->status = ERTS_DE_SFLG_PENDING; + dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY); + dep->connection_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; + + dep->prev = NULL; + dep->next = erts_pending_dist_entries; + if(erts_pending_dist_entries) { + ASSERT(erts_pending_dist_entries->prev == NULL); + erts_pending_dist_entries->prev = dep; + } + erts_pending_dist_entries = dep; + erts_no_of_pending_dist_entries++; + erts_rwmtx_rwunlock(&erts_dist_table_rwmtx); +} + void erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) { @@ -603,29 +652,25 @@ erts_set_dist_entry_connected(DistEntry *dep, Eterm cid, Uint flags) ASSERT(dep != erts_this_dist_entry); ASSERT(is_nil(dep->cid)); + ASSERT(dep->status & ERTS_DE_SFLG_PENDING); ASSERT(is_internal_port(cid) || is_internal_pid(cid)); if(dep->prev) { - ASSERT(is_in_de_list(dep, erts_not_connected_dist_entries)); + ASSERT(is_in_de_list(dep, erts_pending_dist_entries)); dep->prev->next = dep->next; } else { - ASSERT(erts_not_connected_dist_entries == dep); - erts_not_connected_dist_entries = dep->next; + ASSERT(erts_pending_dist_entries == dep); + erts_pending_dist_entries = dep->next; } if(dep->next) dep->next->prev = dep->prev; - ASSERT(erts_no_of_not_connected_dist_entries > 0); - erts_no_of_not_connected_dist_entries--; + ASSERT(erts_no_of_pending_dist_entries > 0); + erts_no_of_pending_dist_entries--; - if (dep->status & ERTS_DE_SFLG_PENDING) { - dep->status &= ~ERTS_DE_SFLG_PENDING; - } else { - dep->connection_id++; - dep->connection_id &= ERTS_DIST_CON_ID_MASK; - } + dep->status &= ~ERTS_DE_SFLG_PENDING; dep->status |= ERTS_DE_SFLG_CONNECTED; dep->flags = flags & ~DFLAG_NO_MAGIC; dep->cid = cid; @@ -959,9 +1004,11 @@ void erts_init_node_tables(int dd_sec) erts_hidden_dist_entries = NULL; erts_visible_dist_entries = NULL; + erts_pending_dist_entries = NULL; erts_not_connected_dist_entries = NULL; erts_no_of_hidden_dist_entries = 0; erts_no_of_visible_dist_entries = 0; + erts_no_of_pending_dist_entries = 0; erts_no_of_not_connected_dist_entries = 0; node_tmpl.sysname = am_Noname; @@ -1729,6 +1776,15 @@ setup_reference_table(void) insert_monitors(dep->monitors, dep->sysname); } + for(dep = erts_pending_dist_entries; dep; dep = dep->next) { + if(dep->nlinks) + insert_links2(dep->nlinks, dep->sysname); + if(dep->node_links) + insert_links(dep->node_links, dep->sysname); + if(dep->monitors) + insert_monitors(dep->monitors, dep->sysname); + } + /* Not connected dist entries should not have any links, but inspect them anyway */ for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index 83d0678bbc..8d29c83e15 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -177,9 +177,11 @@ extern erts_rwmtx_t erts_node_table_rwmtx; extern DistEntry *erts_hidden_dist_entries; extern DistEntry *erts_visible_dist_entries; +extern DistEntry *erts_pending_dist_entries; extern DistEntry *erts_not_connected_dist_entries; extern Sint erts_no_of_hidden_dist_entries; extern Sint erts_no_of_visible_dist_entries; +extern Sint erts_no_of_pending_dist_entries; extern Sint erts_no_of_not_connected_dist_entries; extern DistEntry *erts_this_dist_entry; @@ -195,6 +197,7 @@ void erts_schedule_delete_dist_entry(DistEntry *); Uint erts_dist_table_size(void); void erts_dist_table_info(fmtfn_t, void *); void erts_set_dist_entry_not_connected(DistEntry *); +void erts_set_dist_entry_pending(DistEntry *); void erts_set_dist_entry_connected(DistEntry *, Eterm, Uint); ErlNode *erts_find_or_insert_node(Eterm, Uint32); void erts_schedule_delete_node(ErlNode *); -- cgit v1.2.3 From 0a6ccd3d1fb1d9cacd7369533d6e5aa1fc2ded77 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 27 Sep 2017 19:06:23 +0200 Subject: Abort all pending connections if net_kernel terminates --- erts/emulator/beam/dist.c | 66 ++++++++++++++++++++++++++++++----------------- 1 file changed, 43 insertions(+), 23 deletions(-) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 44f56ebe28..1b5b730764 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -476,41 +476,54 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (dep == erts_this_dist_entry) { /* Net kernel has died (clean up!!) */ DistEntry *tdep; - int no_dist_ctrl = 0; + int no_dist_ctrl; + int no_pending; Eterm nd_reason = (reason == am_no_network ? am_no_network : am_net_kernel_terminated); + int i = 0; + Eterm *dist_ctrl; + DistEntry** pending; + + ERTS_UNDEF(dist_ctrl, NULL); + ERTS_UNDEF(pending, NULL); + erts_rwmtx_rlock(&erts_dist_table_rwmtx); - for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) - no_dist_ctrl++; - for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) - no_dist_ctrl++; + no_dist_ctrl = (erts_no_of_hidden_dist_entries + + erts_no_of_visible_dist_entries); + no_pending = erts_no_of_pending_dist_entries; /* KILL all port controllers */ - if (no_dist_ctrl == 0) - erts_rwmtx_runlock(&erts_dist_table_rwmtx); - else { - Eterm def_buf[128]; - int i = 0; - Eterm *dist_ctrl; - - if (no_dist_ctrl <= sizeof(def_buf)/sizeof(def_buf[0])) - dist_ctrl = &def_buf[0]; - else - dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, - sizeof(Eterm)*no_dist_ctrl); + if (no_dist_ctrl) { + dist_ctrl = erts_alloc(ERTS_ALC_T_TMP, + sizeof(Eterm)*no_dist_ctrl); for (tdep = erts_hidden_dist_entries; tdep; tdep = tdep->next) { ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + ASSERT(i < no_dist_ctrl); dist_ctrl[i++] = tdep->cid; } for (tdep = erts_visible_dist_entries; tdep; tdep = tdep->next) { ASSERT(is_internal_port(tdep->cid) || is_internal_pid(tdep->cid)); + ASSERT(i < no_dist_ctrl); dist_ctrl[i++] = tdep->cid; } - erts_rwmtx_runlock(&erts_dist_table_rwmtx); + ASSERT(i == no_dist_ctrl); + } + if (no_pending) { + pending = erts_alloc(ERTS_ALC_T_TMP, sizeof(DistEntry*)*no_pending); + i = 0; + for (tdep = erts_pending_dist_entries; tdep; tdep = tdep->next) { + ASSERT(is_nil(tdep->cid)); + ASSERT(i < no_pending); + pending[i++] = tdep; + } + ASSERT(i == no_pending); + } + erts_rwmtx_runlock(&erts_dist_table_rwmtx); - for (i = 0; i < no_dist_ctrl; i++) { + if (no_dist_ctrl) { + for (i = 0; i < no_dist_ctrl; i++) { if (is_internal_pid(dist_ctrl[i])) schedule_kill_dist_ctrl_proc(dist_ctrl[i]); else { @@ -524,11 +537,17 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) prt, dist_ctrl[i], nd_reason, NULL); } } - } + } + erts_free(ERTS_ALC_T_TMP, dist_ctrl); + } + + if (no_pending) { + for (i = 0; i < no_pending; i++) { + abort_pending_connection(pending[i], pending[i]->connection_id); + } + erts_free(ERTS_ALC_T_TMP, pending); + } - if (dist_ctrl != &def_buf[0]) - erts_free(ERTS_ALC_T_TMP, dist_ctrl); - } /* * When last dist ctrl exits, node will be taken @@ -3565,6 +3584,7 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) net_kernel = erts_whereis_process(proc, proc_locks, am_net_kernel, nk_locks, 0); if (!net_kernel) { + abort_pending_connection(dep, conn_id); return 0; } -- cgit v1.2.3 From 6c4a3094d0263e46827c6b1869a6de0c033b6b64 Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 27 Sep 2017 19:45:42 +0200 Subject: Improve connection aborting --- erts/emulator/beam/dist.c | 120 ++++++++++++++++++++++++++++-------------- erts/emulator/beam/dist.h | 3 +- lib/kernel/src/net_kernel.erl | 32 ++++++----- 3 files changed, 101 insertions(+), 54 deletions(-) diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 1b5b730764..4a25fbdd4c 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -120,7 +120,7 @@ static void clear_dist_entry(DistEntry*); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); -static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id); +static Sint abort_connection(DistEntry* dep, Uint32 conn_id); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; @@ -517,6 +517,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) ASSERT(is_nil(tdep->cid)); ASSERT(i < no_pending); pending[i++] = tdep; + erts_ref_dist_entry(tdep); } ASSERT(i == no_pending); } @@ -543,7 +544,8 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (no_pending) { for (i = 0; i < no_pending; i++) { - abort_pending_connection(pending[i], pending[i]->connection_id); + abort_connection(pending[i], pending[i]->connection_id); + erts_deref_dist_entry(pending[i]); } erts_free(ERTS_ALC_T_TMP, pending); } @@ -624,7 +626,6 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) reason == am_normal ? am_connection_closed : reason); clear_dist_entry(dep); - } dec_no_nodes(); @@ -2904,24 +2905,31 @@ erts_dist_port_not_busy(Port *prt) erts_schedule_dist_command(prt, NULL); } +static void kill_connection(DistEntry *dep) +{ + ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep)); + ASSERT(dep->status == ERTS_DE_SFLG_CONNECTED); + + dep->status |= ERTS_DE_SFLG_EXITING; + erts_mtx_lock(&dep->qlock); + ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); + erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); + erts_mtx_unlock(&dep->qlock); + + if (is_internal_port(dep->cid)) + erts_schedule_dist_command(NULL, dep); + else if (is_internal_pid(dep->cid)) + schedule_kill_dist_ctrl_proc(dep->cid); +} + void erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) { erts_de_rwlock(dep); if (connection_id == dep->connection_id - && !(dep->status & ERTS_DE_SFLG_EXITING)) { - - dep->status |= ERTS_DE_SFLG_EXITING; - - erts_mtx_lock(&dep->qlock); - ASSERT(!(erts_atomic32_read_nob(&dep->qflgs) & ERTS_DE_QFLG_EXIT)); - erts_atomic32_read_bor_nob(&dep->qflgs, ERTS_DE_QFLG_EXIT); - erts_mtx_unlock(&dep->qlock); + && dep->status == ERTS_DE_SFLG_CONNECTED) { - if (is_internal_port(dep->cid)) - erts_schedule_dist_command(NULL, dep); - else if (is_internal_pid(dep->cid)) - schedule_kill_dist_ctrl_proc(dep->cid); + kill_connection(dep); } erts_de_rwunlock(dep); } @@ -3259,6 +3267,11 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto badarg; } + /* + * ToDo: Should we not pass connection_id as well + * to make sure it's the right connection we commit. + */ + /* * Arguments seem to be in order. */ @@ -3302,6 +3315,23 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto badarg; } + if (dep->status & ERTS_DE_SFLG_EXITING) { + /* Suspend on dist entry waiting for the exit to finish */ + ErtsProcList *plp = erts_proclist_create(BIF_P); + plp->next = NULL; + erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); + erts_mtx_lock(&dep->qlock); + erts_proclist_store_last(&dep->suspended, plp); + erts_mtx_unlock(&dep->qlock); + goto yield; + } + if (dep->status != ERTS_DE_SFLG_PENDING) { + if (dep->status == 0) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } + if (is_not_nil(dep->cid)) goto badarg; @@ -3344,8 +3374,12 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) erts_mtx_unlock(&dep->qlock); goto yield; } - - ASSERT(!(dep->status & ERTS_DE_SFLG_EXITING)); + if (dep->status != ERTS_DE_SFLG_PENDING) { + if (dep->status == 0) + erts_set_dist_entry_pending(dep); + else + goto badarg; + } if (pp->dist_entry || is_not_nil(dep->cid)) goto badarg; @@ -3457,7 +3491,11 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) BIF_ERROR(BIF_P, BADARG); } dep = erts_find_or_insert_dist_entry(BIF_ARG_1); - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + + if (dep == erts_this_dist_entry) { + erts_deref_dist_entry(dep); + BIF_ERROR(BIF_P, BADARG); + } erts_de_rwlock(dep); @@ -3468,8 +3506,8 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) conn_id = dep->connection_id; } else { - ASSERT(!"SVERK: What to do?"); - conn_id = dep->connection_id; + ASSERT(dep->status & ERTS_DE_SFLG_EXITING); + conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK; } erts_de_rwunlock(dep); hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE); @@ -3478,23 +3516,24 @@ BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle)); } -static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) +static Sint abort_connection(DistEntry* dep, Uint32 conn_id) { - Sint reds = 0; - erts_de_rwlock(dep); - if (dep->status != ERTS_DE_SFLG_PENDING || dep->connection_id != conn_id) { - erts_de_rwunlock(dep); + if (dep->connection_id != conn_id) + ; + else if (dep->status == ERTS_DE_SFLG_CONNECTED) { + kill_connection(dep); } - else { - NetExitsContext nec = {dep}; - ErtsLink *nlinks; - ErtsLink *node_links; - ErtsMonitor *monitors; - ErtsAtomCache *cache; - ErtsDistOutputBuf *obuf; + else if (dep->status == ERTS_DE_SFLG_PENDING) { + NetExitsContext nec = {dep}; + ErtsLink *nlinks; + ErtsLink *node_links; + ErtsMonitor *monitors; + ErtsAtomCache *cache; + ErtsDistOutputBuf *obuf; ErtsProcList *resume_procs; + Sint reds = 0; ASSERT(is_nil(dep->cid)); @@ -3534,10 +3573,11 @@ static Sint abort_pending_connection(DistEntry* dep, Uint32 conn_id) } delete_cache(cache); - free_de_out_queues(dep, obuf); + return reds; } - return reds; + erts_de_rwunlock(dep); + return 0; } BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) @@ -3550,13 +3590,13 @@ BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) } tp = tuple_val(BIF_ARG_2); dep = erts_dhandle_to_dist_entry(tp[2]); - if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)) { + if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1) + || dep == erts_this_dist_entry) { BIF_ERROR(BIF_P, BADARG); } - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ if (dep) { - Sint reds = abort_pending_connection(dep, unsigned_val(tp[1])); + Sint reds = abort_connection(dep, unsigned_val(tp[1])); BUMP_REDS(BIF_P, reds); } BIF_RET(am_true); @@ -3584,11 +3624,13 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks) net_kernel = erts_whereis_process(proc, proc_locks, am_net_kernel, nk_locks, 0); if (!net_kernel) { - abort_pending_connection(dep, conn_id); + abort_connection(dep, conn_id); return 0; } - /* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */ + /* + * Send {auto_connect, Node, ConnId, DHandle} to net_kernel + */ mp = erts_alloc_message_heap(net_kernel, &nk_locks, 5 + ERTS_MAGIC_REF_THING_SIZE, &hp, &ohp); diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 2685a55b97..a205ce0cb5 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -172,7 +172,7 @@ erts_dsig_prepare(ErtsDSigData *dsdp, return ERTS_DSIG_PREP_NOT_CONNECTED; dep = erts_find_or_insert_dist_entry(dsdp->node); - ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */ + ASSERT(dep != erts_this_dist_entry); deref_dep = 1; } @@ -192,7 +192,6 @@ retry: res = ERTS_DSIG_PREP_PENDING; } else if (dep->status & ERTS_DE_SFLG_EXITING) { - /* SVERK is this ok, or should we trigger another connection setup */ res = ERTS_DSIG_PREP_NOT_CONNECTED; goto fail; } diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index eee915b15b..fb17e7c1b6 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -796,19 +796,23 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) -> AcceptPid ! {self(), {accept_pending, already_pending}}, {noreply, State}; _ -> - ConnId = case (catch erlang:new_connection_id(Node)) of - {Nr,_DHandle}=CI when is_integer(Nr) -> CI - %% SVERK What to do? - end, - ets:insert(sys_dist, #connection{node = Node, - conn_id = ConnId, - state = pending, - owner = AcceptPid, - address = Address, - type = Type}), - AcceptPid ! {self(),{accept_pending,ok}}, - Owners = [{AcceptPid,Node} | State#state.conn_owners], - {noreply, State#state{conn_owners = Owners}} + case (catch erlang:new_connection_id(Node)) of + {Nr,_DHandle}=ConnId when is_integer(Nr) -> + ets:insert(sys_dist, #connection{node = Node, + conn_id = ConnId, + state = pending, + owner = AcceptPid, + address = Address, + type = Type}), + AcceptPid ! {self(),{accept_pending,ok}}, + Owners = [{AcceptPid,Node} | State#state.conn_owners], + {noreply, State#state{conn_owners = Owners}}; + + _ -> + error_logger:error_msg("~n** Cannot get connection id for node ~w~n", + [Node]), + AcceptPid ! {self(),{accept_pending,nok_pending}} + end end; handle_info({SetupPid, {is_pending, Node}}, State) -> @@ -1009,7 +1013,9 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) -> AcceptPid = Conn#connection.pending_owner, Owners = State#state.conn_owners, Pend = lists:keydelete(AcceptPid, 1, State#state.pend_owners), + erlang:abort_connection_id(Node, Conn#connection.conn_id), Conn1 = Conn#connection { owner = AcceptPid, + conn_id = erlang:new_connection_id(Node), pending_owner = undefined, state = pending }, ets:insert(sys_dist, Conn1), -- cgit v1.2.3 From 06ff5ffb402f00752b5415349ed817a40a76218f Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 11 Oct 2017 19:35:13 +0200 Subject: Cleanup net_kernel --- erts/preloaded/src/erlang.erl | 6 ------ lib/kernel/src/net_kernel.erl | 49 ++++++++++++++++++++----------------------- 2 files changed, 23 insertions(+), 32 deletions(-) diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 2dc2c48f0c..ed9b7d9785 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -3274,12 +3274,6 @@ dist_ctrl_get_data_notification(_DHandle) -> dist_get_stat(_DHandle) -> erlang:nif_error(undefined). -%% -%% If the emulator wants to perform a distributed command and -%% a connection is not established to the actual node the following -%% functions are called in order to set up the connection and then -%% reactivate the command. -%% dmonitor_node(Node, _Flag, []) -> %% Only called when auto-connect attempt failed early in VM diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index fb17e7c1b6..3c4e8df88f 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -70,8 +70,8 @@ protocol_childspecs/0, epmd_module/0]). --export([connect/1, disconnect/1, hidden_connect/1, passive_cnct/1]). --export([hidden_connect_node/1]). %% explicit connect +-export([disconnect/1, passive_cnct/1]). +-export([hidden_connect_node/1]). -export([set_net_ticktime/1, set_net_ticktime/2, get_net_ticktime/0]). -export([node_info/1, node_info/2, nodes_info/0, @@ -248,14 +248,15 @@ ticktime_res(A) when is_atom(A) -> A. %% Called though BIF's -connect(Node) -> auto_connect(Node, normal, false). %%% Long timeout if blocked (== barred), only affects nodes with %%% {dist_auto_connect, once} set. -passive_cnct(Node) -> auto_connect(Node, normal, true). -disconnect(Node) -> request({disconnect, Node}). +passive_cnct(Node) -> + case request({passive_cnct, Node}) of + ignored -> false; + Other -> Other + end. -%% connect but not seen -hidden_connect(Node) -> auto_connect(Node, hidden, false). +disconnect(Node) -> request({disconnect, Node}). %% Should this node publish itself on Node? publish_on_node(Node) when is_atom(Node) -> @@ -273,11 +274,6 @@ connect_node(Node) when is_atom(Node) -> hidden_connect_node(Node) when is_atom(Node) -> request({connect, hidden, Node}). -auto_connect(Node, Type, WaitForBarred) -> %% Type = normal | hidden - case request({auto_connect, Type, Node, WaitForBarred}) of - ignored -> false; - Other -> Other - end. passive_connect_monitor(From, Node) -> ok = monitor_nodes(true,[{node_type,all}]), @@ -363,7 +359,7 @@ init({Name, LongOrShortNames, TickT, CleanHalt}) -> end. -handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State) -> +do_auto_connect(Type, Node, ConnId, WaitForBarred, From, State) -> ConnLookup = ets:lookup(sys_dist, Node), case ConnLookup of @@ -412,18 +408,18 @@ handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State) -> end. -handle_connect([#connection{conn_id = ConnId, state = up}], _, _, ConnId, _From, State) -> +do_explicit_connect([#connection{conn_id = ConnId, state = up}], _, _, ConnId, _From, State) -> {reply, true, State}; -handle_connect([#connection{conn_id = ConnId}=Conn], _, _, ConnId, From, State) +do_explicit_connect([#connection{conn_id = ConnId}=Conn], _, _, ConnId, From, State) when Conn#connection.state =:= pending; Conn#connection.state =:= up_pending -> Waiting = Conn#connection.waiting, ets:insert(sys_dist, Conn#connection{waiting = [From|Waiting]}), {noreply, State}; -handle_connect([#barred_connection{}], Type, Node, ConnId, From , State) -> +do_explicit_connect([#barred_connection{}], Type, Node, ConnId, From , State) -> %% Barred connection only affects auto_connect, ignore it. - handle_connect([], Type, Node, ConnId, From , State); -handle_connect(ConnLookup, Type, Node, ConnId, From , State) -> + do_explicit_connect([], Type, Node, ConnId, From , State); +do_explicit_connect(ConnLookup, Type, Node, ConnId, From , State) -> case setup(ConnLookup, Node,ConnId,Type,From,State) of {ok, SetupPid} -> Owners = [{SetupPid, Node} | State#state.conn_owners], @@ -451,17 +447,18 @@ verify_new_conn_id(_, _) -> %% ------------------------------------------------------------ %% -%% Auto-connect to Node. +%% Passive auto-connect to Node. %% The response is delayed until the connection is up and running. %% -handle_call({auto_connect, _, Node, _}, From, State) when Node =:= node() -> +handle_call({passive_cnct, Node}, From, State) when Node =:= node() -> async_reply({reply, true, State}, From); -handle_call({auto_connect, Type, Node, WaitForBarred}, From, State) -> - verbose({auto_connect, Type, Node, WaitForBarred}, 1, State), - +handle_call({passive_cnct, Node}, From, State) -> + verbose({passive_cnct, Node}, 1, State), + Type = normal, + WaitForBarred = true, R = case (catch erlang:new_connection_id(Node)) of {Nr,_DHandle}=ConnId when is_integer(Nr) -> - handle_auto_connect(Type, Node, ConnId, WaitForBarred, From, State); + do_auto_connect(Type, Node, ConnId, WaitForBarred, From, State); _Error -> error_logger:error_msg("~n** Cannot get connection id for node ~w~n", @@ -482,7 +479,7 @@ handle_call({connect, Type, Node}, From, State) -> ConnLookup = ets:lookup(sys_dist, Node), R = case (catch erlang:new_connection_id(Node)) of {Nr,_DHandle}=ConnId when is_integer(Nr) -> - handle_connect(ConnLookup, Type, Node, ConnId, From, State); + do_explicit_connect(ConnLookup, Type, Node, ConnId, From, State); _Error -> error_logger:error_msg("~n** Cannot get connection id for node ~w~n", @@ -703,7 +700,7 @@ handle_info({auto_connect,Node, Nr, DHandle}, State) -> verbose({auto_connect, Node, Nr, DHandle}, 1, State), ConnId = {Nr, DHandle}, NewState = - case handle_auto_connect(normal, Node, ConnId, false, noreply, State) of + case do_auto_connect(normal, Node, ConnId, false, noreply, State) of {noreply, S} -> %% Pending connection S; -- cgit v1.2.3 From 2f7a0afbbf34699ac1cc1733e3634c1cbeb94c5d Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 11 Oct 2017 20:38:33 +0200 Subject: Refactor erts_dsig_prepare argument dep(p) Don't need to be pointer-pointer --- erts/emulator/beam/bif.c | 28 +++++++++++++++++----------- erts/emulator/beam/dist.c | 6 +++--- erts/emulator/beam/dist.h | 21 ++++----------------- erts/emulator/beam/erl_process.c | 8 ++++---- erts/emulator/beam/io.c | 2 +- 5 files changed, 29 insertions(+), 36 deletions(-) diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 30278cbe36..50699eac31 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -227,7 +227,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) goto res_no_proc; } - code = erts_dsig_prepare(&dsd, &dep, BIF_P, + code = erts_dsig_prepare(&dsd, dep, BIF_P, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), ERTS_DSP_RLOCK, 0, 1); switch (code) { @@ -314,7 +314,7 @@ remote_demonitor(Process *c_p, DistEntry *dep, Eterm ref, Eterm to) ERTS_LC_ASSERT((ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK) == erts_proc_lc_my_proc_locks(c_p)); - code = erts_dsig_prepare(&dsd, &dep, c_p, ERTS_PROC_LOCK_MAIN, + code = erts_dsig_prepare(&dsd, dep, c_p, ERTS_PROC_LOCK_MAIN, ERTS_DSP_RLOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: @@ -792,7 +792,7 @@ remote_monitor(Process *p, Eterm bifarg1, Eterm bifarg2, ASSERT(dep); erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - code = erts_dsig_prepare(&dsd, &dep, + code = erts_dsig_prepare(&dsd, dep, p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), ERTS_DSP_RLOCK, 0, 1); switch (code) { @@ -1176,7 +1176,7 @@ BIF_RETTYPE unlink_1(BIF_ALIST_1) BIF_RET(am_true); } - code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_PROC_LOCK_MAIN, ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: @@ -1560,7 +1560,7 @@ BIF_RETTYPE exit_2(BIF_ALIST_2) if(dep == erts_this_dist_entry) BIF_RET(am_true); - code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_PROC_LOCK_MAIN, ERTS_DSP_NO_LOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: @@ -1996,7 +1996,7 @@ static Sint remote_send(Process *p, DistEntry *dep, ASSERT(is_atom(to) || is_external_pid(to)); ctx->dep = dep; - code = erts_dsig_prepare(&ctx->dsd, &dep, p, ERTS_PROC_LOCK_MAIN, + code = erts_dsig_prepare(&ctx->dsd, dep, p, ERTS_PROC_LOCK_MAIN, ERTS_DSP_NO_LOCK, !ctx->suspend, ctx->connect); switch (code) { @@ -2185,6 +2185,7 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) } return ret_val; } else if (is_tuple(to)) { /* Remote send */ + int deref_dep = 0; int ret; tp = tuple_val(to); if (*tp != make_arityval(2)) @@ -2219,15 +2220,20 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx) } return 0; } + if (dep == NULL) { + dep = erts_find_or_insert_dist_entry(tp[2]); + ASSERT(dep != erts_this_dist_entry); + deref_dep = 1; + } ctx->dsd.node = tp[2]; ret = remote_send(p, dep, tp[1], to, msg, ctx); if (ret == SEND_YIELD_CONTINUE) { - if (dep) { - erts_ref_dist_entry(dep); - ctx->deref_dep = 1; - } + erts_ref_dist_entry(ctx->dep); + ctx->deref_dep = 1; } + if (deref_dep) + erts_deref_dist_entry(dep); return ret; } else { if (IS_TRACED_FL(p, F_TRACE_SEND)) @@ -4405,7 +4411,7 @@ BIF_RETTYPE group_leader_2(BIF_ALIST_2) if(dep == erts_this_dist_entry) BIF_ERROR(BIF_P, BADARG); - code = erts_dsig_prepare(&dsd, &dep, BIF_P, ERTS_PROC_LOCK_MAIN, + code = erts_dsig_prepare(&dsd, dep, BIF_P, ERTS_PROC_LOCK_MAIN, ERTS_DSP_NO_LOCK, 0, 1); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 4a25fbdd4c..ae64f394c8 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -1349,7 +1349,7 @@ int erts_net_message(Port *prt, /* This is tricky (we MUST force a distributed send) */ ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_exit(&dsd, to, from, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); @@ -1441,7 +1441,7 @@ int erts_net_message(Port *prt, if (!rp) { ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, am_noproc); @@ -3831,7 +3831,7 @@ monitor_node(Process* p, Eterm Node, Eterm Bool, Eterm Options) erts_proc_lock(p, ERTS_PROC_LOCK_LINK); - switch (erts_dsig_prepare(&dsd, &dep, p, + switch (erts_dsig_prepare(&dsd, dep, p, (ERTS_PROC_LOCK_MAIN | ERTS_PROC_LOCK_LINK), ERTS_DSP_RLOCK, 0, async_connect)) { case ERTS_DSIG_PREP_NOT_ALIVE: diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index a205ce0cb5..a96e39be89 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -138,7 +138,7 @@ extern int erts_is_alive; #define ERTS_DSIG_PREP_PENDING 4 ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *, - DistEntry **, + DistEntry*, Process *, ErtsProcLocks, ErtsDSigPrepLock, @@ -154,26 +154,20 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks); ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *dsdp, - DistEntry **depp, + DistEntry *dep, Process *proc, ErtsProcLocks proc_locks, ErtsDSigPrepLock dspl, int no_suspend, int connect) { - DistEntry* dep = *depp; - int deref_dep = 0; int res; if (!erts_is_alive) return ERTS_DSIG_PREP_NOT_ALIVE; if (!dep) { - if (!connect) - return ERTS_DSIG_PREP_NOT_CONNECTED; - - dep = erts_find_or_insert_dist_entry(dsdp->node); - ASSERT(dep != erts_this_dist_entry); - deref_dep = 1; + ASSERT(!connect); + return ERTS_DSIG_PREP_NOT_CONNECTED; } #ifdef ERTS_ENABLE_LOCK_CHECK @@ -199,8 +193,6 @@ retry: ASSERT(dep->status == 0); erts_de_runlock(dep); if (!erts_auto_connect(dep, proc, proc_locks)) { - if (deref_dep) - erts_deref_dist_entry(dep); return ERTS_DSIG_PREP_NOT_ALIVE; } goto retry; @@ -224,15 +216,10 @@ retry: dsdp->no_suspend = no_suspend; if (dspl == ERTS_DSP_NO_LOCK) erts_de_runlock(dep); - if (deref_dep) - erts_deref_dist_entry(dep); - *depp = dep; return res; fail: erts_de_runlock(dep); - if (deref_dep) - erts_deref_dist_entry(dep); return res; } diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 2d13cd92b2..a807d60ec7 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -12659,7 +12659,7 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + int code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED || code == ERTS_DSIG_PREP_PENDING) { @@ -12707,7 +12707,7 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + int code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED || code == ERTS_DSIG_PREP_PENDING) { @@ -12768,7 +12768,7 @@ static void doit_exit_monitor(ErtsMonitor *mon, void *vpcontext) erts_de_links_unlock(dep); if (rmon) { ErtsDSigData dsd; - int code = erts_dsig_prepare(&dsd, &dep, NULL, 0, + int code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { code = erts_dsig_send_m_exit(&dsd, @@ -12893,7 +12893,7 @@ static void doit_exit_link(ErtsLink *lnk, void *vpcontext) erts_remove_dist_link(&dld, p->common.id, item, dep); if (dld.d_lnk) { erts_proc_lock(p, ERTS_PROC_LOCK_MAIN); - code = erts_dsig_prepare(&dsd, &dep, p, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&dsd, dep, p, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED || code == ERTS_DSIG_PREP_PENDING) { diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 6cd1aa5e79..9933c8dda4 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -3829,7 +3829,7 @@ static void sweep_one_link(ErtsLink *lnk, void *vpsc) ErtsDistLinkData dld; ErtsDSigData dsd; int code; - code = erts_dsig_prepare(&dsd, &dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); + code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); switch (code) { case ERTS_DSIG_PREP_NOT_ALIVE: case ERTS_DSIG_PREP_NOT_CONNECTED: -- cgit v1.2.3 From 19c9f3a61fbef3682056bab3db4787cd763bd06e Mon Sep 17 00:00:00 2001 From: Sverker Eriksson Date: Wed, 15 Nov 2017 19:44:52 +0100 Subject: Move new|abort_connection_id to erts_internal and drop _id suffix. --- erts/emulator/beam/bif.tab | 4 ++-- erts/emulator/beam/dist.c | 4 ++-- erts/preloaded/ebin/erlang.beam | Bin 104644 -> 104248 bytes erts/preloaded/ebin/erts_internal.beam | Bin 11872 -> 12252 bytes erts/preloaded/src/erlang.erl | 16 ---------------- erts/preloaded/src/erts_internal.erl | 16 ++++++++++++++++ lib/kernel/src/net_kernel.erl | 16 ++++++++-------- 7 files changed, 28 insertions(+), 28 deletions(-) diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index 5a591caac9..f739f414de 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -694,6 +694,6 @@ bif erlang:iolist_to_iovec/1 # New in 21.0 # -bif erlang:new_connection_id/1 -bif erlang:abort_connection_id/2 +bif erts_internal:new_connection/1 +bif erts_internal:abort_connection/2 diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index ae64f394c8..16a7bd9eb3 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -3480,7 +3480,7 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) goto done; } -BIF_RETTYPE new_connection_id_1(BIF_ALIST_1) +BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1) { DistEntry* dep; Uint32 conn_id; @@ -3580,7 +3580,7 @@ static Sint abort_connection(DistEntry* dep, Uint32 conn_id) return 0; } -BIF_RETTYPE abort_connection_id_2(BIF_ALIST_2) +BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2) { DistEntry* dep; Eterm* tp; diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam index 18137fd53c..01ed412562 100644 Binary files a/erts/preloaded/ebin/erlang.beam and b/erts/preloaded/ebin/erlang.beam differ diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam index 5416826f19..cb8e6b6d69 100644 Binary files a/erts/preloaded/ebin/erts_internal.beam and b/erts/preloaded/ebin/erts_internal.beam differ diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index ed9b7d9785..2be053575b 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -164,8 +164,6 @@ -export([registered/0, resume_process/1, round/1, self/0]). -export([seq_trace/2, seq_trace_print/1, seq_trace_print/2, setnode/2]). -export([setnode/3, size/1, spawn/3, spawn_link/3, split_binary/2]). --export([new_connection_id/1]). --export([abort_connection_id/2]). -export([suspend_process/2, system_monitor/0]). -export([system_monitor/1, system_monitor/2, system_profile/0]). -export([system_profile/2, throw/1, time/0, trace/3, trace_delivered/1]). @@ -1663,20 +1661,6 @@ setnode(_P1, _P2) -> setnode(_P1, _P2, _P3) -> erlang:nif_error(undefined). -%% new_connection_id/1 --spec erlang:new_connection_id(Node) -> ConnId when - Node :: atom(), - ConnId :: {integer(), dist_handle()}. -new_connection_id(_Node) -> - erlang:nif_error(undefined). - -%% abort_connection_id/2 --spec erlang:abort_connection_id(Node, ConnId) -> boolean() when - Node :: atom(), - ConnId :: {integer(), dist_handle()}. -abort_connection_id(_Node, _ConnId) -> - erlang:nif_error(undefined). - %% size/1 %% Shadowed by erl_bif_types: erlang:size/1 -spec size(Item) -> non_neg_integer() when diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index bb1824ecd4..1f5c88c4b9 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -63,6 +63,9 @@ -export([dist_ctrl_put_data/2]). +-export([new_connection/1]). +-export([abort_connection/2]). + %% Auto import name clash -export([check_process_code/1]). @@ -498,3 +501,16 @@ dist_ctrl_put_data(DHandle, IoList) -> | RootST], erlang:raise(Class, Reason, StackTrace) end. + + +-spec erts_internal:new_connection(Node) -> ConnId when + Node :: atom(), + ConnId :: {integer(), erlang:dist_handle()}. +new_connection(_Node) -> + erlang:nif_error(undefined). + +-spec erts_internal:abort_connection(Node, ConnId) -> boolean() when + Node :: atom(), + ConnId :: {integer(), erlang:dist_handle()}. +abort_connection(_Node, _ConnId) -> + erlang:nif_error(undefined). diff --git a/lib/kernel/src/net_kernel.erl b/lib/kernel/src/net_kernel.erl index 3c4e8df88f..cdb10a7b12 100644 --- a/lib/kernel/src/net_kernel.erl +++ b/lib/kernel/src/net_kernel.erl @@ -456,7 +456,7 @@ handle_call({passive_cnct, Node}, From, State) -> verbose({passive_cnct, Node}, 1, State), Type = normal, WaitForBarred = true, - R = case (catch erlang:new_connection_id(Node)) of + R = case (catch erts_internal:new_connection(Node)) of {Nr,_DHandle}=ConnId when is_integer(Nr) -> do_auto_connect(Type, Node, ConnId, WaitForBarred, From, State); @@ -477,7 +477,7 @@ handle_call({connect, _, Node, _, _}, From, State) when Node =:= node() -> handle_call({connect, Type, Node}, From, State) -> verbose({connect, Type, Node}, 1, State), ConnLookup = ets:lookup(sys_dist, Node), - R = case (catch erlang:new_connection_id(Node)) of + R = case (catch erts_internal:new_connection(Node)) of {Nr,_DHandle}=ConnId when is_integer(Nr) -> do_explicit_connect(ConnLookup, Type, Node, ConnId, From, State); @@ -708,7 +708,7 @@ handle_info({auto_connect,Node, Nr, DHandle}, State) -> S; {reply, false, S} -> %% Connection refused - erlang:abort_connection_id(Node, ConnId), + erts_internal:abort_connection(Node, ConnId), S end, {noreply, NewState}; @@ -793,7 +793,7 @@ handle_info({AcceptPid, {accept_pending,MyNode,Node,Address,Type}}, State) -> AcceptPid ! {self(), {accept_pending, already_pending}}, {noreply, State}; _ -> - case (catch erlang:new_connection_id(Node)) of + case (catch erts_internal:new_connection(Node)) of {Nr,_DHandle}=ConnId when is_integer(Nr) -> ets:insert(sys_dist, #connection{node = Node, conn_id = ConnId, @@ -995,7 +995,7 @@ pending_nodedown(Conn, Node, Type, State) -> % Don't bar connections that have never been alive %mark_sys_dist_nodedown(Node), % - instead just delete the node: - erlang:abort_connection_id(Node, Conn#connection.conn_id), + erts_internal:abort_connection(Node, Conn#connection.conn_id), ets:delete(sys_dist, Node), reply_waiting(Node,Conn#connection.waiting, false), case Type of @@ -1010,9 +1010,9 @@ up_pending_nodedown(Conn, Node, _Reason, _Type, State) -> AcceptPid = Conn#connection.pending_owner, Owners = State#state.conn_owners, Pend = lists:keydelete(AcceptPid, 1, State#state.pend_owners), - erlang:abort_connection_id(Node, Conn#connection.conn_id), + erts_internal:abort_connection(Node, Conn#connection.conn_id), Conn1 = Conn#connection { owner = AcceptPid, - conn_id = erlang:new_connection_id(Node), + conn_id = erts_internal:new_connection(Node), pending_owner = undefined, state = pending }, ets:insert(sys_dist, Conn1), @@ -1029,7 +1029,7 @@ up_nodedown(Conn, Node, _Reason, Type, State) -> State. mark_sys_dist_nodedown(Conn, Node) -> - erlang:abort_connection_id(Node, Conn#connection.conn_id), + erts_internal:abort_connection(Node, Conn#connection.conn_id), case application:get_env(kernel, dist_auto_connect) of {ok, once} -> ets:insert(sys_dist, #barred_connection{node = Node}); -- cgit v1.2.3