/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 1996-2017. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* %CopyrightEnd%
*/
#ifndef __DIST_H__
#define __DIST_H__
#include "erl_process.h"
#include "erl_node_tables.h"
#include "zlib.h"
#define DFLAG_PUBLISHED 0x01
#define DFLAG_ATOM_CACHE 0x02
#define DFLAG_EXTENDED_REFERENCES 0x04
#define DFLAG_DIST_MONITOR 0x08
#define DFLAG_FUN_TAGS 0x10
#define DFLAG_DIST_MONITOR_NAME 0x20
#define DFLAG_HIDDEN_ATOM_CACHE 0x40
#define DFLAG_NEW_FUN_TAGS 0x80
#define DFLAG_EXTENDED_PIDS_PORTS 0x100
#define DFLAG_EXPORT_PTR_TAG 0x200
#define DFLAG_BIT_BINARIES 0x400
#define DFLAG_NEW_FLOATS 0x800
#define DFLAG_UNICODE_IO 0x1000
#define DFLAG_DIST_HDR_ATOM_CACHE 0x2000
#define DFLAG_SMALL_ATOM_TAGS 0x4000
#define DFLAG_INTERNAL_TAGS 0x8000
#define DFLAG_UTF8_ATOMS 0x10000
#define DFLAG_MAP_TAG 0x20000
#define DFLAG_BIG_CREATION 0x40000
#define DFLAG_SEND_SENDER 0x80000
#define DFLAG_NO_MAGIC 0x100000
/* Mandatory flags for distribution (sync with dist_util.erl) */
#define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \
| DFLAG_EXTENDED_PIDS_PORTS \
| DFLAG_UTF8_ATOMS \
| DFLAG_NEW_FUN_TAGS)
/* Additional optimistic flags when encoding toward pending connection */
#define DFLAG_DIST_HOPEFULLY (DFLAG_NO_MAGIC \
| DFLAG_EXPORT_PTR_TAG \
| DFLAG_BIT_BINARIES)
/* All flags that should be enabled when term_to_binary/1 is used. */
#define TERM_TO_BINARY_DFLAGS (DFLAG_EXTENDED_REFERENCES \
| DFLAG_NEW_FUN_TAGS \
| DFLAG_NEW_FLOATS \
| DFLAG_EXTENDED_PIDS_PORTS \
| DFLAG_EXPORT_PTR_TAG \
| DFLAG_BIT_BINARIES \
| DFLAG_MAP_TAG \
| DFLAG_BIG_CREATION)
/* opcodes used in distribution messages */
#define DOP_LINK 1
#define DOP_SEND 2
#define DOP_EXIT 3
#define DOP_UNLINK 4
/* Ancient DOP_NODE_LINK (5) was here, can be reused */
#define DOP_REG_SEND 6
#define DOP_GROUP_LEADER 7
#define DOP_EXIT2 8
#define DOP_SEND_TT 12
#define DOP_EXIT_TT 13
#define DOP_REG_SEND_TT 16
#define DOP_EXIT2_TT 18
#define DOP_MONITOR_P 19
#define DOP_DEMONITOR_P 20
#define DOP_MONITOR_P_EXIT 21
#define DOP_SEND_SENDER 22
#define DOP_SEND_SENDER_TT 23
/* distribution trap functions */
extern Export* dmonitor_node_trap;
extern Export* dmonitor_p_trap;
typedef enum {
ERTS_DSP_NO_LOCK,
ERTS_DSP_RLOCK
} ErtsDSigPrepLock;
typedef struct {
Process *proc;
DistEntry *dep;
Eterm node; /* used if dep == NULL */
Eterm cid;
Eterm connection_id;
int no_suspend;
} ErtsDSigData;
#define ERTS_DE_IS_NOT_CONNECTED(DEP) \
(ERTS_LC_ASSERT(erts_lc_rwmtx_is_rlocked(&(DEP)->rwmtx) \
|| erts_lc_rwmtx_is_rwlocked(&(DEP)->rwmtx)), \
(is_nil((DEP)->cid) || ((DEP)->status & ERTS_DE_SFLG_EXITING)))
#define ERTS_DE_IS_CONNECTED(DEP) \
(!ERTS_DE_IS_NOT_CONNECTED((DEP)))
#define ERTS_DE_BUSY_LIMIT (1024*1024)
extern int erts_dist_buf_busy_limit;
extern int erts_is_alive;
/*
* erts_dsig_prepare() prepares a send of a distributed signal.
* One of the values defined below are returned.
*/
/* Connected; signals can be enqueued and sent. */
#define ERTS_DSIG_PREP_CONNECTED 0
/* Not connected; connection needs to be set up. */
#define ERTS_DSIG_PREP_NOT_CONNECTED 1
/* Caller would be suspended on send operation. */
#define ERTS_DSIG_PREP_WOULD_SUSPEND 2
/* System not alive (distributed) */
#define ERTS_DSIG_PREP_NOT_ALIVE 3
/* Pending connection; signals can be enqueued */
#define ERTS_DSIG_PREP_PENDING 4
ERTS_GLB_INLINE int erts_dsig_prepare(ErtsDSigData *,
DistEntry **,
Process *,
ErtsProcLocks,
ErtsDSigPrepLock,
int,
int);
ERTS_GLB_INLINE
void erts_schedule_dist_command(Port *, DistEntry *);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE int
erts_dsig_prepare(ErtsDSigData *dsdp,
DistEntry **depp,
Process *proc,
ErtsProcLocks proc_locks,
ErtsDSigPrepLock dspl,
int no_suspend,
int connect)
{
DistEntry* dep = *depp;
int deref_dep = 0;
int res;
if (!erts_is_alive)
return ERTS_DSIG_PREP_NOT_ALIVE;
if (!dep) {
if (!connect)
return ERTS_DSIG_PREP_NOT_CONNECTED;
dep = erts_find_or_insert_dist_entry(dsdp->node);
ASSERT(dep != erts_this_dist_entry); /* SVERK: What to do? */
deref_dep = 1;
}
#ifdef ERTS_ENABLE_LOCK_CHECK
if (connect) {
erts_proc_lc_might_unlock(proc, proc_locks);
}
#endif
retry:
erts_de_rlock(dep);
if (ERTS_DE_IS_CONNECTED(dep)) {
res = ERTS_DSIG_PREP_CONNECTED;
}
else if (dep->status & ERTS_DE_SFLG_PENDING) {
res = ERTS_DSIG_PREP_PENDING;
}
else if (dep->status & ERTS_DE_SFLG_EXITING) {
/* SVERK is this ok, or should we trigger another connection setup */
res = ERTS_DSIG_PREP_NOT_CONNECTED;
goto fail;
}
else if (connect) {
ASSERT(dep->status == 0);
erts_de_runlock(dep);
erts_de_rwlock(dep);
if (dep->status == 0) {
Process* net_kernel;
ErtsProcLocks nk_locks = ERTS_PROC_LOCK_MSGQ;
Eterm *hp;
ErlOffHeap *ohp;
ErtsMessage *mp;
Eterm msg, conn_id, dhandle;
dep->status = ERTS_DE_SFLG_PENDING;
dep->flags = (DFLAG_DIST_MANDATORY | DFLAG_DIST_HOPEFULLY);
dep->connection_id++;
dep->connection_id &= ERTS_DIST_CON_ID_MASK;
conn_id = make_small(dep->connection_id);
erts_de_rwunlock(dep);
net_kernel = erts_whereis_process(proc, proc_locks,
am_net_kernel, nk_locks, 0);
if (!net_kernel) {
if (deref_dep)
erts_deref_dist_entry(dep);
return ERTS_DSIG_PREP_NOT_ALIVE;
}
/* Send {auto_connect, Node, ConnId, DHandle} to net_kernel */
mp = erts_alloc_message_heap(net_kernel, &nk_locks,
5 + ERTS_MAGIC_REF_THING_SIZE,
&hp, &ohp);
dhandle = erts_build_dhandle(&hp, ohp, dep);
msg = TUPLE4(hp, am_auto_connect, dep->sysname, conn_id, dhandle);
erts_queue_message(net_kernel, nk_locks, mp, msg, proc->common.id);
erts_proc_unlock(net_kernel, nk_locks);
}
else
erts_de_rwunlock(dep);
goto retry;
}
else {
ASSERT(dep->status == 0);
res = ERTS_DSIG_PREP_NOT_CONNECTED;
goto fail;
}
if (no_suspend) {
if (erts_atomic32_read_acqb(&dep->qflgs) & ERTS_DE_QFLG_BUSY) {
res = ERTS_DSIG_PREP_WOULD_SUSPEND;
goto fail;
}
}
dsdp->proc = proc;
dsdp->dep = dep;
dsdp->cid = dep->cid;
dsdp->connection_id = dep->connection_id;
dsdp->no_suspend = no_suspend;
if (dspl == ERTS_DSP_NO_LOCK)
erts_de_runlock(dep);
if (deref_dep)
erts_deref_dist_entry(dep);
*depp = dep;
return res;
fail:
erts_de_runlock(dep);
if (deref_dep)
erts_deref_dist_entry(dep);
return res;
}
ERTS_GLB_INLINE
void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry)
{
DistEntry *dep;
Eterm id;
if (prt) {
ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
ASSERT((erts_atomic32_read_nob(&prt->state)
& ERTS_PORT_SFLGS_DEAD) == 0);
ASSERT(prt->dist_entry);
dep = prt->dist_entry;
id = prt->common.id;
}
else {
ASSERT(dist_entry);
ERTS_LC_ASSERT(erts_lc_rwmtx_is_rlocked(&dist_entry->rwmtx)
|| erts_lc_rwmtx_is_rwlocked(&dist_entry->rwmtx));
ASSERT(is_internal_port(dist_entry->cid));
dep = dist_entry;
id = dep->cid;
}
if (!erts_atomic_xchg_mb(&dep->dist_cmd_scheduled, 1))
erts_port_task_schedule(id, &dep->dist_cmd, ERTS_PORT_TASK_DIST_CMD);
}
#endif
typedef struct {
ErtsLink *d_lnk;
ErtsLink *d_sub_lnk;
} ErtsDistLinkData;
ERTS_GLB_INLINE void erts_remove_dist_link(ErtsDistLinkData *,
Eterm,
Eterm,
DistEntry *);
ERTS_GLB_INLINE int erts_was_dist_link_removed(ErtsDistLinkData *);
ERTS_GLB_INLINE void erts_destroy_dist_link(ErtsDistLinkData *);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
ERTS_GLB_INLINE void
erts_remove_dist_link(ErtsDistLinkData *dldp,
Eterm lid,
Eterm rid,
DistEntry *dep)
{
erts_de_links_lock(dep);
dldp->d_lnk = erts_lookup_link(dep->nlinks, lid);
if (!dldp->d_lnk)
dldp->d_sub_lnk = NULL;
else {
dldp->d_sub_lnk = erts_remove_link(&ERTS_LINK_ROOT(dldp->d_lnk), rid);
dldp->d_lnk = (ERTS_LINK_ROOT(dldp->d_lnk)
? NULL
: erts_remove_link(&dep->nlinks, lid));
}
erts_de_links_unlock(dep);
}
ERTS_GLB_INLINE int
erts_was_dist_link_removed(ErtsDistLinkData *dldp)
{
return dldp->d_sub_lnk != NULL;
}
ERTS_GLB_INLINE void
erts_destroy_dist_link(ErtsDistLinkData *dldp)
{
if (dldp->d_lnk)
erts_destroy_link(dldp->d_lnk);
if (dldp->d_sub_lnk)
erts_destroy_link(dldp->d_sub_lnk);
}
#endif
/* Define for testing */
/* #define EXTREME_TTB_TRAPPING 1 */
#ifndef EXTREME_TTB_TRAPPING
#define TERM_TO_BINARY_LOOP_FACTOR 32
#else
#define TERM_TO_BINARY_LOOP_FACTOR 1
#endif
typedef enum { TTBSize, TTBEncode, TTBCompress } TTBState;
typedef struct TTBSizeContext_ {
Uint flags;
int level;
Uint result;
Eterm obj;
ErtsWStack wstack;
} TTBSizeContext;
typedef struct TTBEncodeContext_ {
Uint flags;
int level;
byte* ep;
Eterm obj;
ErtsWStack wstack;
Binary *result_bin;
} TTBEncodeContext;
typedef struct {
Uint real_size;
Uint dest_len;
byte *dbytes;
Binary *result_bin;
Binary *destination_bin;
z_stream stream;
} TTBCompressContext;
typedef struct {
int alive;
TTBState state;
union {
TTBSizeContext sc;
TTBEncodeContext ec;
TTBCompressContext cc;
} s;
} TTBContext;
enum erts_dsig_send_phase {
ERTS_DSIG_SEND_PHASE_INIT,
ERTS_DSIG_SEND_PHASE_MSG_SIZE,
ERTS_DSIG_SEND_PHASE_ALLOC,
ERTS_DSIG_SEND_PHASE_MSG_ENCODE,
ERTS_DSIG_SEND_PHASE_FIN
};
struct erts_dsig_send_context {
enum erts_dsig_send_phase phase;
Sint reds;
Eterm ctl;
Eterm msg;
int force_busy;
Uint32 max_finalize_prepend;
Uint data_size, dhdr_ext_size;
ErtsAtomCacheMap *acmp;
ErtsDistOutputBuf *obuf;
Uint32 flags;
Process *c_p;
union {
TTBSizeContext sc;
TTBEncodeContext ec;
}u;
};
typedef struct {
int suspend;
int connect;
Eterm ctl_heap[6];
ErtsDSigData dsd;
DistEntry *dep;
int deref_dep;
struct erts_dsig_send_context dss;
Eterm return_term;
}ErtsSendContext;
/*
* erts_dsig_send_* return values.
*/
#define ERTS_DSIG_SEND_OK 0
#define ERTS_DSIG_SEND_YIELD 1
#define ERTS_DSIG_SEND_CONTINUE 2
extern int erts_dsig_send_link(ErtsDSigData *, Eterm, Eterm);
extern int erts_dsig_send_msg(Eterm, Eterm, ErtsSendContext*);
extern int erts_dsig_send_exit_tt(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm);
extern int erts_dsig_send_unlink(ErtsDSigData *, Eterm, Eterm);
extern int erts_dsig_send_reg_msg(Eterm, Eterm, ErtsSendContext*);
extern int erts_dsig_send_group_leader(ErtsDSigData *, Eterm, Eterm);
extern int erts_dsig_send_exit(ErtsDSigData *, Eterm, Eterm, Eterm);
extern int erts_dsig_send_exit2(ErtsDSigData *, Eterm, Eterm, Eterm);
extern int erts_dsig_send_demonitor(ErtsDSigData *, Eterm, Eterm, Eterm, int);
extern int erts_dsig_send_monitor(ErtsDSigData *, Eterm, Eterm, Eterm);
extern int erts_dsig_send_m_exit(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm);
extern int erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx);
extern int erts_dsend_context_dtor(Binary*);
extern Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx);
extern int erts_dist_command(Port *prt, int reds);
extern void erts_dist_port_not_busy(Port *prt);
extern void erts_kill_dist_connection(DistEntry *dep, Uint32);
extern Uint erts_dist_cache_size(void);
#endif