aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r--erts/emulator/beam/beam_bif_load.c5
-rw-r--r--erts/emulator/beam/beam_debug.c3
-rw-r--r--erts/emulator/beam/beam_ranges.c8
-rw-r--r--erts/emulator/beam/bif.c2
-rw-r--r--erts/emulator/beam/dist.c345
-rw-r--r--erts/emulator/beam/dist.h6
-rw-r--r--erts/emulator/beam/erl_alloc.c2
-rw-r--r--erts/emulator/beam/erl_bif_info.c28
-rw-r--r--erts/emulator/beam/erl_db.c1
-rw-r--r--erts/emulator/beam/erl_db_hash.c45
-rw-r--r--erts/emulator/beam/erl_message.c18
-rw-r--r--erts/emulator/beam/erl_message.h6
-rw-r--r--erts/emulator/beam/erl_nif.c52
-rw-r--r--erts/emulator/beam/erl_node_tables.c35
-rw-r--r--erts/emulator/beam/erl_node_tables.h7
-rw-r--r--erts/emulator/beam/erl_port.h10
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c38
-rw-r--r--erts/emulator/beam/erl_process.c1
-rw-r--r--erts/emulator/beam/erl_process.h2
-rw-r--r--erts/emulator/beam/external.c80
-rw-r--r--erts/emulator/beam/external.h2
-rw-r--r--erts/emulator/beam/global.h2
-rw-r--r--erts/emulator/beam/io.c39
-rw-r--r--erts/emulator/beam/ops.tab2
24 files changed, 355 insertions, 384 deletions
diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c
index a0dbd9ec7b..d221e6aea6 100644
--- a/erts/emulator/beam/beam_bif_load.c
+++ b/erts/emulator/beam/beam_bif_load.c
@@ -1753,6 +1753,7 @@ BIF_RETTYPE erts_internal_purge_module_2(BIF_ALIST_2)
if (literals) {
ErtsLiteralAreaRef *ref;
+ ErtsMessage *mp;
ref = erts_alloc(ERTS_ALC_T_LITERAL_REF,
sizeof(ErtsLiteralAreaRef));
ref->literal_area = literals;
@@ -1767,10 +1768,12 @@ BIF_RETTYPE erts_internal_purge_module_2(BIF_ALIST_2)
release_literal_areas.last = ref;
}
erts_mtx_unlock(&release_literal_areas.mtx);
+ mp = erts_alloc_message(0, NULL);
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_proc_message(BIF_P,
erts_literal_area_collector,
0,
- erts_alloc_message(0, NULL),
+ mp,
am_copy_literals);
}
diff --git a/erts/emulator/beam/beam_debug.c b/erts/emulator/beam/beam_debug.c
index 6d3b99c43e..9633de2021 100644
--- a/erts/emulator/beam/beam_debug.c
+++ b/erts/emulator/beam/beam_debug.c
@@ -667,7 +667,7 @@ print_op(fmtfn_t to, void *to_arg, int op, int size, BeamInstr* addr)
ap++;
break;
case 'l': /* fr(N) */
- erts_print(to, to_arg, "fr(%d)", loader_reg_index(ap[0]));
+ erts_print(to, to_arg, "fr(%d)", ap[0] / sizeof(FloatDef));
ap++;
break;
default:
@@ -1191,6 +1191,7 @@ dirty_send_message(Process *c_p, Eterm to, Eterm tag)
mp = erts_alloc_message_heap(rp, &rp_locks, 3, &hp, &ohp);
msg = TUPLE2(hp, tag, c_p->common.id);
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_proc_message(c_p, rp, rp_locks, mp, msg);
if (rp == real_c_p)
diff --git a/erts/emulator/beam/beam_ranges.c b/erts/emulator/beam/beam_ranges.c
index f0c9496341..9f3153724a 100644
--- a/erts/emulator/beam/beam_ranges.c
+++ b/erts/emulator/beam/beam_ranges.c
@@ -35,10 +35,8 @@ typedef struct {
/*
* Used for crash dumping of literals. The size of erts_dump_lit_areas is
- * always twice the number of active ranges (to allow for literals in both
- * current and old code).
+ * always at least the number of active ranges.
*/
-
ErtsLiteralArea** erts_dump_lit_areas;
Uint erts_dump_num_lit_areas;
@@ -180,8 +178,8 @@ erts_end_staging_ranges(int commit)
(erts_aint_t) (r[dst].modules +
r[dst].n / 2));
- if (r[dst].allocated * 2 > erts_dump_num_lit_areas) {
- erts_dump_num_lit_areas *= 2;
+ if (r[dst].allocated > erts_dump_num_lit_areas) {
+ erts_dump_num_lit_areas = r[dst].allocated * 2;
erts_dump_lit_areas = (ErtsLiteralArea **)
erts_realloc(ERTS_ALC_T_CRASH_DUMP,
(void *) erts_dump_lit_areas,
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 97e1ee1286..f18af8bcd7 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -2063,7 +2063,7 @@ do_send(Process *p, Eterm to, Eterm msg, Eterm *refp, ErtsSendContext *ctx)
if (p == rp)
rp_locks |= ERTS_PROC_LOCK_MAIN;
/* send to local process */
- erts_send_message(p, rp, &rp_locks, msg, 0);
+ erts_send_message(p, rp, &rp_locks, msg);
erts_proc_unlock(rp,
p == rp
? (rp_locks & ~ERTS_PROC_LOCK_MAIN)
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 1e822d5c7b..0633bff3c2 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -116,11 +116,12 @@ static Export *dist_ctrl_put_data_trap;
/* forward declarations */
-static void clear_dist_entry(DistEntry*);
static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
static Sint abort_connection(DistEntry* dep, Uint32 conn_id);
+static ErtsDistOutputBuf* clear_de_out_queues(DistEntry*);
+static void free_de_out_queues(DistEntry*, ErtsDistOutputBuf*);
static erts_atomic_t no_caches;
static erts_atomic_t no_nodes;
@@ -556,7 +557,10 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
}
}
else { /* Call from distribution controller (port/process) */
- ErtsMonLnkDist *mld;
+ ErtsMonLnkDist *mld;
+ ErtsAtomCache *cache;
+ ErtsProcList *suspendees;
+ ErtsDistOutputBuf *obuf;
Uint32 flags;
erts_atomic_set_mb(&dep->dist_cmd_scheduled, 1);
@@ -588,6 +592,22 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
nodename = dep->sysname;
flags = dep->flags;
+ erts_atomic_set_nob(&dep->input_handler, (erts_aint_t) NIL);
+ cache = dep->cache;
+ dep->cache = NULL;
+
+ erts_mtx_lock(&dep->qlock);
+
+ erts_atomic64_set_nob(&dep->in, 0);
+ erts_atomic64_set_nob(&dep->out, 0);
+
+ obuf = clear_de_out_queues(dep);
+ suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
+
+ erts_mtx_unlock(&dep->qlock);
+ erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
+ dep->send = NULL;
+
erts_set_dist_entry_not_connected(dep);
erts_de_rwunlock(dep);
@@ -601,7 +621,13 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
? am_connection_closed
: reason));
- clear_dist_entry(dep);
+ erts_resume_processes(suspendees);
+
+ delete_cache(cache);
+
+ free_de_out_queues(dep, obuf);
+ if (dep->transcode_ctx)
+ transcode_free_ctx(dep);
}
dec_no_nodes();
@@ -732,41 +758,6 @@ static void free_de_out_queues(DistEntry* dep, ErtsDistOutputBuf *obuf)
}
}
-static void clear_dist_entry(DistEntry *dep)
-{
- ErtsAtomCache *cache;
- ErtsProcList *suspendees;
- ErtsDistOutputBuf *obuf;
-
- erts_de_rwlock(dep);
- erts_atomic_set_nob(&dep->input_handler,
- (erts_aint_t) NIL);
- cache = dep->cache;
- dep->cache = NULL;
-
- erts_mtx_lock(&dep->qlock);
-
- erts_atomic64_set_nob(&dep->in, 0);
- erts_atomic64_set_nob(&dep->out, 0);
-
- obuf = clear_de_out_queues(dep);
- dep->state = ERTS_DE_STATE_IDLE;
- suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
-
- erts_mtx_unlock(&dep->qlock);
- erts_atomic_set_nob(&dep->dist_cmd_scheduled, 0);
- dep->send = NULL;
- erts_de_rwunlock(dep);
-
- erts_resume_processes(suspendees);
-
- delete_cache(cache);
-
- free_de_out_queues(dep, obuf);
- if (dep->transcode_ctx)
- transcode_free_ctx(dep);
-}
-
int erts_dsend_context_dtor(Binary* ctx_bin)
{
ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin);
@@ -861,7 +852,7 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
DeclareTmpHeapNoproc(ctl_heap,6);
int res;
- if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
+ if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
/*
* Receiver does not support DOP_MONITOR_P_EXIT (see dsig_send_monitor)
*/
@@ -889,7 +880,7 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
- if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
+ if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
/*
* Receiver does not support DOP_MONITOR_P.
* Just avoid sending it and by doing that reduce this monitor
@@ -920,7 +911,7 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
- if (~dsdp->dep->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
+ if (~dsdp->flags & (DFLAG_DIST_MONITOR | DFLAG_DIST_MONITOR_NAME)) {
/*
* Receiver does not support DOP_DEMONITOR_P (see dsig_send_monitor)
*/
@@ -940,7 +931,7 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
static int can_send_seqtrace_token(ErtsSendContext* ctx, Eterm token) {
Eterm label;
- if (ctx->dep->flags & DFLAG_BIG_SEQTRACE_LABELS) {
+ if (ctx->dsd.flags & DFLAG_BIG_SEQTRACE_LABELS) {
/* The other end is capable of handling arbitrary seq_trace labels. */
return 1;
}
@@ -1001,7 +992,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
send_token = (token != NIL && can_send_seqtrace_token(ctx, token));
- if (ctx->dep->flags & DFLAG_SEND_SENDER) {
+ if (ctx->dsd.flags & DFLAG_SEND_SENDER) {
dist_op = make_small(send_token ?
DOP_SEND_SENDER_TT :
DOP_SEND_SENDER);
@@ -1200,21 +1191,8 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
#include <valgrind/valgrind.h>
#include <valgrind/memcheck.h>
-#ifndef HAVE_VALGRIND_PRINTF_XML
-#define VALGRIND_PRINTF_XML VALGRIND_PRINTF
-#endif
-
# define PURIFY_MSG(msg) \
- do { \
- char buf__[1]; size_t bufsz__ = sizeof(buf__); \
- if (erts_sys_explicit_8bit_getenv("VALGRIND_LOG_XML", buf__, &bufsz__) >= 0) { \
- VALGRIND_PRINTF_XML("<erlang_error_log>" \
- "%s, line %d: %s</erlang_error_log>\n", \
- __FILE__, __LINE__, msg); \
- } else { \
- VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg); \
- } \
- } while (0)
+ VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg)
#else
# define PURIFY_MSG(msg)
#endif
@@ -1231,13 +1209,13 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
int erts_net_message(Port *prt,
DistEntry *dep,
+ Uint32 conn_id,
byte *hbuf,
ErlDrvSizeT hlen,
byte *buf,
ErlDrvSizeT len)
{
ErtsDistExternal ede;
- byte *t;
Sint ctl_len;
Eterm arg;
Eterm from, to;
@@ -1255,7 +1233,6 @@ int erts_net_message(Port *prt,
Eterm token_size;
Uint tuple_arity;
int res;
- Uint32 connection_id;
#ifdef ERTS_DIST_MSG_DBG
ErlDrvSizeT orig_len = len;
#endif
@@ -1271,7 +1248,6 @@ int erts_net_message(Port *prt,
return 0;
}
-
ASSERT(hlen == 0);
if (len == 0) { /* HANDLE TICK !!! */
@@ -1284,15 +1260,7 @@ int erts_net_message(Port *prt,
bw(buf, len);
#endif
- if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)
- t = buf;
- else {
- /* Skip PASS_THROUGH */
- t = buf+1;
- len--;
- }
-
- res = erts_prepare_dist_ext(&ede, t, len, dep, dep->cache, &connection_id);
+ res = erts_prepare_dist_ext(&ede, buf, len, dep, conn_id, dep->cache);
switch (res) {
case ERTS_PREP_DIST_EXT_CLOSED:
@@ -1334,10 +1302,9 @@ int erts_net_message(Port *prt,
PURIFY_MSG("data error");
goto decode_error;
}
- ctl_len = t - buf;
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, "<<%s CTL: %T\n", len != orig_len ? "P" : " ", arg);
+ erts_fprintf(stderr, "<< CTL: %T\n", arg);
#endif
if (is_not_tuple(arg) ||
@@ -1791,7 +1758,7 @@ decode_error:
}
data_error:
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
- erts_kill_dist_connection(dep, connection_id);
+ erts_kill_dist_connection(dep, conn_id);
ERTS_CHK_NO_PROC_LOCKS;
return -1;
}
@@ -1847,7 +1814,7 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
while (1) {
switch (ctx->phase) {
case ERTS_DSIG_SEND_PHASE_INIT:
- ctx->flags = dsdp->dep->flags;
+ ctx->flags = dsdp->flags;
ctx->c_p = dsdp->proc;
if (!ctx->c_p || dsdp->no_suspend)
@@ -2102,13 +2069,14 @@ dist_port_command(Port *prt, ErtsDistOutputBuf *obuf)
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(dist_output)) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
DTRACE_CHARBUF(port_str, 64);
DTRACE_CHARBUF(remote_str, 64);
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
"%T", prt->common.id);
erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", prt->dist_entry->sysname);
+ "%T", dep->sysname);
DTRACE4(dist_output, erts_this_node_sysname, port_str,
remote_str, size);
}
@@ -2164,13 +2132,14 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(dist_outputv)) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
DTRACE_CHARBUF(port_str, 64);
DTRACE_CHARBUF(remote_str, 64);
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
"%T", prt->common.id);
erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", prt->dist_entry->sysname);
+ "%T", dep->sysname);
DTRACE4(dist_outputv, erts_this_node_sysname, port_str,
remote_str, size);
}
@@ -2208,7 +2177,7 @@ erts_dist_command(Port *prt, int initial_reds)
Uint32 flags;
Sint qsize, obufsize = 0;
ErtsDistOutputQueue oq, foq;
- DistEntry *dep = prt->dist_entry;
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
erts_aint32_t sched_flags;
ErtsSchedulerData *esdp = erts_get_scheduler_data();
@@ -2584,11 +2553,12 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1)
erts_aint32_t qflgs;
erts_aint_t qsize;
Eterm receiver = NIL;
+ Uint32 conn_id;
if (!dep)
BIF_ERROR(BIF_P, EXC_NOTSUP);
- if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep)
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
BIF_ERROR(BIF_P, BADARG);
/*
@@ -2598,6 +2568,11 @@ dist_ctrl_get_data_notification_1(BIF_ALIST_1)
erts_de_rlock(dep);
+ if (dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
+
ASSERT(dep->cid == BIF_P->common.id);
qflgs = erts_atomic32_read_acqb(&dep->qflgs);
@@ -2638,6 +2613,7 @@ dist_ctrl_put_data_2(BIF_ALIST_2)
DistEntry *dep;
ErlDrvSizeT size;
Eterm input_handler;
+ Uint32 conn_id;
if (is_binary(BIF_ARG_2))
size = binary_size(BIF_ARG_2);
@@ -2649,7 +2625,7 @@ dist_ctrl_put_data_2(BIF_ALIST_2)
else
BIF_ERROR(BIF_P, BADARG);
- dep = erts_dhandle_to_dist_entry(BIF_ARG_1);
+ dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id);
if (!dep)
BIF_ERROR(BIF_P, BADARG);
@@ -2669,7 +2645,7 @@ dist_ctrl_put_data_2(BIF_ALIST_2)
erts_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
- (void) erts_net_message(NULL, dep, NULL, 0, data, size);
+ (void) erts_net_message(NULL, dep, conn_id, NULL, 0, data, size);
/*
* We ignore any decode failures. On fatal failures the
* connection will be taken down by killing the
@@ -2693,13 +2669,18 @@ dist_get_stat_1(BIF_ALIST_1)
Sint64 read, write, pend;
Eterm res, *hp, **hpp;
Uint sz, *szp;
- DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1);
+ Uint32 conn_id;
+ DistEntry *dep = erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id);
if (!dep)
BIF_ERROR(BIF_P, BADARG);
erts_de_rlock(dep);
+ if (dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
read = (Sint64) erts_atomic64_read_nob(&dep->in);
write = (Sint64) erts_atomic64_read_nob(&dep->out);
pend = (Sint64) erts_atomic_read_nob(&dep->qsize);
@@ -2730,19 +2711,25 @@ BIF_RETTYPE
dist_ctrl_input_handler_2(BIF_ALIST_2)
{
DistEntry *dep = ERTS_PROC_GET_DIST_ENTRY(BIF_P);
+ Uint32 conn_id;
if (!dep)
BIF_ERROR(BIF_P, EXC_NOTSUP);
- if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep)
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
BIF_ERROR(BIF_P, BADARG);
if (is_not_internal_pid(BIF_ARG_2))
BIF_ERROR(BIF_P, BADARG);
+ erts_de_rlock(dep);
+ if (dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
erts_atomic_set_nob(&dep->input_handler,
(erts_aint_t) BIF_ARG_2);
-
+ erts_de_runlock(dep);
BIF_RET(am_ok);
}
@@ -2756,15 +2743,21 @@ dist_ctrl_get_data_1(BIF_ALIST_1)
Eterm *hp;
ProcBin *pb;
erts_aint_t qsize;
+ Uint32 conn_id;
if (!dep)
BIF_ERROR(BIF_P, EXC_NOTSUP);
- if (erts_dhandle_to_dist_entry(BIF_ARG_1) != dep)
+ if (erts_dhandle_to_dist_entry(BIF_ARG_1, &conn_id) != dep)
BIF_ERROR(BIF_P, BADARG);
erts_de_rlock(dep);
+ if (dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ BIF_ERROR(BIF_P, BADARG);
+ }
+
if (dep->state == ERTS_DE_STATE_EXITING)
goto return_none;
@@ -2851,13 +2844,14 @@ erts_dist_port_not_busy(Port *prt)
{
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(dist_port_not_busy)) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
DTRACE_CHARBUF(port_str, 64);
DTRACE_CHARBUF(remote_str, 64);
erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
"%T", prt->common.id);
erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", prt->dist_entry->sysname);
+ "%T", dep->sysname);
DTRACE3(dist_port_not_busy, erts_this_node_sysname,
port_str, remote_str);
}
@@ -2883,10 +2877,10 @@ static void kill_connection(DistEntry *dep)
}
void
-erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
+erts_kill_dist_connection(DistEntry *dep, Uint32 conn_id)
{
erts_de_rwlock(dep);
- if (connection_id == dep->connection_id
+ if (conn_id == dep->connection_id
&& dep->state == ERTS_DE_STATE_CONNECTED) {
kill_connection(dep);
@@ -3222,23 +3216,6 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
else if (!dep)
goto system_limit; /* Should never happen!!! */
- erts_de_rlock(dep);
- de_locked = -1;
-
- if (dep->state == ERTS_DE_STATE_EXITING) {
- /* Suspend on dist entry waiting for the exit to finish */
- ErtsProcList *plp = erts_proclist_create(BIF_P);
- plp->next = NULL;
- erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_mtx_lock(&dep->qlock);
- erts_proclist_store_last(&dep->suspended, plp);
- erts_mtx_unlock(&dep->qlock);
- goto yield;
- }
-
- erts_de_runlock(dep);
- de_locked = 0;
-
if (is_internal_pid(BIF_ARG_2)) {
if (BIF_P->common.id == BIF_ARG_2) {
ErtsSetupConnDistCtrl scdc;
@@ -3284,7 +3261,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
hp = HAlloc(BIF_P, 3);
}
else {
- int new;
+ Uint32 conn_id;
pp = erts_id2port_sflgs(BIF_ARG_2,
BIF_P,
@@ -3293,7 +3270,7 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
erts_de_rwlock(dep);
de_locked = 1;
- if (dep->state == ERTS_DE_STATE_EXITING)
+ if (dep->state != ERTS_DE_STATE_PENDING)
goto badarg;
if (!pp || (erts_atomic32_read_nob(&pp->state)
@@ -3303,49 +3280,39 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
if ((pp->drv_ptr->flags & ERL_DRV_FLAG_SOFT_BUSY) == 0)
goto badarg;
- if (dep->cid == BIF_ARG_2 && pp->dist_entry == dep)
- new = 0;
- else {
- if (dep->state != ERTS_DE_STATE_PENDING) {
- if (dep->state == ERTS_DE_STATE_IDLE)
- erts_set_dist_entry_pending(dep);
- else
- goto badarg;
- }
-
- if (pp->dist_entry || is_not_nil(dep->cid))
- goto badarg;
-
- erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
+ if (erts_prtsd_get(pp, ERTS_PRTSD_DIST_ENTRY) != NULL
+ || is_not_nil(dep->cid))
+ goto badarg;
- pp->dist_entry = dep;
+ erts_atomic32_read_bor_nob(&pp->state, ERTS_PORT_SFLG_DISTRIBUTION);
- ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
+ erts_prtsd_set(pp, ERTS_PRTSD_DIST_ENTRY, dep);
+ erts_prtsd_set(pp, ERTS_PRTSD_CONN_ID, (void*)(UWord)dep->connection_id);
- dep->send = (pp->drv_ptr->outputv
- ? dist_port_commandv
- : dist_port_command);
- ASSERT(dep->send);
+ ASSERT(pp->drv_ptr->outputv || pp->drv_ptr->output);
- /*
- * Dist-ports do not use the "busy port message queue" functionality, but
- * instead use "busy dist entry" functionality.
- */
- {
- ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
- erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
- }
+ dep->send = (pp->drv_ptr->outputv
+ ? dist_port_commandv
+ : dist_port_command);
+ ASSERT(dep->send);
- setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version);
- de_locked = 0;
- new = !0;
+ /*
+ * Dist-ports do not use the "busy port message queue" functionality, but
+ * instead use "busy dist entry" functionality.
+ */
+ {
+ ErlDrvSizeT disable = ERL_DRV_BUSY_MSGQ_DISABLED;
+ erl_drv_busy_msgq_limits(ERTS_Port2ErlDrvPort(pp), &disable, NULL);
}
- hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE);
- res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep);
+ conn_id = dep->connection_id;
+ setup_connection_epiloge_rwunlock(BIF_P, dep, BIF_ARG_2, flags, version);
+ de_locked = 0;
+
+ hp = HAlloc(BIF_P, 3 + ERTS_DHANDLE_SIZE);
+ res = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id);
res_tag = am_ok; /* Connection up */
- if (new)
- dep = NULL; /* inc of refc transferred to port (dist_entry field) */
+ dep = NULL; /* inc of refc transferred to port (dist_entry field) */
}
ASSERT(is_value(res) && is_value(res_tag));
@@ -3371,12 +3338,6 @@ BIF_RETTYPE erts_internal_create_dist_channel_4(BIF_ALIST_4)
return ret;
- yield:
- ERTS_BIF_PREP_YIELD4(ret,
- bif_export[BIF_erts_internal_create_dist_channel_4],
- BIF_P, BIF_ARG_1, BIF_ARG_2, BIF_ARG_3, BIF_ARG_4);
- goto done;
-
badarg:
ERTS_BIF_PREP_RET(ret, am_badarg);
goto done;
@@ -3398,8 +3359,7 @@ setup_connection_epiloge_rwunlock(Process *c_p, DistEntry *dep,
dep->creation = 0;
ASSERT(is_internal_port(ctrlr) || is_internal_pid(ctrlr));
- ASSERT(erts_atomic_read_nob(&dep->qsize) == 0
- || (dep->state == ERTS_DE_STATE_PENDING));
+ ASSERT(dep->state == ERTS_DE_STATE_PENDING);
if (flags & DFLAG_DIST_HDR_ATOM_CACHE)
create_cache(dep);
@@ -3445,37 +3405,20 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment *
DistEntry *dep = scdcp->dep;
int dep_locked = 0;
Eterm *hp;
- erts_aint32_t state;
+ Uint32 conn_id;
if (redsp)
*redsp = 1;
- state = erts_atomic32_read_nob(&c_p->state);
-
- if (state & ERTS_PSFLG_EXITING)
- goto badarg;
+ ASSERT(!ERTS_PROC_IS_EXITING(c_p));
erts_de_rwlock(dep);
dep_locked = !0;
- if (dep->state == ERTS_DE_STATE_EXITING)
+ if (dep->state != ERTS_DE_STATE_PENDING)
goto badarg;
- if (ERTS_PROC_GET_DIST_ENTRY(c_p)) {
- if (dep == ERTS_PROC_GET_DIST_ENTRY(c_p)
- && (c_p->flags & F_DISTRIBUTION)
- && dep->cid == c_p->common.id) {
- goto connected;
- }
- goto badarg;
- }
-
- if (dep->state != ERTS_DE_STATE_PENDING) {
- if (dep->state == ERTS_DE_STATE_IDLE)
- erts_set_dist_entry_pending(dep);
- else
- goto badarg;
- }
+ conn_id = dep->connection_id;
if (is_not_nil(dep->cid))
goto badarg;
@@ -3490,18 +3433,17 @@ setup_connection_distctrl(Process *c_p, void *arg, int *redsp, ErlHeapFragment *
setup_connection_epiloge_rwunlock(c_p, dep, c_p->common.id,
scdcp->flags, scdcp->version);
-connected:
/* we take over previous inc in refc of dep */
if (!bpp) /* called directly... */
- return erts_make_dhandle(c_p, dep);
+ return erts_make_dhandle(c_p, dep, conn_id);
erts_free(ERTS_ALC_T_SETUP_CONN_ARG, arg);
- *bpp = new_message_buffer(ERTS_MAGIC_REF_THING_SIZE);
+ *bpp = new_message_buffer(ERTS_DHANDLE_SIZE);
hp = (*bpp)->mem;
- return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep);
+ return erts_build_dhandle(&hp, &(*bpp)->off_heap, dep, conn_id);
badarg:
@@ -3542,30 +3484,27 @@ BIF_RETTYPE erts_internal_new_connection_1(BIF_ALIST_1)
erts_de_rwlock(dep);
switch (dep->state) {
- case ERTS_DE_STATE_PENDING:
case ERTS_DE_STATE_CONNECTED:
+ case ERTS_DE_STATE_EXITING:
+ case ERTS_DE_STATE_PENDING:
conn_id = dep->connection_id;
break;
case ERTS_DE_STATE_IDLE:
erts_set_dist_entry_pending(dep);
conn_id = dep->connection_id;
break;
- case ERTS_DE_STATE_EXITING:
- conn_id = (dep->connection_id + 1) & ERTS_DIST_CON_ID_MASK;
- break;
default:
erts_exit(ERTS_ABORT_EXIT, "Invalid dep->state (%d)\n", dep->state);
}
erts_de_rwunlock(dep);
- hp = HAlloc(BIF_P, 3 + ERTS_MAGIC_REF_THING_SIZE);
- dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep);
+ hp = HAlloc(BIF_P, ERTS_DHANDLE_SIZE);
+ dhandle = erts_build_dhandle(&hp, &BIF_P->off_heap, dep, conn_id);
erts_deref_dist_entry(dep);
- BIF_RET(TUPLE2(hp, make_small(conn_id), dhandle));
+ BIF_RET(dhandle);
}
Sint erts_abort_connection_rwunlock(DistEntry* dep)
{
- Sint reds = 0;
ERTS_LC_ASSERT(erts_lc_is_de_rwlocked(dep));
if (dep->state == ERTS_DE_STATE_CONNECTED) {
@@ -3575,6 +3514,7 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep)
ErtsAtomCache *cache;
ErtsDistOutputBuf *obuf;
ErtsProcList *resume_procs;
+ Sint reds = 0;
ErtsMonLnkDist *mld;
ASSERT(is_nil(dep->cid));
@@ -3596,7 +3536,6 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep)
dep->send = NULL;
erts_set_dist_entry_not_connected(dep);
-
erts_de_rwunlock(dep);
schedule_con_monitor_link_cleanup(mld, THE_NON_VALUE,
@@ -3609,19 +3548,10 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep)
delete_cache(cache);
free_de_out_queues(dep, obuf);
-
- /*
- * We wait to make DistEntry idle and accept new connection attempts
- * until all is cleared and deallocated. This to get some back pressure
- * against repeated failing connection attempts saturating all CPUs
- * with cleanup jobs.
- */
- erts_de_rwlock(dep);
- ASSERT(dep->state == ERTS_DE_STATE_EXITING);
- dep->state = ERTS_DE_STATE_IDLE;
+ return reds;
}
erts_de_rwunlock(dep);
- return reds;
+ return 0;
}
static Sint abort_connection(DistEntry *dep, Uint32 conn_id)
@@ -3636,22 +3566,19 @@ static Sint abort_connection(DistEntry *dep, Uint32 conn_id)
BIF_RETTYPE erts_internal_abort_connection_2(BIF_ALIST_2)
{
DistEntry* dep;
- Eterm* tp;
+ Uint32 conn_id;
+ Sint reds;
- if (is_not_atom(BIF_ARG_1) || is_not_tuple_arity(BIF_ARG_2, 2)) {
- BIF_ERROR(BIF_P, BADARG);
- }
- tp = tuple_val(BIF_ARG_2);
- dep = erts_dhandle_to_dist_entry(tp[2]);
- if (is_not_small(tp[1]) || dep != erts_find_dist_entry(BIF_ARG_1)
+ if (is_not_atom(BIF_ARG_1))
+ BIF_ERROR(BIF_P, BADARG);
+ dep = erts_dhandle_to_dist_entry(BIF_ARG_2, &conn_id);
+ if (!dep || dep != erts_find_dist_entry(BIF_ARG_1)
|| dep == erts_this_dist_entry) {
BIF_ERROR(BIF_P, BADARG);
}
- if (dep) {
- Sint reds = abort_connection(dep, unsigned_val(tp[1]));
- BUMP_REDS(BIF_P, reds);
- }
+ reds = abort_connection(dep, conn_id);
+ BUMP_REDS(BIF_P, reds);
BIF_RET(am_true);
}
@@ -3682,14 +3609,14 @@ int erts_auto_connect(DistEntry* dep, Process *proc, ErtsProcLocks proc_locks)
}
/*
- * Send {auto_connect, Node, ConnId, DHandle} to net_kernel
+ * Send {auto_connect, Node, DHandle} to net_kernel
*/
mp = erts_alloc_message_heap(net_kernel, &nk_locks,
- 5 + ERTS_MAGIC_REF_THING_SIZE,
+ 4 + ERTS_DHANDLE_SIZE,
&hp, &ohp);
- dhandle = erts_build_dhandle(&hp, ohp, dep);
- msg = TUPLE4(hp, am_auto_connect, dep->sysname, make_small(conn_id),
- dhandle);
+ dhandle = erts_build_dhandle(&hp, ohp, dep, conn_id);
+ msg = TUPLE3(hp, am_auto_connect, dep->sysname, dhandle);
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_proc_message(proc, net_kernel, nk_locks, mp, msg);
erts_proc_unlock(net_kernel, nk_locks);
}
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index 75cb865390..d4d7874a70 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -136,6 +136,7 @@ typedef struct {
Eterm cid;
Eterm connection_id;
int no_suspend;
+ Uint32 flags;
} ErtsDSigData;
#define ERTS_DE_BUSY_LIMIT (1024*1024)
@@ -235,6 +236,7 @@ retry:
dsdp->cid = dep->cid;
dsdp->connection_id = dep->connection_id;
dsdp->no_suspend = no_suspend;
+ dsdp->flags = dep->flags;
if (dspl == ERTS_DSP_NO_LOCK)
erts_de_runlock(dep);
return res;
@@ -254,9 +256,9 @@ void erts_schedule_dist_command(Port *prt, DistEntry *dist_entry)
ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));
ASSERT((erts_atomic32_read_nob(&prt->state)
& ERTS_PORT_SFLGS_DEAD) == 0);
- ASSERT(prt->dist_entry);
- dep = prt->dist_entry;
+ dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ ASSERT(dep);
id = prt->common.id;
}
else {
diff --git a/erts/emulator/beam/erl_alloc.c b/erts/emulator/beam/erl_alloc.c
index 575d6ca867..8fe1ccb758 100644
--- a/erts/emulator/beam/erl_alloc.c
+++ b/erts/emulator/beam/erl_alloc.c
@@ -114,7 +114,7 @@ typedef union {
char align_afa[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(AFAllctr_t))];
AOFFAllctr_t aoffa;
char align_aoffa[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(AOFFAllctr_t))];
-} ErtsAllocatorState_t;
+} ErtsAllocatorState_t erts_align_attribute(ERTS_CACHE_LINE_SIZE);
static ErtsAllocatorState_t std_alloc_state;
static ErtsAllocatorState_t ll_alloc_state;
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 5789fa8e71..7fada0d548 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -2093,17 +2093,6 @@ current_stacktrace(ErtsHeapFactory *hfact, Process* rp,
return res;
}
-#if defined(VALGRIND)
-static int check_if_xml(void)
-{
- char buf[1];
- size_t bufsz = sizeof(buf);
- return erts_sys_explicit_8bit_getenv("VALGRIND_LOG_XML", buf, &bufsz) >= 0;
-}
-#else
-#define check_if_xml() 0
-#endif
-
/*
* This function takes care of calls to erlang:system_info/1 when the argument
* is a tuple.
@@ -2200,15 +2189,9 @@ info_1_tuple(Process* BIF_P, /* Pointer to current process. */
#endif
} else if (is_list(*tp)) {
#if defined(PURIFY)
-#define ERTS_ERROR_CHECKER_PRINTF purify_printf
-#define ERTS_ERROR_CHECKER_PRINTF_XML purify_printf
+# define ERTS_ERROR_CHECKER_PRINTF purify_printf
#elif defined(VALGRIND)
-#define ERTS_ERROR_CHECKER_PRINTF VALGRIND_PRINTF
-# ifndef HAVE_VALGRIND_PRINTF_XML
-# define ERTS_ERROR_CHECKER_PRINTF_XML VALGRIND_PRINTF
-# else
-# define ERTS_ERROR_CHECKER_PRINTF_XML VALGRIND_PRINTF_XML
-# endif
+# define ERTS_ERROR_CHECKER_PRINTF VALGRIND_PRINTF
#endif
ErlDrvSizeT buf_size = 8*1024; /* Try with 8KB first */
char *buf = erts_alloc(ERTS_ALC_T_TMP, buf_size);
@@ -2224,12 +2207,7 @@ info_1_tuple(Process* BIF_P, /* Pointer to current process. */
ASSERT(r == buf_size - 1);
}
buf[buf_size - 1 - r] = '\0';
- if (check_if_xml()) {
- ERTS_ERROR_CHECKER_PRINTF_XML("<erlang_info_log>"
- "%s</erlang_info_log>\n", buf);
- } else {
- ERTS_ERROR_CHECKER_PRINTF("%s\n", buf);
- }
+ ERTS_ERROR_CHECKER_PRINTF("%s\n", buf);
erts_free(ERTS_ALC_T_TMP, (void *) buf);
BIF_RET(am_true);
#undef ERTS_ERROR_CHECKER_PRINTF
diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c
index 36d83d93f4..c009a3bde8 100644
--- a/erts/emulator/beam/erl_db.c
+++ b/erts/emulator/beam/erl_db.c
@@ -3647,6 +3647,7 @@ send_ets_transfer_message(Process *c_p, Process *proc,
hd_copy = copy_struct(heir_data, hd_sz, &hp, ohp);
sender = c_p->common.id;
msg = TUPLE4(hp, am_ETS_TRANSFER, tid, sender, hd_copy);
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_proc_message(c_p, proc, *locks, mp, msg);
}
diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c
index b988a19cf4..752d3ae3a8 100644
--- a/erts/emulator/beam/erl_db_hash.c
+++ b/erts/emulator/beam/erl_db_hash.c
@@ -150,6 +150,22 @@ static ERTS_INLINE Uint hash_to_ix(DbTableHash* tb, HashValue hval)
}
+static ERTS_INLINE FixedDeletion* alloc_fixdel(DbTableHash* tb)
+{
+ FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL,
+ (DbTable *) tb,
+ sizeof(FixedDeletion));
+ ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion));
+ return fixd;
+}
+
+static ERTS_INLINE void free_fixdel(DbTableHash* tb, FixedDeletion* fixd)
+{
+ erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable*)tb,
+ fixd, sizeof(FixedDeletion));
+ ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
+}
+
static ERTS_INLINE int link_fixdel(DbTableHash* tb,
FixedDeletion* fixd,
erts_aint_t fixated_by_me)
@@ -160,8 +176,7 @@ static ERTS_INLINE int link_fixdel(DbTableHash* tb,
was_next = erts_atomic_read_acqb(&tb->fixdel);
do { /* Lockless atomic insertion in linked list: */
if (NFIXED(tb) <= fixated_by_me) {
- erts_db_free(ERTS_ALC_T_DB_FIX_DEL, (DbTable*)tb,
- fixd, sizeof(FixedDeletion));
+ free_fixdel(tb, fixd);
return 0; /* raced by unfixer */
}
exp_next = was_next;
@@ -180,10 +195,7 @@ static ERTS_INLINE int link_fixdel(DbTableHash* tb,
static int add_fixed_deletion(DbTableHash* tb, int ix,
erts_aint_t fixated_by_me)
{
- FixedDeletion* fixd = (FixedDeletion*) erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL,
- (DbTable *) tb,
- sizeof(FixedDeletion));
- ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion));
+ FixedDeletion* fixd = alloc_fixdel(tb);
fixd->slot = ix;
fixd->all = 0;
return link_fixdel(tb, fixd, fixated_by_me);
@@ -637,11 +649,7 @@ restart:
free_me = fixdel;
fixdel = fixdel->next;
- erts_db_free(ERTS_ALC_T_DB_FIX_DEL,
- (DbTable *) tb,
- (void *) free_me,
- sizeof(FixedDeletion));
- ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
+ free_fixdel(tb, free_me);
work++;
}
@@ -2338,11 +2346,10 @@ static SWord db_mark_all_deleted_hash(DbTable *tbl, SWord reds)
}
else {
/* First call */
- fixdel = erts_db_alloc(ERTS_ALC_T_DB_FIX_DEL,
- (DbTable *) tb,
- sizeof(FixedDeletion));
- ERTS_ETS_MISC_MEM_ADD(sizeof(FixedDeletion));
- link_fixdel(tb, fixdel, 0);
+ int ok;
+ fixdel = alloc_fixdel(tb);
+ ok = link_fixdel(tb, fixdel, 0);
+ ASSERT(ok); (void)ok;
i = 0;
}
@@ -2444,11 +2451,7 @@ static SWord db_free_table_continue_hash(DbTable *tbl, SWord reds)
FixedDeletion *fx = fixdel;
fixdel = fx->next;
- erts_db_free(ERTS_ALC_T_DB_FIX_DEL,
- (DbTable *) tb,
- (void *) fx,
- sizeof(FixedDeletion));
- ERTS_ETS_MISC_MEM_ADD(-sizeof(FixedDeletion));
+ free_fixdel(tb, fx);
if (--reds < 0) {
erts_atomic_set_relb(&tb->fixdel, (erts_aint_t)fixdel);
return reds; /* Not done */
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 507cc989d2..a3274d7443 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -409,6 +409,11 @@ ErtsMessage* prepend_pending_sig_maybe(Process* sender, Process* receiver,
*
* @brief Send one message from *NOT* a local process.
*
+ * seq_trace does not work with this type of messages
+ * to it is set to am_undefined which means that the
+ * receiving process will not remove the seq_trace token
+ * when it gets this message.
+ *
*/
void
erts_queue_message(Process* receiver, ErtsProcLocks receiver_locks,
@@ -417,11 +422,19 @@ erts_queue_message(Process* receiver, ErtsProcLocks receiver_locks,
ASSERT(is_not_internal_pid(from));
ERL_MESSAGE_TERM(mp) = msg;
ERL_MESSAGE_FROM(mp) = from;
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
queue_messages(receiver, receiver_locks, mp, &mp->next, 1);
}
/**
* @brief Send one message from a local process.
+ *
+ * It is up to the caller of this function to set the
+ * correct seq_trace. The general rule of thumb is that
+ * it should be set to am_undefined if the message
+ * cannot be traced using seq_trace, if it can be
+ * traced it should be set to the trace token. It should
+ * very rarely be explicitly set to NIL!
*/
void
erts_queue_proc_message(Process* sender,
@@ -584,8 +597,7 @@ void
erts_send_message(Process* sender,
Process* receiver,
ErtsProcLocks *receiver_locks,
- Eterm message,
- unsigned flags)
+ Eterm message)
{
Uint msize;
ErtsMessage* mp;
@@ -619,7 +631,7 @@ erts_send_message(Process* sender,
receiver_state = erts_atomic32_read_nob(&receiver->state);
- if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) {
+ if (SEQ_TRACE_TOKEN(sender) != NIL) {
Eterm* hp;
Eterm stoken = SEQ_TRACE_TOKEN(sender);
Uint seq_trace_size = 0;
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h
index d120111634..b2550814fd 100644
--- a/erts/emulator/beam/erl_message.h
+++ b/erts/emulator/beam/erl_message.h
@@ -405,8 +405,6 @@ typedef struct erl_trace_message_queue__ {
#define SAVE_MESSAGE(p) \
(p)->sig_qs.save = &(*(p)->sig_qs.save)->next
-#define ERTS_SND_FLG_NO_SEQ_TRACE (((unsigned) 1) << 0)
-
#define ERTS_HEAP_FRAG_SIZE(DATA_WORDS) \
(sizeof(ErlHeapFragment) - sizeof(Eterm) + (DATA_WORDS)*sizeof(Eterm))
@@ -429,7 +427,7 @@ typedef struct erl_trace_message_queue__ {
do { \
(MP)->next = NULL; \
ERL_MESSAGE_TERM(MP) = THE_NON_VALUE; \
- ERL_MESSAGE_TOKEN(MP) = NIL; \
+ ERL_MESSAGE_TOKEN(MP) = THE_NON_VALUE; \
ERL_MESSAGE_FROM(MP) = NIL; \
ERL_MESSAGE_DT_UTAG_INIT(MP); \
MP->data.attached = NULL; \
@@ -446,7 +444,7 @@ void erts_queue_proc_message(Process* from,Process* to, ErtsProcLocks,ErtsMessag
void erts_queue_proc_messages(Process* from, Process* to, ErtsProcLocks,
ErtsMessage*, ErtsMessage**, Uint);
void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm);
-void erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned);
+void erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm);
void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp);
Uint erts_msg_attached_data_size_aux(ErtsMessage *msg);
diff --git a/erts/emulator/beam/erl_nif.c b/erts/emulator/beam/erl_nif.c
index ef010ff476..ee6e6085b6 100644
--- a/erts/emulator/beam/erl_nif.c
+++ b/erts/emulator/beam/erl_nif.c
@@ -754,8 +754,58 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
rp_locks = ERTS_PROC_LOCK_MAIN;
if (menv) {
+ Eterm token = c_p ? SEQ_TRACE_TOKEN(c_p) : am_undefined;
+ if (token != NIL && token != am_undefined) {
+ /* This code is copied from erts_send_message */
+ Eterm stoken = SEQ_TRACE_TOKEN(c_p);
+#ifdef USE_VM_PROBES
+ DTRACE_CHARBUF(sender_name, 64);
+ DTRACE_CHARBUF(receiver_name, 64);
+ Sint tok_label = 0;
+ Sint tok_lastcnt = 0;
+ Sint tok_serial = 0;
+ Eterm utag = NIL;
+ *sender_name = *receiver_name = '\0';
+ if (DTRACE_ENABLED(message_send)) {
+ erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
+ "%T", c_p->common.id);
+ erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
+ "%T", rp->common.id);
+ }
+#endif
+ if (have_seqtrace(stoken)) {
+ seq_trace_update_send(c_p);
+ seq_trace_output(stoken, msg, SEQ_TRACE_SEND,
+ rp->common.id, c_p);
+ }
+#ifdef USE_VM_PROBES
+ if (!(DT_UTAG_FLAGS(c_p) & DT_UTAG_SPREADING)) {
+ stoken = NIL;
+ }
+#endif
+ token = enif_make_copy(msg_env, stoken);
+
+#ifdef USE_VM_PROBES
+ if (DT_UTAG_FLAGS(c_p) & DT_UTAG_SPREADING) {
+ if (is_immed(DT_UTAG(c_p)))
+ utag = DT_UTAG(c_p);
+ else
+ utag = enif_make_copy(msg_env, DT_UTAG(c_p));
+ }
+ if (DTRACE_ENABLED(message_send)) {
+ if (have_seqtrace(stoken)) {
+ tok_label = SEQ_TRACE_T_DTRACE_LABEL(stoken);
+ tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(stoken));
+ tok_serial = signed_val(SEQ_TRACE_T_SERIAL(stoken));
+ }
+ DTRACE6(message_send, sender_name, receiver_name,
+ size_object(msg), tok_label, tok_lastcnt, tok_serial);
+ }
+#endif
+ }
flush_env(msg_env);
mp = erts_alloc_message(0, NULL);
+ ERL_MESSAGE_TOKEN(mp) = token;
mp->data.heap_frag = menv->env.heap_frag;
ASSERT(mp->data.heap_frag == MBUF(&menv->phony_proc));
if (mp->data.heap_frag != NULL) {
@@ -793,6 +843,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
ohp = &bp->off_heap;
}
}
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
msg = copy_struct_litopt(msg, sz, &hp, ohp, &litarea);
}
@@ -836,6 +887,7 @@ int enif_send(ErlNifEnv* env, const ErlNifPid* to_pid,
ERL_MESSAGE_TERM(mp) = msg;
ERL_MESSAGE_FROM(mp) = from;
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
if (!msgq) {
msgq = erts_alloc(ERTS_ALC_T_TRACE_MSG_QUEUE,
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index f4a36d124a..f4dc60941a 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -370,31 +370,43 @@ DistEntry *erts_find_dist_entry(Eterm sysname)
}
DistEntry *
-erts_dhandle_to_dist_entry(Eterm dhandle)
+erts_dhandle_to_dist_entry(Eterm dhandle, Uint32 *conn_id)
{
+ Eterm *tpl;
Binary *bin;
- if (!is_internal_magic_ref(dhandle))
+
+ if (!is_boxed(dhandle))
+ return NULL;
+ tpl = boxed_val(dhandle);
+ if (tpl[0] != make_arityval(2) || !is_small(tpl[1])
+ || !is_internal_magic_ref(tpl[2]))
return NULL;
- bin = erts_magic_ref2bin(dhandle);
+ *conn_id = unsigned_val(tpl[1]);
+ bin = erts_magic_ref2bin(tpl[2]);
if (ERTS_MAGIC_BIN_DESTRUCTOR(bin) != erts_dist_entry_destructor)
return NULL;
return ErtsBin2DistEntry(bin);
}
Eterm
-erts_build_dhandle(Eterm **hpp, ErlOffHeap* ohp, DistEntry *dep)
+erts_build_dhandle(Eterm **hpp, ErlOffHeap* ohp,
+ DistEntry *dep, Uint32 conn_id)
{
Binary *bin = ErtsDistEntry2Bin(dep);
+ Eterm mref, dhandle;
ASSERT(bin);
ASSERT(ERTS_MAGIC_BIN_DESTRUCTOR(bin) == erts_dist_entry_destructor);
- return erts_mk_magic_ref(hpp, ohp, bin);
+ mref = erts_mk_magic_ref(hpp, ohp, bin);
+ dhandle = TUPLE2(*hpp, make_small(conn_id), mref);
+ *hpp += 3;
+ return dhandle;
}
Eterm
-erts_make_dhandle(Process *c_p, DistEntry *dep)
+erts_make_dhandle(Process *c_p, DistEntry *dep, Uint32 conn_id)
{
- Eterm *hp = HAlloc(c_p, ERTS_MAGIC_REF_THING_SIZE);
- return erts_build_dhandle(&hp, &c_p->off_heap, dep);
+ Eterm *hp = HAlloc(c_p, ERTS_DHANDLE_SIZE);
+ return erts_build_dhandle(&hp, &c_p->off_heap, dep, conn_id);
}
static void start_timer_delete_dist_entry(void *vdep);
@@ -627,7 +639,7 @@ erts_set_dist_entry_not_connected(DistEntry *dep)
if(dep->next)
dep->next->prev = dep->prev;
- dep->state = ERTS_DE_STATE_EXITING;
+ dep->state = ERTS_DE_STATE_IDLE;
dep->flags = 0;
dep->prev = NULL;
dep->cid = NIL;
@@ -1908,8 +1920,9 @@ setup_reference_table(void)
if (ohp)
insert_offheap(ohp, HEAP_REF, prt->common.id);
/* Insert controller */
- if (prt->dist_entry)
- insert_dist_entry(prt->dist_entry,
+ dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ if (dep)
+ insert_dist_entry(dep,
CTRL_REF,
prt->common.id,
0);
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index 9a792b10b1..c44f1f8991 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -214,9 +214,10 @@ int erts_lc_is_de_rwlocked(DistEntry *);
int erts_lc_is_de_rlocked(DistEntry *);
#endif
int erts_dist_entry_destructor(Binary *bin);
-DistEntry *erts_dhandle_to_dist_entry(Eterm dhandle);
-Eterm erts_build_dhandle(Eterm **hpp, ErlOffHeap*, DistEntry*);
-Eterm erts_make_dhandle(Process *c_p, DistEntry *dep);
+DistEntry *erts_dhandle_to_dist_entry(Eterm dhandle, Uint32* connection_id);
+#define ERTS_DHANDLE_SIZE (3+ERTS_MAGIC_REF_THING_SIZE)
+Eterm erts_build_dhandle(Eterm **hpp, ErlOffHeap*, DistEntry*, Uint32 conn_id);
+Eterm erts_make_dhandle(Process *c_p, DistEntry*, Uint32 conn_id);
ERTS_GLB_INLINE void erts_deref_node_entry(ErlNode *np);
ERTS_GLB_INLINE void erts_de_rlock(DistEntry *dep);
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index 9b52b648e5..2be0a5bf74 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -112,8 +112,10 @@ typedef struct line_buf { /* Buffer used in line oriented I/O */
*/
#define ERTS_PRTSD_SCHED_ID 0
+#define ERTS_PRTSD_DIST_ENTRY 1
+#define ERTS_PRTSD_CONN_ID 2
-#define ERTS_PRTSD_SIZE 1
+#define ERTS_PRTSD_SIZE 3
typedef struct {
void *data[ERTS_PRTSD_SIZE];
@@ -154,7 +156,6 @@ struct _erl_drv_port {
Uint bytes_out; /* Number of bytes written */
ErlPortIOQueue ioq; /* driver accessible i/o queue */
- DistEntry *dist_entry; /* Dist entry used in DISTRIBUTION */
char *name; /* String used in the open */
erts_driver_t* drv_ptr;
UWord drv_data;
@@ -257,6 +258,8 @@ ERTS_GLB_INLINE void *
erts_prtsd_get(Port *prt, int ix)
{
ErtsPrtSD *psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd);
+
+ ASSERT((unsigned)ix < ERTS_PRTSD_SIZE);
if (!psd)
return NULL;
ERTS_THR_DATA_DEPENDENCY_READ_MEMORY_BARRIER;
@@ -272,6 +275,7 @@ erts_prtsd_set(Port *prt, int ix, void *data)
psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd);
+ ASSERT((unsigned)ix < ERTS_PRTSD_SIZE);
if (psd) {
#ifdef ETHR_ORDERED_READ_DEPEND
ETHR_MEMBAR(ETHR_LoadStore|ETHR_StoreStore);
@@ -459,7 +463,7 @@ erts_port_unlock(Port *prt)
ERTS_INVALID_PORT_OPT((PP), (ID), ERTS_PORT_SFLGS_INVALID_TRACER_LOOKUP)
#define ERTS_PORT_SCHED_ID(P, ID) \
- ((Uint) (UWord) erts_prtsd_set((P), ERTS_PSD_SCHED_ID, (void *) (UWord) (ID)))
+ ((Uint) (UWord) erts_prtsd_set((P), ERTS_PRTSD_SCHED_ID, (void *) (UWord) (ID)))
extern const Port erts_invalid_port;
#define ERTS_PORT_LOCK_BUSY ((Port *) &erts_invalid_port)
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 6af1236145..f343e984f7 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -52,8 +52,8 @@
#define ERTS_SIG_Q_OP_MAX 13
-#define ERTS_SIG_Q_OP_EXIT 0
-#define ERTS_SIG_Q_OP_EXIT_LINKED 1
+#define ERTS_SIG_Q_OP_EXIT 0 /* Exit signal due to bif call */
+#define ERTS_SIG_Q_OP_EXIT_LINKED 1 /* Exit signal due to link break*/
#define ERTS_SIG_Q_OP_MONITOR_DOWN 2
#define ERTS_SIG_Q_OP_MONITOR 3
#define ERTS_SIG_Q_OP_DEMONITOR 4
@@ -1167,10 +1167,7 @@ erts_proc_sig_send_persistent_monitor_msg(Uint16 type, Eterm key,
ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_PERSISTENT_MON_MSG,
type, 0);
ERL_MESSAGE_FROM(mp) = from;
- ERL_MESSAGE_TOKEN(mp) = NIL;
-#ifdef USE_VM_PROBES
- ERL_MESSAGE_DT_UTAG(mp) = NIL;
-#endif
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
if (!proc_queue_signal(NULL, to, (ErtsSignal *) mp,
ERTS_SIG_Q_OP_PERSISTENT_MON_MSG)) {
@@ -1564,11 +1561,6 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref)
ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_IS_ALIVE,
ERTS_SIG_Q_TYPE_UNDEFINED,
0);
- ERL_MESSAGE_TOKEN(mp) = NIL;
- ERL_MESSAGE_FROM(mp) = am_system;
-#ifdef USE_VM_PROBES
- ERL_MESSAGE_DT_UTAG(mp) = NIL;
-#endif
if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_IS_ALIVE))
(void) maybe_elevate_sig_handling_prio(c_p, to);
@@ -1672,11 +1664,6 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, Eterm tag, Eterm reply)
ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_SYNC_SUSPEND,
ERTS_SIG_Q_TYPE_UNDEFINED,
0);
- ERL_MESSAGE_TOKEN(mp) = NIL;
- ERL_MESSAGE_FROM(mp) = am_system;
-#ifdef USE_VM_PROBES
- ERL_MESSAGE_DT_UTAG(mp) = NIL;
-#endif
if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_SYNC_SUSPEND))
(void) maybe_elevate_sig_handling_prio(c_p, to);
@@ -1790,7 +1777,7 @@ handle_rpc(Process *c_p, ErtsProcSigRPC *rpc, int cnt, int limit, int *yieldp)
msg = TUPLE2(hp, ref, res);
mp->hfrag.next = bp;
-
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_proc_message(c_p, rp, 0, mp, msg);
}
@@ -2155,10 +2142,7 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
pid = STORE_NC(&hp, ohp, from);
ERL_MESSAGE_TERM(mp) = TUPLE3(hp, am_EXIT, pid, reason);
- ERL_MESSAGE_TOKEN(mp) = NIL;
-#ifdef USE_VM_PROBES
- ERL_MESSAGE_DT_UTAG(mp) = NIL;
-#endif
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
if (is_immed(pid))
ERL_MESSAGE_FROM(mp) = pid;
else {
@@ -2346,10 +2330,7 @@ convert_to_down_message(Process *c_p,
type, from, reason);
hp += 6;
- ERL_MESSAGE_TOKEN(mp) = NIL;
-#ifdef USE_VM_PROBES
- ERL_MESSAGE_DT_UTAG(mp) = NIL;
-#endif
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
/* Replace original signal with the exit message... */
convert_to_msg(c_p, sig, mp, next_nm_sig);
@@ -2397,10 +2378,7 @@ convert_to_nodedown_messages(Process *c_p,
ERL_MESSAGE_TERM(mp) = TUPLE2(hp, am_nodedown, node);
ERL_MESSAGE_FROM(mp) = am_system;
- ERL_MESSAGE_TOKEN(mp) = NIL;
-#ifdef USE_VM_PROBES
- ERL_MESSAGE_DT_UTAG(mp) = NIL;
-#endif
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
mp->next = nd_first;
nd_first = mp;
if (!nd_last)
@@ -2830,6 +2808,7 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing,
if (is_alive)
erts_factory_trim_and_close(&hfact, &msg, 1);
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_proc_message(c_p, rp, locks, mp, msg);
if (!is_alive && locks)
@@ -2931,6 +2910,7 @@ sync_suspend_reply(Process *c_p, ErtsMessage *mp, erts_aint32_t state)
tp[2] = ssusp->async ? am_not_suspended : am_internal_error;
}
}
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_proc_message(c_p, rp, 0, mp, ssusp->message);
}
}
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index d5bd17ff3e..0f7f1598fd 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -9897,6 +9897,7 @@ notify_sys_task_executed(Process *c_p, ErtsProcSysTask *st,
ASSERT(hp_start + hsz == hp);
#endif
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_proc_message(c_p, rp, rp_locks, mp, msg);
if (c_p == rp)
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index a60e117bab..8d20ccdf90 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -293,7 +293,7 @@ typedef enum {
* highest index...
*
* Remember to update description in erts_pre_init_process()
- * when adding new flags...
+ * and etp-commands when adding new flags...
*/
typedef enum {
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c
index 904993ceb6..621ba108ba 100644
--- a/erts/emulator/beam/external.c
+++ b/erts/emulator/beam/external.c
@@ -671,30 +671,37 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
byte *ext,
Uint size,
DistEntry *dep,
- ErtsAtomCache *cache,
- Uint32 *connection_id)
+ Uint32 conn_id,
+ ErtsAtomCache *cache)
{
-#undef ERTS_EXT_FAIL
-#undef ERTS_EXT_HDR_FAIL
-#if 1
-#define ERTS_EXT_FAIL goto fail
-#define ERTS_EXT_HDR_FAIL goto bad_hdr
-#else
-#define ERTS_EXT_FAIL abort()
-#define ERTS_EXT_HDR_FAIL abort()
-#endif
+ register byte *ep;
+
+ edep->heap_size = -1;
+ edep->flags = 0;
+ edep->dep = dep;
+
+ ASSERT(dep);
+ erts_de_rlock(dep);
- register byte *ep = ext;
ASSERT(dep->flags & DFLAG_UTF8_ATOMS);
- edep->heap_size = -1;
- edep->ext_endp = ext+size;
+ if ((dep->state != ERTS_DE_STATE_CONNECTED &&
+ dep->state != ERTS_DE_STATE_PENDING)
+ || dep->connection_id != conn_id) {
+ erts_de_runlock(dep);
+ return ERTS_PREP_DIST_EXT_CLOSED;
+ }
- if (size < 2)
- ERTS_EXT_FAIL;
+ if (!(dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)) {
+ /* Skip PASS_THROUGH */
+ ext++;
+ size--;
+ }
+ edep->ext_endp = ext + size;
+ ep = ext;
- if (!dep)
- ERTS_INTERNAL_ERROR("Invalid use");
+ if (size < 2)
+ goto fail;
if (ep[0] != VERSION_MAGIC) {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
@@ -703,28 +710,17 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
"channel %d\n",
dist_entry_channel_no(dep));
erts_send_error_to_logger_nogl(dsbufp);
- ERTS_EXT_FAIL;
+ goto fail;
}
- edep->flags = 0;
- edep->dep = dep;
-
- erts_de_rlock(dep);
-
- if (dep->state != ERTS_DE_STATE_CONNECTED &&
- dep->state != ERTS_DE_STATE_PENDING) {
- erts_de_runlock(dep);
- return ERTS_PREP_DIST_EXT_CLOSED;
- }
if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)
edep->flags |= ERTS_DIST_EXT_DFLAG_HDR;
- *connection_id = dep->connection_id;
edep->connection_id = dep->connection_id;
if (ep[1] != DIST_HEADER) {
if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR)
- ERTS_EXT_HDR_FAIL;
+ goto bad_hdr;
edep->attab.size = 0;
edep->extp = ext;
}
@@ -733,17 +729,17 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
int no_atoms;
if (!(edep->flags & ERTS_DIST_EXT_DFLAG_HDR))
- ERTS_EXT_HDR_FAIL;
+ goto bad_hdr;
#undef CHKSIZE
#define CHKSIZE(SZ) \
- do { if ((SZ) > edep->ext_endp - ep) ERTS_EXT_HDR_FAIL; } while(0)
+ do { if ((SZ) > edep->ext_endp - ep) goto bad_hdr; } while(0)
CHKSIZE(1+1+1);
ep += 2;
no_atoms = (int) get_int8(ep);
if (no_atoms < 0 || ERTS_ATOM_CACHE_SIZE < no_atoms)
- ERTS_EXT_HDR_FAIL;
+ goto bad_hdr;
ep++;
if (no_atoms) {
int long_atoms = 0;
@@ -821,18 +817,18 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
/* atom already cached */
cix += (int) get_int8(ep);
if (cix >= ERTS_ATOM_CACHE_SIZE)
- ERTS_EXT_HDR_FAIL;
+ goto bad_hdr;
ep++;
atom = cache->in_arr[cix];
if (!is_atom(atom))
- ERTS_EXT_HDR_FAIL;
+ goto bad_hdr;
edep->attab.atom[tix] = atom;
}
else {
/* new cached atom */
cix += (int) get_int8(ep);
if (cix >= ERTS_ATOM_CACHE_SIZE)
- ERTS_EXT_HDR_FAIL;
+ goto bad_hdr;
ep++;
if (long_atoms) {
CHKSIZE(2);
@@ -850,7 +846,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
ERTS_ATOM_ENC_UTF8,
0);
if (is_non_value(atom))
- ERTS_EXT_HDR_FAIL;
+ goto bad_hdr;
ep += len;
cache->in_arr[cix] = atom;
edep->attab.atom[tix] = atom;
@@ -870,12 +866,12 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
edep->extp = ep;
#ifdef ERTS_DEBUG_USE_DIST_SEP
if (*ep != VERSION_MAGIC)
- ERTS_EXT_HDR_FAIL;
+ goto bad_hdr;
#endif
}
#ifdef ERTS_DEBUG_USE_DIST_SEP
if (*ep != VERSION_MAGIC)
- ERTS_EXT_FAIL;
+ goto fail;
#endif
erts_de_runlock(dep);
@@ -883,8 +879,6 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
return ERTS_PREP_DIST_EXT_SUCCESS;
#undef CHKSIZE
-#undef ERTS_EXT_FAIL
-#undef ERTS_EXT_HDR_FAIL
bad_hdr: {
erts_dsprintf_buf_t *dsbufp = erts_create_logger_dsbuf();
@@ -901,7 +895,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
}
fail: {
erts_de_runlock(dep);
- erts_kill_dist_connection(dep, *connection_id);
+ erts_kill_dist_connection(dep, conn_id);
}
return ERTS_PREP_DIST_EXT_FAILED;
}
diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h
index bbd9b4bad2..edac177cc6 100644
--- a/erts/emulator/beam/external.h
+++ b/erts/emulator/beam/external.h
@@ -182,7 +182,7 @@ void erts_destroy_dist_ext_copy(ErtsDistExternal *);
#define ERTS_PREP_DIST_EXT_CLOSED (1)
int erts_prepare_dist_ext(ErtsDistExternal *, byte *, Uint,
- DistEntry *, ErtsAtomCache *, Uint32 *);
+ DistEntry *, Uint32 conn_id, ErtsAtomCache *);
Sint erts_decode_dist_ext_size(ErtsDistExternal *);
Eterm erts_decode_dist_ext(ErtsHeapFactory* factory, ErtsDistExternal *);
diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h
index 2cf268162d..21ae205237 100644
--- a/erts/emulator/beam/global.h
+++ b/erts/emulator/beam/global.h
@@ -1083,7 +1083,7 @@ extern int erts_do_net_exits(DistEntry*, Eterm);
extern int distribution_info(fmtfn_t, void *);
extern int is_node_name_atom(Eterm a);
-extern int erts_net_message(Port *, DistEntry *,
+extern int erts_net_message(Port *, DistEntry *, Uint32 conn_id,
byte *, ErlDrvSizeT, byte *, ErlDrvSizeT);
extern void init_dist(void);
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 2446b3c074..5325480901 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -375,7 +375,6 @@ static Port *create_port(char *name,
prt->control_flags = 0;
prt->bytes_in = 0;
prt->bytes_out = 0;
- prt->dist_entry = NULL;
ERTS_PORT_INIT_CONNECTED(prt, pid);
prt->common.u.alive.reg = NULL;
ERTS_PTMR_INIT(prt);
@@ -3287,7 +3286,6 @@ static void deliver_read_message(Port* prt, erts_aint32_t state, Eterm to,
if (trace_send)
trace_port_send(prt, to, tuple, 1);
- ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
if (rp_locks)
erts_proc_unlock(rp, rp_locks);
@@ -3459,7 +3457,6 @@ deliver_vec_message(Port* prt, /* Port */
if (IS_TRACED_FL(prt, F_TRACE_SEND))
trace_port_send(prt, to, tuple, 1);
- ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
erts_proc_unlock(rp, rp_locks);
if (!scheduler)
@@ -3615,12 +3612,12 @@ terminate_port(Port *prt)
erts_cleanup_port_data(prt);
+ ASSERT(erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY) == NULL);
+
psd = (ErtsPrtSD *) erts_atomic_read_nob(&prt->psd);
if (psd)
erts_free(ERTS_ALC_T_PRTSD, psd);
- ASSERT(prt->dist_entry == NULL);
-
kill_port(prt);
/*
@@ -3761,10 +3758,12 @@ erts_deliver_port_exit(Port *prt, Eterm from, Eterm reason, int send_closed,
DRV_MONITOR_UNLOCK_PDL(prt);
}
- if ((state & ERTS_PORT_SFLG_DISTRIBUTION) && prt->dist_entry) {
- erts_do_net_exits(prt->dist_entry, modified_reason);
- erts_deref_dist_entry(prt->dist_entry);
- prt->dist_entry = NULL;
+ if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ ASSERT(dep);
+ erts_do_net_exits(dep, modified_reason);
+ erts_deref_dist_entry(dep);
+ erts_prtsd_set(prt, ERTS_PRTSD_DIST_ENTRY, NULL);
erts_atomic32_read_band_relb(&prt->state,
~ERTS_PORT_SFLG_DISTRIBUTION);
}
@@ -5052,7 +5051,7 @@ set_busy_port(ErlDrvPort dprt, int on)
DTRACE1(port_not_busy, port_str);
}
#endif
- if (prt->dist_entry) {
+ if (erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY) != NULL) {
/*
* Processes suspended on distribution ports are
* normally queued on the dist entry.
@@ -5382,7 +5381,6 @@ void driver_report_exit(ErlDrvPort ix, int status)
if (IS_TRACED_FL(prt, F_TRACE_SEND))
trace_port_send(prt, pid, tuple, 1);
- ERL_MESSAGE_TOKEN(mp) = am_undefined;
erts_queue_message(rp, rp_locks, mp, tuple, prt->common.id);
erts_proc_unlock(rp, rp_locks);
@@ -5988,8 +5986,6 @@ driver_deliver_term(Port *prt, Eterm to, ErlDrvTermData* data, int len)
from = prt->common.id;
}
- /* send message */
- ERL_MESSAGE_TOKEN(factory.message) = am_undefined;
erts_queue_message(rp, rp_locks, factory.message, mess, from);
}
else if (res == -2) {
@@ -6174,9 +6170,12 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
else
erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len));
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
- erts_atomic64_inc_nob(&prt->dist_entry->in);
+ DistEntry* dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ Uint32 conn_id = (Uint32)(UWord) erts_prtsd_get(prt, ERTS_PRTSD_CONN_ID);
+ erts_atomic64_inc_nob(&dep->in);
return erts_net_message(prt,
- prt->dist_entry,
+ dep,
+ conn_id,
(byte*) hbuf, hlen,
(byte*) (bin->orig_bytes+offs), len);
}
@@ -6215,15 +6214,19 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
else
erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len));
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
- erts_atomic64_inc_nob(&prt->dist_entry->in);
+ DistEntry *dep = (DistEntry*) erts_prtsd_get(prt, ERTS_PRTSD_DIST_ENTRY);
+ Uint32 conn_id = (Uint32)(UWord) erts_prtsd_get(prt, ERTS_PRTSD_CONN_ID);
+ erts_atomic64_inc_nob(&dep->in);
if (len == 0)
return erts_net_message(prt,
- prt->dist_entry,
+ dep,
+ conn_id,
NULL, 0,
(byte*) hbuf, hlen);
else
return erts_net_message(prt,
- prt->dist_entry,
+ dep,
+ conn_id,
(byte*) hbuf, hlen,
(byte*) buf, len);
}
diff --git a/erts/emulator/beam/ops.tab b/erts/emulator/beam/ops.tab
index c51e4ef784..e76d896ffc 100644
--- a/erts/emulator/beam/ops.tab
+++ b/erts/emulator/beam/ops.tab
@@ -244,7 +244,7 @@ if_end
# Optimize for that case.
raise x==2 x==1 => i_raise
raise Trace=y Value=y => move Trace x=2 | move Value x=1 | i_raise
-raise Trace Value => move Trace x=3 | move Value x=1 | move x=3 x=2 | i_raise
+raise Trace Value => move Trace x | move Value x=1 | move x x=2 | i_raise
i_raise