aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c533
1 files changed, 311 insertions, 222 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index ec07ddcd9c..0bbcc5f966 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -1,18 +1,19 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1996-2013. All Rights Reserved.
+ * Copyright Ericsson AB 1996-2014. All Rights Reserved.
*
- * The contents of this file are subject to the Erlang Public License,
- * Version 1.1, (the "License"); you may not use this file except in
- * compliance with the License. You should have received a copy of the
- * Erlang Public License along with this software. If not, it can be
- * retrieved online at http://www.erlang.org/.
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * Software distributed under the License is distributed on an "AS IS"
- * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
- * the License for the specific language governing rights and limitations
- * under the License.
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
* %CopyrightEnd%
*/
@@ -44,6 +45,8 @@
#include "erl_thr_progress.h"
#include "dtrace-wrapper.h"
+#define DIST_CTL_DEFAULT_SIZE 64
+
/* Turn this on to get printouts of all distribution messages
* which go on the line
*/
@@ -65,9 +68,13 @@ static void bw(byte *buf, ErlDrvSizeT sz)
static void
dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
{
+ ErtsHeapFactory factory;
+ DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
+ Eterm* ctl = ctl_default;
byte *extp = edep->extp;
Eterm msg;
- Sint size = erts_decode_dist_ext_size(edep);
+ Sint ctl_len;
+ Sint size = ctl_len = erts_decode_dist_ext_size(edep);
if (size < 0) {
erts_fprintf(stderr,
"DIST MSG DEBUG: erts_decode_dist_ext_size(%s) failed:\n",
@@ -75,10 +82,9 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
bw(buf, sz);
}
else {
- Eterm *hp;
ErlHeapFragment *mbuf = new_message_buffer(size);
- hp = mbuf->mem;
- msg = erts_decode_dist_ext(&hp, &mbuf->off_heap, edep);
+ erts_factory_static_init(&factory, ctl, ctl_len, &mbuf->off_heap);
+ msg = erts_decode_dist_ext(&factory, edep);
if (is_value(msg))
erts_fprintf(stderr, " %s: %T\n", what, msg);
else {
@@ -119,7 +125,7 @@ Export* dmonitor_p_trap = NULL;
/* forward declarations */
static void clear_dist_entry(DistEntry*);
-static int dsig_send(ErtsDSigData *, Eterm, Eterm, int);
+static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
@@ -388,11 +394,7 @@ static void doit_node_link_net_exits(ErtsLink *lnk, void *vnecp)
Eterm tup;
Eterm *hp = erts_alloc_message_heap(3,&bp,&ohp,rp,&rp_locks);
tup = TUPLE2(hp, am_nodedown, name);
- erts_queue_message(rp, &rp_locks, bp, tup, NIL
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, &rp_locks, bp, tup, NIL);
}
erts_smp_proc_unlock(rp, rp_locks);
}
@@ -622,9 +624,7 @@ alloc_dist_obuf(Uint size)
ErtsDistOutputBuf *obuf;
Uint obuf_size = sizeof(ErtsDistOutputBuf)+sizeof(byte)*(size-1);
Binary *bin = erts_bin_drv_alloc(obuf_size);
- bin->flags = BIN_FLAG_DRV;
erts_refc_init(&bin->refc, 1);
- bin->orig_size = (SWord) obuf_size;
obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0];
#ifdef DEBUG
obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN;
@@ -709,6 +709,55 @@ static void clear_dist_entry(DistEntry *dep)
}
}
+void erts_dsend_context_dtor(Binary* ctx_bin)
+{
+ ErtsSendContext* ctx = ERTS_MAGIC_BIN_DATA(ctx_bin);
+ switch (ctx->dss.phase) {
+ case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
+ DESTROY_SAVED_WSTACK(&ctx->dss.u.sc.wstack);
+ break;
+ case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
+ DESTROY_SAVED_WSTACK(&ctx->dss.u.ec.wstack);
+ break;
+ default:;
+ }
+ if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) {
+ free_dist_obuf(ctx->dss.obuf);
+ }
+ if (ctx->dep_to_deref)
+ erts_deref_dist_entry(ctx->dep_to_deref);
+}
+
+Eterm erts_dsend_export_trap_context(Process* p, ErtsSendContext* ctx)
+{
+ struct exported_ctx {
+ ErtsSendContext ctx;
+ ErtsAtomCacheMap acm;
+ };
+ Binary* ctx_bin = erts_create_magic_binary(sizeof(struct exported_ctx),
+ erts_dsend_context_dtor);
+ struct exported_ctx* dst = ERTS_MAGIC_BIN_DATA(ctx_bin);
+ Uint ctl_size = !HALFWORD_HEAP ? 0 : (arityval(ctx->ctl_heap[0]) + 1);
+ Eterm* hp = HAlloc(p, ctl_size + PROC_BIN_SIZE);
+
+ sys_memcpy(&dst->ctx, ctx, sizeof(ErtsSendContext));
+ ASSERT(ctx->dss.ctl == make_tuple(ctx->ctl_heap));
+#if !HALFWORD_HEAP
+ dst->ctx.dss.ctl = make_tuple(dst->ctx.ctl_heap);
+#else
+ /* Must put control tuple in low mem */
+ sys_memcpy(hp, ctx->ctl_heap, ctl_size*sizeof(Eterm));
+ dst->ctx.dss.ctl = make_tuple(hp);
+ hp += ctl_size;
+#endif
+ if (ctx->dss.acmp) {
+ sys_memcpy(&dst->acm, ctx->dss.acmp, sizeof(ErtsAtomCacheMap));
+ dst->ctx.dss.acmp = &dst->acm;
+ }
+ return erts_mk_magic_binary_term(&hp, &MSO(p), ctx_bin);
+}
+
+
/*
* The erts_dsig_send_*() functions implemented below, sends asynchronous
* distributed signals to other Erlang nodes. Before sending a distributed
@@ -731,7 +780,7 @@ erts_dsig_send_link(ErtsDSigData *dsdp, Eterm local, Eterm remote)
int res;
UseTmpHeapNoproc(4);
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send_ctl(dsdp, ctl, 0);
UnUseTmpHeapNoproc(4);
return res;
}
@@ -744,7 +793,7 @@ erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote)
int res;
UseTmpHeapNoproc(4);
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send_ctl(dsdp, ctl, 0);
UnUseTmpHeapNoproc(4);
return res;
}
@@ -772,7 +821,7 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
erts_smp_de_links_unlock(dsdp->dep);
#endif
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ res = dsig_send_ctl(dsdp, ctl, 1);
UnUseTmpHeapNoproc(6);
return res;
}
@@ -793,7 +842,7 @@ erts_dsig_send_monitor(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
make_small(DOP_MONITOR_P),
watcher, watched, ref);
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send_ctl(dsdp, ctl, 0);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -815,18 +864,17 @@ erts_dsig_send_demonitor(ErtsDSigData *dsdp, Eterm watcher,
make_small(DOP_DEMONITOR_P),
watcher, watched, ref);
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, force);
+ res = dsig_send_ctl(dsdp, ctl, force);
UnUseTmpHeapNoproc(5);
return res;
}
int
-erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message)
+erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
{
Eterm ctl;
- DeclareTmpHeapNoproc(ctl_heap,5);
Eterm token = NIL;
- Process *sender = dsdp->proc;
+ Process *sender = ctx->dsd.proc;
int res;
#ifdef USE_VM_PROBES
Sint tok_label = 0;
@@ -838,8 +886,7 @@ erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message)
DTRACE_CHARBUF(receiver_name, 64);
#endif
- UseTmpHeapNoproc(5);
- if (SEQ_TRACE_TOKEN(sender) != NIL
+ if (SEQ_TRACE_TOKEN(sender) != NIL
#ifdef USE_VM_PROBES
&& SEQ_TRACE_TOKEN(sender) != am_have_dt_utag
#endif
@@ -852,7 +899,7 @@ erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message)
*node_name = *sender_name = *receiver_name = '\0';
if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) {
erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)),
- "%T", dsdp->dep->sysname);
+ "%T", ctx->dsd.dep->sysname);
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
"%T", sender->common.id);
erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
@@ -867,26 +914,28 @@ erts_dsig_send_msg(ErtsDSigData *dsdp, Eterm remote, Eterm message)
#endif
if (token != NIL)
- ctl = TUPLE4(&ctl_heap[0],
+ ctl = TUPLE4(&ctx->ctl_heap[0],
make_small(DOP_SEND_TT), am_Cookie, remote, token);
else
- ctl = TUPLE3(&ctl_heap[0], make_small(DOP_SEND), am_Cookie, remote);
+ ctl = TUPLE3(&ctx->ctl_heap[0], make_small(DOP_SEND), am_Cookie, remote);
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
- res = dsig_send(dsdp, ctl, message, 0);
- UnUseTmpHeapNoproc(5);
+ ctx->dss.ctl = ctl;
+ ctx->dss.msg = message;
+ ctx->dss.force_busy = 0;
+ res = erts_dsig_send(&ctx->dsd, &ctx->dss);
return res;
}
int
-erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message)
+erts_dsig_send_reg_msg(Eterm remote_name, Eterm message,
+ ErtsSendContext* ctx)
{
Eterm ctl;
- DeclareTmpHeapNoproc(ctl_heap,6);
Eterm token = NIL;
- Process *sender = dsdp->proc;
+ Process *sender = ctx->dsd.proc;
int res;
#ifdef USE_VM_PROBES
Sint tok_label = 0;
@@ -898,7 +947,6 @@ erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message)
DTRACE_CHARBUF(receiver_name, 128);
#endif
- UseTmpHeapNoproc(6);
if (SEQ_TRACE_TOKEN(sender) != NIL
#ifdef USE_VM_PROBES
&& SEQ_TRACE_TOKEN(sender) != am_have_dt_utag
@@ -912,7 +960,7 @@ erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message)
*node_name = *sender_name = *receiver_name = '\0';
if (DTRACE_ENABLED(message_send) || DTRACE_ENABLED(message_send_remote)) {
erts_snprintf(node_name, sizeof(DTRACE_CHARBUF_NAME(node_name)),
- "%T", dsdp->dep->sysname);
+ "%T", ctx->dsd.dep->sysname);
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
"%T", sender->common.id);
erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
@@ -927,17 +975,19 @@ erts_dsig_send_reg_msg(ErtsDSigData *dsdp, Eterm remote_name, Eterm message)
#endif
if (token != NIL)
- ctl = TUPLE5(&ctl_heap[0], make_small(DOP_REG_SEND_TT),
+ ctl = TUPLE5(&ctx->ctl_heap[0], make_small(DOP_REG_SEND_TT),
sender->common.id, am_Cookie, remote_name, token);
else
- ctl = TUPLE4(&ctl_heap[0], make_small(DOP_REG_SEND),
+ ctl = TUPLE4(&ctx->ctl_heap[0], make_small(DOP_REG_SEND),
sender->common.id, am_Cookie, remote_name);
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
DTRACE7(message_send_remote, sender_name, node_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
- res = dsig_send(dsdp, ctl, message, 0);
- UnUseTmpHeapNoproc(6);
+ ctx->dss.ctl = ctl;
+ ctx->dss.msg = message;
+ ctx->dss.force_busy = 0;
+ res = erts_dsig_send(&ctx->dsd, &ctx->dss);
return res;
}
@@ -994,7 +1044,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
DTRACE7(process_exit_signal_remote, sender_name, node_name,
remote_name, reason_str, tok_label, tok_lastcnt, tok_serial);
/* forced, i.e ignore busy */
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ res = dsig_send_ctl(dsdp, ctl, 1);
UnUseTmpHeapNoproc(6);
return res;
}
@@ -1010,7 +1060,7 @@ erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
ctl = TUPLE4(&ctl_heap[0],
make_small(DOP_EXIT), local, remote, reason);
/* forced, i.e ignore busy */
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 1);
+ res = dsig_send_ctl(dsdp, ctl, 1);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -1026,7 +1076,7 @@ erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason
ctl = TUPLE4(&ctl_heap[0],
make_small(DOP_EXIT2), local, remote, reason);
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send_ctl(dsdp, ctl, 0);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -1043,7 +1093,7 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
ctl = TUPLE3(&ctl_heap[0],
make_small(DOP_GROUP_LEADER), leader, remote);
- res = dsig_send(dsdp, ctl, THE_NON_VALUE, 0);
+ res = dsig_send_ctl(dsdp, ctl, 0);
UnUseTmpHeapNoproc(4);
return res;
}
@@ -1091,7 +1141,6 @@ int erts_net_message(Port *prt,
byte *buf,
ErlDrvSizeT len)
{
-#define DIST_CTL_DEFAULT_SIZE 64
ErtsDistExternal ede;
byte *t;
Sint ctl_len;
@@ -1105,6 +1154,7 @@ int erts_net_message(Port *prt,
DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE);
Eterm* ctl = ctl_default;
ErlOffHeap off_heap;
+ ErtsHeapFactory factory;
Eterm* hp;
Sint type;
Eterm token;
@@ -1181,7 +1231,8 @@ int erts_net_message(Port *prt,
}
hp = ctl;
- arg = erts_decode_dist_ext(&hp, &off_heap, &ede);
+ erts_factory_static_init(&factory, ctl, ctl_len, &off_heap);
+ arg = erts_decode_dist_ext(&factory, &ede);
if (is_non_value(arg)) {
#ifdef ERTS_DIST_MSG_DBG
erts_fprintf(stderr, "DIST MSG DEBUG: erts_dist_ext_size(CTL) failed:\n");
@@ -1693,194 +1744,235 @@ int erts_net_message(Port *prt,
return -1;
}
-static int
-dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
+static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
+{
+ struct erts_dsig_send_context ctx;
+ int ret;
+ ctx.ctl = ctl;
+ ctx.msg = THE_NON_VALUE;
+ ctx.force_busy = force_busy;
+ ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
+#ifdef DEBUG
+ ctx.reds = 1; /* provoke assert below (no reduction count without msg) */
+#endif
+ ret = erts_dsig_send(dsdp, &ctx);
+ ASSERT(ret != ERTS_DSIG_SEND_CONTINUE);
+ return ret;
+}
+
+int
+erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
{
+ int retval;
+ Sint initial_reds = ctx->reds;
Eterm cid;
- int suspended = 0;
- int resume = 0;
- Uint32 pass_through_size;
- Uint data_size, dhdr_ext_size;
- ErtsAtomCacheMap *acmp;
- ErtsDistOutputBuf *obuf;
- DistEntry *dep = dsdp->dep;
- Uint32 flags = dep->flags;
- Process *c_p = dsdp->proc;
- if (!c_p || dsdp->no_suspend)
- force_busy = 1;
+ while (1) {
+ switch (ctx->phase) {
+ case ERTS_DSIG_SEND_PHASE_INIT:
+ ctx->flags = dsdp->dep->flags;
+ ctx->c_p = dsdp->proc;
- ERTS_SMP_LC_ASSERT(!c_p
- || (ERTS_PROC_LOCK_MAIN
- == erts_proc_lc_my_proc_locks(c_p)));
+ if (!ctx->c_p || dsdp->no_suspend)
+ ctx->force_busy = 1;
- if (!erts_is_alive)
- return ERTS_DSIG_SEND_OK;
+ ERTS_SMP_LC_ASSERT(!ctx->c_p
+ || (ERTS_PROC_LOCK_MAIN
+ == erts_proc_lc_my_proc_locks(ctx->c_p)));
- if (flags & DFLAG_DIST_HDR_ATOM_CACHE) {
- acmp = erts_get_atom_cache_map(c_p);
- pass_through_size = 0;
- }
- else {
- acmp = NULL;
- pass_through_size = 1;
- }
+ if (!erts_is_alive)
+ return ERTS_DSIG_SEND_OK;
-#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, ">>%s CTL: %T\n", pass_through_size ? "P" : " ", ctl);
- if (is_value(msg))
- erts_fprintf(stderr, " MSG: %T\n", msg);
-#endif
+ if (ctx->flags & DFLAG_DIST_HDR_ATOM_CACHE) {
+ ctx->acmp = erts_get_atom_cache_map(ctx->c_p);
+ ctx->pass_through_size = 0;
+ }
+ else {
+ ctx->acmp = NULL;
+ ctx->pass_through_size = 1;
+ }
- data_size = pass_through_size;
- erts_reset_atom_cache_map(acmp);
- data_size += erts_encode_dist_ext_size(ctl, flags, acmp);
- if (is_value(msg))
- data_size += erts_encode_dist_ext_size(msg, flags, acmp);
- erts_finalize_atom_cache_map(acmp, flags);
+ #ifdef ERTS_DIST_MSG_DBG
+ erts_fprintf(stderr, ">>%s CTL: %T\n", ctx->pass_through_size ? "P" : " ", ctx->ctl);
+ if (is_value(ctx->msg))
+ erts_fprintf(stderr, " MSG: %T\n", ctx->msg);
+ #endif
+
+ ctx->data_size = ctx->pass_through_size;
+ erts_reset_atom_cache_map(ctx->acmp);
+ erts_encode_dist_ext_size(ctx->ctl, ctx->flags, ctx->acmp, &ctx->data_size);
+
+ if (is_value(ctx->msg)) {
+ ctx->u.sc.wstack.wstart = NULL;
+ ctx->u.sc.flags = ctx->flags;
+ ctx->u.sc.level = 0;
+ ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE;
+ } else {
+ ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
+ }
+ break;
- dhdr_ext_size = erts_encode_ext_dist_header_size(acmp);
- data_size += dhdr_ext_size;
+ case ERTS_DSIG_SEND_PHASE_MSG_SIZE:
+ if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) {
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ goto done;
+ }
- obuf = alloc_dist_obuf(data_size);
- obuf->ext_endp = &obuf->data[0] + pass_through_size + dhdr_ext_size;
+ ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC;
+ case ERTS_DSIG_SEND_PHASE_ALLOC:
+ erts_finalize_atom_cache_map(ctx->acmp, ctx->flags);
+
+ ctx->dhdr_ext_size = erts_encode_ext_dist_header_size(ctx->acmp);
+ ctx->data_size += ctx->dhdr_ext_size;
+
+ ctx->obuf = alloc_dist_obuf(ctx->data_size);
+ ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->pass_through_size + ctx->dhdr_ext_size;
+
+ /* Encode internal version of dist header */
+ ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp);
+ /* Encode control message */
+ erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL);
+ if (is_value(ctx->msg)) {
+ ctx->u.ec.flags = ctx->flags;
+ ctx->u.ec.level = 0;
+ ctx->u.ec.wstack.wstart = NULL;
+ ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE;
+ } else {
+ ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
+ }
+ break;
- /* Encode internal version of dist header */
- obuf->extp = erts_encode_ext_dist_header_setup(obuf->ext_endp, acmp);
- /* Encode control message */
- erts_encode_dist_ext(ctl, &obuf->ext_endp, flags, acmp);
- if (is_value(msg)) {
- /* Encode message */
- erts_encode_dist_ext(msg, &obuf->ext_endp, flags, acmp);
- }
+ case ERTS_DSIG_SEND_PHASE_MSG_ENCODE:
+ if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) {
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ goto done;
+ }
- ASSERT(obuf->extp < obuf->ext_endp);
- ASSERT(&obuf->data[0] <= obuf->extp - pass_through_size);
- ASSERT(obuf->ext_endp <= &obuf->data[0] + data_size);
+ ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
+ case ERTS_DSIG_SEND_PHASE_FIN: {
+ DistEntry *dep = dsdp->dep;
+ int suspended = 0;
+ int resume = 0;
- data_size = obuf->ext_endp - obuf->extp;
+ ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp);
+ ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->pass_through_size);
+ ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size);
- /*
- * Signal encoded; now verify that the connection still exists,
- * and if so enqueue the signal and schedule it for send.
- */
- obuf->next = NULL;
- erts_smp_de_rlock(dep);
- cid = dep->cid;
- if (cid != dsdp->cid
- || dep->connection_id != dsdp->connection_id
- || dep->status & ERTS_DE_SFLG_EXITING) {
- /* Not the same connection as when we started; drop message... */
- erts_smp_de_runlock(dep);
- free_dist_obuf(obuf);
- }
- else {
- ErtsProcList *plp = NULL;
- erts_smp_mtx_lock(&dep->qlock);
- dep->qsize += size_obuf(obuf);
- if (dep->qsize >= erts_dist_buf_busy_limit)
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- if (!force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) {
- erts_smp_mtx_unlock(&dep->qlock);
+ ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp;
- plp = erts_proclist_create(c_p);
- erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
- suspended = 1;
- erts_smp_mtx_lock(&dep->qlock);
- }
+ /*
+ * Signal encoded; now verify that the connection still exists,
+ * and if so enqueue the signal and schedule it for send.
+ */
+ ctx->obuf->next = NULL;
+ erts_smp_de_rlock(dep);
+ cid = dep->cid;
+ if (cid != dsdp->cid
+ || dep->connection_id != dsdp->connection_id
+ || dep->status & ERTS_DE_SFLG_EXITING) {
+ /* Not the same connection as when we started; drop message... */
+ erts_smp_de_runlock(dep);
+ free_dist_obuf(ctx->obuf);
+ }
+ else {
+ ErtsProcList *plp = NULL;
+ erts_smp_mtx_lock(&dep->qlock);
+ dep->qsize += size_obuf(ctx->obuf);
+ if (dep->qsize >= erts_dist_buf_busy_limit)
+ dep->qflgs |= ERTS_DE_QFLG_BUSY;
+ if (!ctx->force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) {
+ erts_smp_mtx_unlock(&dep->qlock);
+
+ plp = erts_proclist_create(ctx->c_p);
+ erts_suspend(ctx->c_p, ERTS_PROC_LOCK_MAIN, NULL);
+ suspended = 1;
+ erts_smp_mtx_lock(&dep->qlock);
+ }
- /* Enqueue obuf on dist entry */
- if (dep->out_queue.last)
- dep->out_queue.last->next = obuf;
- else
- dep->out_queue.first = obuf;
- dep->out_queue.last = obuf;
+ /* Enqueue obuf on dist entry */
+ if (dep->out_queue.last)
+ dep->out_queue.last->next = ctx->obuf;
+ else
+ dep->out_queue.first = ctx->obuf;
+ dep->out_queue.last = ctx->obuf;
+
+ if (!ctx->force_busy) {
+ if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) {
+ if (suspended)
+ resume = 1; /* was busy when we started, but isn't now */
+ #ifdef USE_VM_PROBES
+ if (resume && DTRACE_ENABLED(dist_port_not_busy)) {
+ DTRACE_CHARBUF(port_str, 64);
+ DTRACE_CHARBUF(remote_str, 64);
+
+ erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
+ "%T", cid);
+ erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
+ "%T", dep->sysname);
+ DTRACE3(dist_port_not_busy, erts_this_node_sysname,
+ port_str, remote_str);
+ }
+ #endif
+ }
+ else {
+ /* Enqueue suspended process on dist entry */
+ ASSERT(plp);
+ erts_proclist_store_last(&dep->suspended, plp);
+ }
+ }
- if (!force_busy) {
- if (!(dep->qflgs & ERTS_DE_QFLG_BUSY)) {
- if (suspended)
- resume = 1; /* was busy when we started, but isn't now */
-#ifdef USE_VM_PROBES
- if (resume && DTRACE_ENABLED(dist_port_not_busy)) {
- DTRACE_CHARBUF(port_str, 64);
- DTRACE_CHARBUF(remote_str, 64);
-
- erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)),
- "%T", cid);
- erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", dep->sysname);
- DTRACE3(dist_port_not_busy, erts_this_node_sysname,
- port_str, remote_str);
- }
-#endif
+ erts_smp_mtx_unlock(&dep->qlock);
+ erts_schedule_dist_command(NULL, dep);
+ erts_smp_de_runlock(dep);
+
+ if (resume) {
+ erts_resume(ctx->c_p, ERTS_PROC_LOCK_MAIN);
+ erts_proclist_destroy(plp);
+ /*
+ * Note that the calling process still have to yield as if it
+ * suspended. If not, the calling process could later be
+ * erroneously scheduled when it shouldn't be.
+ */
+ }
}
- else {
- /* Enqueue suspended process on dist entry */
- ASSERT(plp);
- erts_proclist_store_last(&dep->suspended, plp);
+ ctx->obuf = NULL;
+
+ if (suspended) {
+ #ifdef USE_VM_PROBES
+ if (!resume && DTRACE_ENABLED(dist_port_busy)) {
+ DTRACE_CHARBUF(port_str, 64);
+ DTRACE_CHARBUF(remote_str, 64);
+ DTRACE_CHARBUF(pid_str, 16);
+
+ erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", cid);
+ erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
+ "%T", dep->sysname);
+ erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)),
+ "%T", ctx->c_p->common.id);
+ DTRACE4(dist_port_busy, erts_this_node_sysname,
+ port_str, remote_str, pid_str);
+ }
+ #endif
+ if (!resume && erts_system_monitor_flags.busy_dist_port)
+ monitor_generic(ctx->c_p, am_busy_dist_port, cid);
+ retval = ERTS_DSIG_SEND_YIELD;
+ } else {
+ retval = ERTS_DSIG_SEND_OK;
}
+ goto done;
}
-
- erts_smp_mtx_unlock(&dep->qlock);
- erts_schedule_dist_command(NULL, dep);
- erts_smp_de_runlock(dep);
-
- if (resume) {
- erts_resume(c_p, ERTS_PROC_LOCK_MAIN);
- erts_proclist_destroy(plp);
- /*
- * Note that the calling process still have to yield as if it
- * suspended. If not, the calling process could later be
- * erroneously scheduled when it shouldn't be.
- */
+ default:
+ erl_exit(ERTS_ABORT_EXIT, "dsig_send invalid phase (%d)\n", (int)ctx->phase);
}
}
- if (c_p) {
- int reds;
- /*
- * Bump reductions on calling process.
- *
- * This is the reduction cost: Always a base cost of 8 reductions
- * plus 16 reductions per kilobyte generated external data.
- */
-
- data_size >>= (10-4);
-#if defined(ARCH_64) && !HALFWORD_HEAP
- data_size &= 0x003fffffffffffff;
-#elif defined(ARCH_32) || HALFWORD_HEAP
- data_size &= 0x003fffff;
-#else
-# error "Ohh come on ... !?!"
-#endif
- reds = 8 + ((int) data_size > 1000000 ? 1000000 : (int) data_size);
- BUMP_REDS(c_p, reds);
- }
-
- if (suspended) {
-#ifdef USE_VM_PROBES
- if (!resume && DTRACE_ENABLED(dist_port_busy)) {
- DTRACE_CHARBUF(port_str, 64);
- DTRACE_CHARBUF(remote_str, 64);
- DTRACE_CHARBUF(pid_str, 16);
-
- erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", cid);
- erts_snprintf(remote_str, sizeof(DTRACE_CHARBUF_NAME(remote_str)),
- "%T", dep->sysname);
- erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)),
- "%T", c_p->common.id);
- DTRACE4(dist_port_busy, erts_this_node_sysname,
- port_str, remote_str, pid_str);
- }
-#endif
- if (!resume && erts_system_monitor_flags.busy_dist_port)
- monitor_generic(c_p, am_busy_dist_port, cid);
- return ERTS_DSIG_SEND_YIELD;
+done:
+ if (ctx->msg && ctx->c_p) {
+ BUMP_REDS(ctx->c_p, (initial_reds - ctx->reds) / TERM_TO_BINARY_LOOP_FACTOR);
}
- return ERTS_DSIG_SEND_OK;
+ return retval;
}
-
static Uint
dist_port_command(Port *prt, ErtsDistOutputBuf *obuf)
{
@@ -2001,6 +2093,7 @@ erts_dist_command(Port *prt, int reds_limit)
DistEntry *dep = prt->dist_entry;
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
erts_aint32_t sched_flags;
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -2055,12 +2148,12 @@ erts_dist_command(Port *prt, int reds_limit)
ErtsDistOutputBuf *fob;
size = (*send)(prt, foq.first);
+ esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
erts_fprintf(stderr, ">> ");
bw(foq.first->extp, size);
#endif
reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
- erts_smp_atomic_add_nob(&erts_bytes_out, size);
fob = foq.first;
obufsize += size_obuf(fob);
foq.first = foq.first->next;
@@ -2140,12 +2233,12 @@ erts_dist_command(Port *prt, int reds_limit)
ASSERT(&oq.first->data[0] <= oq.first->extp
&& oq.first->extp < oq.first->ext_endp);
size = (*send)(prt, oq.first);
+ esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
erts_fprintf(stderr, ">> ");
bw(oq.first->extp, size);
#endif
reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
- erts_smp_atomic_add_nob(&erts_bytes_out, size);
fob = oq.first;
obufsize += size_obuf(fob);
oq.first = oq.first->next;
@@ -2437,7 +2530,7 @@ info_dist_entry(int to, void *arg, DistEntry *dep, int visible, int connected)
erts_print(to, arg, "Name: %T", dep->sysname);
#ifdef DEBUG
- erts_print(to, arg, " (refc=%d)", erts_refc_read(&dep->refc, 1));
+ erts_print(to, arg, " (refc=%d)", erts_refc_read(&dep->refc, 0));
#endif
erts_print(to, arg, "\n");
if (!connected && is_nil(dep->cid)) {
@@ -3236,11 +3329,7 @@ send_nodes_mon_msg(Process *rp,
}
ASSERT(hend == hp);
- erts_queue_message(rp, rp_locksp, bp, msg, NIL
-#ifdef USE_VM_PROBES
- , NIL
-#endif
- );
+ erts_queue_message(rp, rp_locksp, bp, msg, NIL);
}
static void