diff options
Diffstat (limited to 'erts/emulator/beam/dist.h')
-rw-r--r-- | erts/emulator/beam/dist.h | 235 |
1 files changed, 127 insertions, 108 deletions
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index e82b416286..d4d7874a70 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1996-2016. All Rights Reserved. + * Copyright Ericsson AB 1996-2018. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,10 +40,53 @@ #define DFLAG_UNICODE_IO 0x1000 #define DFLAG_DIST_HDR_ATOM_CACHE 0x2000 #define DFLAG_SMALL_ATOM_TAGS 0x4000 -#define DFLAG_INTERNAL_TAGS 0x8000 +#define DFLAG_INTERNAL_TAGS 0x8000 /* used by ETS 'compressed' option */ #define DFLAG_UTF8_ATOMS 0x10000 #define DFLAG_MAP_TAG 0x20000 #define DFLAG_BIG_CREATION 0x40000 +#define DFLAG_SEND_SENDER 0x80000 +#define DFLAG_BIG_SEQTRACE_LABELS 0x100000 +#define DFLAG_NO_MAGIC 0x200000 /* internal for pending connection */ + +/* Mandatory flags for distribution */ +#define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \ + | DFLAG_EXTENDED_PIDS_PORTS \ + | DFLAG_UTF8_ATOMS \ + | DFLAG_NEW_FUN_TAGS) + +/* + * Additional optimistic flags when encoding toward pending connection. + * If remote node (erl_interface) does not supporting these then we may need + * to transcode messages enqueued before connection setup was finished. + */ +#define DFLAG_DIST_HOPEFULLY (DFLAG_EXPORT_PTR_TAG \ + | DFLAG_BIT_BINARIES \ + | DFLAG_DIST_MONITOR \ + | DFLAG_DIST_MONITOR_NAME) + +/* Our preferred set of flags. Used for connection setup handshake */ +#define DFLAG_DIST_DEFAULT (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY \ + | DFLAG_FUN_TAGS \ + | DFLAG_NEW_FLOATS \ + | DFLAG_UNICODE_IO \ + | DFLAG_DIST_HDR_ATOM_CACHE \ + | DFLAG_SMALL_ATOM_TAGS \ + | DFLAG_UTF8_ATOMS \ + | DFLAG_MAP_TAG \ + | DFLAG_BIG_CREATION \ + | DFLAG_SEND_SENDER \ + | DFLAG_BIG_SEQTRACE_LABELS) + +/* Flags addable by local distr implementations */ +#define DFLAG_DIST_ADDABLE DFLAG_DIST_DEFAULT + +/* Flags rejectable by local distr implementation */ +#define DFLAG_DIST_REJECTABLE (DFLAG_DIST_HDR_ATOM_CACHE \ + | DFLAG_HIDDEN_ATOM_CACHE \ + | DFLAG_ATOM_CACHE) + +/* Flags for all features needing strict order delivery */ +#define DFLAG_DIST_STRICT_ORDER DFLAG_DIST_HDR_ATOM_CACHE /* All flags that should be enabled when term_to_binary/1 is used. */ #define TERM_TO_BINARY_DFLAGS (DFLAG_EXTENDED_REFERENCES \ @@ -74,53 +117,38 @@ #define DOP_DEMONITOR_P 20 #define DOP_MONITOR_P_EXIT 21 +#define DOP_SEND_SENDER 22 +#define DOP_SEND_SENDER_TT 23 + /* distribution trap functions */ -extern Export* dsend2_trap; -extern Export* dsend3_trap; -extern Export* dlink_trap; -extern Export* dunlink_trap; extern Export* dmonitor_node_trap; -extern Export* dgroup_leader_trap; -extern Export* dexit_trap; -extern Export* dmonitor_p_trap; typedef enum { ERTS_DSP_NO_LOCK, - ERTS_DSP_RLOCK, - ERTS_DSP_RWLOCK + ERTS_DSP_RLOCK } ErtsDSigPrepLock; typedef struct { Process *proc; DistEntry *dep; + Eterm node; /* used if dep == NULL */ Eterm cid; Eterm connection_id; int no_suspend; + Uint32 flags; } ErtsDSigData; -#define ERTS_DE_IS_NOT_CONNECTED(DEP) \ - (ERTS_SMP_LC_ASSERT(erts_lc_rwmtx_is_rlocked(&(DEP)->rwmtx) \ - || erts_lc_rwmtx_is_rwlocked(&(DEP)->rwmtx)), \ - (is_nil((DEP)->cid) || ((DEP)->status & ERTS_DE_SFLG_EXITING))) - -#define ERTS_DE_IS_CONNECTED(DEP) \ - (!ERTS_DE_IS_NOT_CONNECTED((DEP))) - #define ERTS_DE_BUSY_LIMIT (1024*1024) extern int erts_dist_buf_busy_limit; extern int erts_is_alive; /* * erts_dsig_prepare() prepares a send of a distributed signal. - * One of the values defined below are returned. If the returned - * value is another than ERTS_DSIG_PREP_CONNECTED, the - * distributed signal cannot be sent before apropriate actions - * have been taken. Apropriate actions would typically be setting - * up the connection. + * One of the values defined below are returned. */ -/* Connected; signal can be sent. */ +/* Connected; signals can be enqueued and sent. */ #define ERTS_DSIG_PREP_CONNECTED 0 /* Not connected; connection needs to be set up. */ #define ERTS_DSIG_PREP_NOT_CONNECTED 1 @@ -128,63 +156,94 @@ extern int erts_is_alive; #define ERTS_DSIG_PREP_WOULD_SUSPEND 2 /* System not alive (distributed) */ #define ERTS_DSIG_PREP_NOT_ALIVE 3 +/* Pending connection; signals can be enqueued */ +#define ERTS_DSIG_PREP_PENDING 4 ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *, - DistEntry *, + DistEntry*, Process *, + ErtsProcLocks, ErtsDSigPrepLock, + int, int); ERTS_GLB_INLINE void erts_schedule_dist_command(Port *, DistEntry *); +int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks); + #if ERTS_GLB_INLINE_INCL_FUNC_DEF ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *dsdp, DistEntry *dep, Process *proc, + ErtsProcLocks proc_locks, ErtsDSigPrepLock dspl, - int no_suspend) + int no_suspend, + int connect) { - int failure; + int res; + if (!erts_is_alive) return ERTS_DSIG_PREP_NOT_ALIVE; - if (!dep) - return ERTS_DSIG_PREP_NOT_CONNECTED; - if (dspl == ERTS_DSP_RWLOCK) - erts_smp_de_rwlock(dep); - else - erts_smp_de_rlock(dep); - if (ERTS_DE_IS_NOT_CONNECTED(dep)) { - failure = ERTS_DSIG_PREP_NOT_CONNECTED; + if (!dep) { + ASSERT(!connect); + return ERTS_DSIG_PREP_NOT_CONNECTED; + } + +#ifdef ERTS_ENABLE_LOCK_CHECK + if (connect) { + erts_proc_lc_might_unlock(proc, proc_locks); + } +#endif + +retry: + erts_de_rlock(dep); + + if (dep->state == ERTS_DE_STATE_CONNECTED) { + res = ERTS_DSIG_PREP_CONNECTED; + } + else if (dep->state == ERTS_DE_STATE_PENDING) { + res = ERTS_DSIG_PREP_PENDING; + } + else if (dep->state == ERTS_DE_STATE_EXITING) { + res = ERTS_DSIG_PREP_NOT_CONNECTED; goto fail; } + else if (connect) { + ASSERT(dep->state == ERTS_DE_STATE_IDLE); + erts_de_runlock(dep); + if (!erts_auto_connect(dep, proc, proc_locks)) { + return ERTS_DSIG_PREP_NOT_ALIVE; + } + goto retry; + } + else { + ASSERT(dep->state == ERTS_DE_STATE_IDLE); + res = ERTS_DSIG_PREP_NOT_CONNECTED; + goto fail; + } + if (no_suspend) { - failure = ERTS_DSIG_PREP_CONNECTED; - erts_smp_mtx_lock(&dep->qlock); - if (dep->qflgs & ERTS_DE_QFLG_BUSY) - failure = ERTS_DSIG_PREP_WOULD_SUSPEND; - erts_smp_mtx_unlock(&dep->qlock); - if (failure == ERTS_DSIG_PREP_WOULD_SUSPEND) + if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) { + res = ERTS_DSIG_PREP_WOULD_SUSPEND; goto fail; + } } dsdp->proc = proc; dsdp->dep = dep; dsdp->cid = dep->cid; dsdp->connection_id = dep->connection_id; dsdp->no_suspend = no_suspend; + dsdp->flags = dep->flags; if (dspl == ERTS_DSP_NO_LOCK) - erts_smp_de_runlock(dep); - return ERTS_DSIG_PREP_CONNECTED; + erts_de_runlock(dep); + return res; fail: - if (dspl == ERTS_DSP_RWLOCK) - erts_smp_de_rwunlock(dep); - else - erts_smp_de_runlock(dep); - return failure; - + erts_de_runlock(dep); + return res; } ERTS_GLB_INLINE @@ -194,17 +253,17 @@ void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry) Eterm id; if (prt) { - ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); + ERTS_LC_ASSERT(erts_lc_is_port_locked(prt)); ASSERT((erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLGS_DEAD) == 0); - ASSERT(prt->dist_entry); - dep = prt->dist_entry; + dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY); + ASSERT(dep); id = prt->common.id; } else { ASSERT(dist_entry); - ERTS_SMP_LC_ASSERT(erts_lc_rwmtx_is_rlocked(&dist_entry->rwmtx) + ERTS_LC_ASSERT(erts_lc_rwmtx_is_rlocked(&dist_entry->rwmtx) || erts_lc_rwmtx_is_rwlocked(&dist_entry->rwmtx)); ASSERT(is_internal_port(dist_entry->cid)); @@ -212,64 +271,19 @@ void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry) id = dep->cid; } - if (!erts_smp_atomic_xchg_mb(&dep->dist_cmd_scheduled, 1)) + if (!erts_atomic_xchg_mb(&dep->dist_cmd_scheduled, 1)) erts_port_task_schedule(id, &dep->dist_cmd, ERTS_PORT_TASK_DIST_CMD); } #endif -typedef struct { - ErtsLink *d_lnk; - ErtsLink *d_sub_lnk; -} ErtsDistLinkData; - -ERTS_GLB_INLINE void erts_remove_dist_link(ErtsDistLinkData *, - Eterm, - Eterm, - DistEntry *); -ERTS_GLB_INLINE int erts_was_dist_link_removed(ErtsDistLinkData *); -ERTS_GLB_INLINE void erts_destroy_dist_link(ErtsDistLinkData *); - -#if ERTS_GLB_INLINE_INCL_FUNC_DEF - -ERTS_GLB_INLINE void -erts_remove_dist_link(ErtsDistLinkData *dldp, - Eterm lid, - Eterm rid, - DistEntry *dep) -{ - erts_smp_de_links_lock(dep); - dldp->d_lnk = erts_lookup_link(dep->nlinks, lid); - if (!dldp->d_lnk) - dldp->d_sub_lnk = NULL; - else { - dldp->d_sub_lnk = erts_remove_link(&ERTS_LINK_ROOT(dldp->d_lnk), rid); - dldp->d_lnk = (ERTS_LINK_ROOT(dldp->d_lnk) - ? NULL - : erts_remove_link(&dep->nlinks, lid)); - } - erts_smp_de_links_unlock(dep); -} - -ERTS_GLB_INLINE int -erts_was_dist_link_removed(ErtsDistLinkData *dldp) -{ - return dldp->d_sub_lnk != NULL; -} - -ERTS_GLB_INLINE void -erts_destroy_dist_link(ErtsDistLinkData *dldp) -{ - if (dldp->d_lnk) - erts_destroy_link(dldp->d_lnk); - if (dldp->d_sub_lnk) - erts_destroy_link(dldp->d_sub_lnk); -} - +#ifdef DEBUG +#define ERTS_DBG_CHK_NO_DIST_LNK(D, R, L) \ + erts_dbg_chk_no_dist_proc_link((D), (R), (L)) +#else +#define ERTS_DBG_CHK_NO_DIST_LNK(D, R, L) #endif - - /* Define for testing */ /* #define EXTREME_TTB_TRAPPING 1 */ @@ -290,6 +304,7 @@ typedef struct TTBSizeContext_ { typedef struct TTBEncodeContext_ { Uint flags; + Uint hopefull_flags; int level; byte* ep; Eterm obj; @@ -331,7 +346,7 @@ struct erts_dsig_send_context { Eterm ctl; Eterm msg; int force_busy; - Uint32 pass_through_size; + Uint32 max_finalize_prepend; Uint data_size, dhdr_ext_size; ErtsAtomCacheMap *acmp; ErtsDistOutputBuf *obuf; @@ -345,10 +360,12 @@ struct erts_dsig_send_context { typedef struct { int suspend; + int connect; Eterm ctl_heap[6]; ErtsDSigData dsd; - DistEntry* dep_to_deref; + DistEntry *dep; + int deref_dep; struct erts_dsig_send_context dss; Eterm return_term; @@ -375,7 +392,7 @@ extern int erts_dsig_send_monitor(ErtsDSigData *, Eterm, Eterm, Eterm); extern int erts_dsig_send_m_exit(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm); extern int erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx); -extern void erts_dsend_context_dtor(Binary*); +extern int erts_dsend_context_dtor(Binary*); extern Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx); extern int erts_dist_command(Port *prt, int reds); @@ -384,5 +401,7 @@ extern void erts_kill_dist_connection(DistEntry *dep, Uint32); extern Uint erts_dist_cache_size(void); +extern Sint erts_abort_connection_rwunlock(DistEntry *dep); + #endif |