aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam
diff options
context:
space:
mode:
authorLukas Larsson <[email protected]>2018-09-26 11:55:01 +0200
committerLukas Larsson <[email protected]>2019-02-22 11:12:53 +0100
commitf2c4f6f83deecba0c2527e520f0f18fba7d84815 (patch)
tree5009cc4300cdca28dc0f76daaa541051f8d44a01 /erts/emulator/beam
parent6686877360432144bacbf4e95c23b1232eab1b08 (diff)
downloadotp-f2c4f6f83deecba0c2527e520f0f18fba7d84815.tar.gz
otp-f2c4f6f83deecba0c2527e520f0f18fba7d84815.tar.bz2
otp-f2c4f6f83deecba0c2527e520f0f18fba7d84815.zip
erts: Implement fragmentation of distrubution messages
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r--erts/emulator/beam/bif.c3
-rw-r--r--erts/emulator/beam/dist.c929
-rw-r--r--erts/emulator/beam/dist.h30
-rw-r--r--erts/emulator/beam/erl_alloc.types1
-rw-r--r--erts/emulator/beam/erl_message.c73
-rw-r--r--erts/emulator/beam/erl_message.h27
-rw-r--r--erts/emulator/beam/erl_node_tables.c54
-rw-r--r--erts/emulator/beam/erl_node_tables.h17
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c195
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.h13
-rw-r--r--erts/emulator/beam/erl_process.c1
-rw-r--r--erts/emulator/beam/erl_process_dump.c23
-rw-r--r--erts/emulator/beam/erl_ptab.h5
-rw-r--r--erts/emulator/beam/external.c298
-rw-r--r--erts/emulator/beam/external.h63
15 files changed, 1161 insertions, 571 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index ff7db0e742..a71907505c 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -225,6 +225,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1)
code = erts_dsig_send_link(&dsd, BIF_P->common.id, BIF_ARG_1);
if (code == ERTS_DSIG_SEND_YIELD)
ERTS_BIF_YIELD_RETURN(BIF_P, am_true);
+ ASSERT(code == ERTS_DSIG_SEND_OK);
BIF_RET(am_true);
break;
}
@@ -2094,6 +2095,7 @@ BIF_RETTYPE send_3(BIF_ALIST_3)
ctx->return_term = am_ok;
ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR);
ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT;
+ ctx->dss.from = BIF_P->common.id;
while (is_list(l)) {
if (CAR(list_val(l)) == am_noconnect) {
@@ -2246,6 +2248,7 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg)
ctx->return_term = msg;
ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR);
ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT;
+ ctx->dss.from = p->common.id;
result = do_send(p, to, msg, &ref, ctx);
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 91a9481d84..78674848a1 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -55,15 +55,21 @@
*/
#if 0
#define ERTS_DIST_MSG_DBG
+FILE *dbg_file;
#endif
#if 0
+/* Enable this to print the dist debug messages to a file instead */
+#define ERTS_DIST_MSG_DBG_FILE "/tmp/dist_dbg.%d"
+#endif
+#if 0
+/* Enable this to print the raw bytes sent and received */
#define ERTS_RAW_DIST_MSG_DBG
#endif
#if defined(ERTS_DIST_MSG_DBG) || defined(ERTS_RAW_DIST_MSG_DBG)
static void bw(byte *buf, ErlDrvSizeT sz)
{
- bin_write(ERTS_PRINT_STDERR, NULL, buf, sz);
+ bin_write(ERTS_PRINT_FILE, dbg_file, buf, sz);
}
#endif
@@ -71,37 +77,93 @@ static void bw(byte *buf, ErlDrvSizeT sz)
static void
dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz)
{
- ErtsHeapFactory factory;
- byte *extp = edep->extp;
+ byte *extp = edep->data->extp;
Eterm msg;
Sint ctl_len;
- Sint size = ctl_len = erts_decode_dist_ext_size(edep);
+ Sint size = ctl_len = erts_decode_dist_ext_size(edep, 0);
if (size < 0) {
- erts_fprintf(stderr,
+ erts_fprintf(dbg_file,
"DIST MSG DEBUG: erts_decode_dist_ext_size(%s) failed:\n",
what);
bw(buf, sz);
}
else {
- ErlHeapFragment *mbuf = new_message_buffer(size);
- erts_factory_static_init(&factory, mbuf->mem, ctl_len, &mbuf->off_heap);
- msg = erts_decode_dist_ext(&factory, edep);
+ ErtsHeapFactory factory;
+ ErtsMessage *mbuf = erts_factory_message_create(&factory, NULL, 0, ctl_len);
+ /* Set mbuf msg to NIL as erts_factory_undo will fail otherwise */
+ ERL_MESSAGE_TERM(mbuf) = NIL;
+ msg = erts_decode_dist_ext(&factory, edep, 0);
if (is_value(msg))
- erts_fprintf(stderr, " %s: %.80T\n", what, msg);
+ erts_fprintf(dbg_file, " %s: %.80T\n", what, msg);
else {
- erts_fprintf(stderr,
+ erts_fprintf(dbg_file,
"DIST MSG DEBUG: erts_decode_dist_ext(%s) failed:\n",
what);
bw(buf, sz);
}
- free_message_buffer(mbuf);
- edep->extp = extp;
+ erts_factory_undo(&factory);
+ edep->data->extp = extp;
}
}
+static char *erts_dop_to_string(enum dop dop) {
+ if (dop == DOP_LINK)
+ return "LINK";
+ if (dop == DOP_SEND)
+ return "SEND";
+ if (dop == DOP_EXIT)
+ return "EXIT";
+ if (dop == DOP_UNLINK)
+ return "UNLINK";
+ if (dop == DOP_REG_SEND)
+ return "REG_SEND";
+ if (dop == DOP_GROUP_LEADER)
+ return "GROUP_LEADER";
+ if (dop == DOP_EXIT2)
+ return "EXIT2";
+ if (dop == DOP_SEND_TT)
+ return "SEND_TT";
+ if (dop == DOP_EXIT_TT)
+ return "EXIT_TT";
+ if (dop == DOP_REG_SEND_TT)
+ return "REG_SEND_TT";
+ if (dop == DOP_EXIT2_TT)
+ return "EXIT2_TT";
+ if (dop == DOP_MONITOR_P)
+ return "MONITOR_P";
+ if (dop == DOP_DEMONITOR_P)
+ return "DEMONITOR_P";
+ if (dop == DOP_MONITOR_P_EXIT)
+ return "MONITOR_P_EXIT";
+ if (dop == DOP_SEND_SENDER)
+ return "SEND_SENDER";
+ if (dop == DOP_SEND_SENDER_TT)
+ return "SEND_SENDER_TT";
+ if (dop == DOP_PAYLOAD_EXIT)
+ return "PAYLOAD_EXIT";
+ if (dop == DOP_PAYLOAD_EXIT_TT)
+ return "PAYLOAD_EXIT_TT";
+ if (dop == DOP_PAYLOAD_EXIT2)
+ return "PAYLOAD_EXIT2";
+ if (dop == DOP_PAYLOAD_EXIT2_TT)
+ return "PAYLOAD_EXIT2_TT";
+ if (dop == DOP_PAYLOAD_MONITOR_P_EXIT)
+ return "PAYLOAD_MONITOR_P_EXIT";
+ ASSERT(0);
+ return "UNKNOWN";
+}
+
#endif
+#if defined(VALGRIND)
+#include <valgrind/valgrind.h>
+#include <valgrind/memcheck.h>
+# define PURIFY_MSG(msg) \
+ VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg)
+#else
+# define PURIFY_MSG(msg)
+#endif
int erts_is_alive; /* System must be blocked on change */
int erts_dist_buf_busy_limit;
@@ -115,13 +177,16 @@ static Export *dist_ctrl_put_data_trap;
/* forward declarations */
-static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, int force_busy);
+static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, Eterm from, int force_busy);
static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy);
static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm);
static void init_nodes_monitors(void);
static Sint abort_connection(DistEntry* dep, Uint32 conn_id);
static ErtsDistOutputBuf* clear_de_out_queues(DistEntry*);
static void free_de_out_queues(DistEntry*, ErtsDistOutputBuf*);
+int erts_dist_seq_tree_foreach_delete_yielding(DistSeqNode **root,
+ void **vyspp,
+ Sint limit);
static erts_atomic_t no_caches;
static erts_atomic_t no_nodes;
@@ -142,7 +207,6 @@ delete_cache(ErtsAtomCache *cache)
}
}
-
static void
create_cache(DistEntry *dep)
{
@@ -207,12 +271,14 @@ typedef enum {
ERTS_CML_CLEANUP_STATE_LINKS,
ERTS_CML_CLEANUP_STATE_MONITORS,
ERTS_CML_CLEANUP_STATE_ONAME_MONITORS,
+ ERTS_CML_CLEANUP_STATE_SEQUENCES,
ERTS_CML_CLEANUP_STATE_NODE_MONITORS
-} ErtsConMonLnkCleaupState;
+} ErtsConMonLnkSeqCleanupState;
typedef struct {
- ErtsConMonLnkCleaupState state;
+ ErtsConMonLnkSeqCleanupState state;
ErtsMonLnkDist *dist;
+ DistSeqNode *seq;
void *yield_state;
int trigger_node_monitors;
Eterm nodename;
@@ -220,12 +286,12 @@ typedef struct {
Eterm reason;
ErlOffHeap oh;
Eterm heap[1];
-} ErtsConMonLnkCleanup;
+} ErtsConMonLnkSeqCleanup;
static void
-con_monitor_link_cleanup(void *vcmlcp)
+con_monitor_link_seq_cleanup(void *vcmlcp)
{
- ErtsConMonLnkCleanup *cmlcp = vcmlcp;
+ ErtsConMonLnkSeqCleanup *cmlcp = vcmlcp;
ErtsMonLnkDist *dist = cmlcp->dist;
ErtsSchedulerData *esdp;
int reds = CONTEXT_REDS;
@@ -263,6 +329,15 @@ con_monitor_link_cleanup(void *vcmlcp)
erts_mon_link_dist_dec_refc(dist);
ASSERT(!cmlcp->yield_state);
+ cmlcp->state = ERTS_CML_CLEANUP_STATE_SEQUENCES;
+ case ERTS_CML_CLEANUP_STATE_SEQUENCES:
+ reds = erts_dist_seq_tree_foreach_delete_yielding(&cmlcp->seq,
+ &cmlcp->yield_state,
+ reds);
+ if (reds <= 0)
+ break;
+
+ ASSERT(!cmlcp->yield_state);
cmlcp->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS;
case ERTS_CML_CLEANUP_STATE_NODE_MONITORS:
if (cmlcp->trigger_node_monitors) {
@@ -282,22 +357,23 @@ con_monitor_link_cleanup(void *vcmlcp)
esdp = erts_get_scheduler_data();
ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL);
erts_schedule_misc_aux_work((int) esdp->no,
- con_monitor_link_cleanup,
+ con_monitor_link_seq_cleanup,
(void *) cmlcp);
}
static void
-schedule_con_monitor_link_cleanup(ErtsMonLnkDist *dist,
- Eterm nodename,
- Eterm visability,
- Eterm reason)
+schedule_con_monitor_link_seq_cleanup(ErtsMonLnkDist *dist,
+ DistSeqNode *seq,
+ Eterm nodename,
+ Eterm visability,
+ Eterm reason)
{
- if (dist || is_value(nodename)) {
+ if (dist || is_value(nodename) || seq) {
ErtsSchedulerData *esdp;
- ErtsConMonLnkCleanup *cmlcp;
+ ErtsConMonLnkSeqCleanup *cmlcp;
Uint rsz, size;
- size = sizeof(ErtsConMonLnkCleanup);
+ size = sizeof(ErtsConMonLnkSeqCleanup);
if (is_non_value(reason) || is_immed(reason)) {
rsz = 0;
@@ -324,6 +400,8 @@ schedule_con_monitor_link_cleanup(ErtsMonLnkDist *dist,
erts_mtx_unlock(&dist->mtx);
}
+ cmlcp->seq = seq;
+
cmlcp->trigger_node_monitors = is_value(nodename);
cmlcp->nodename = nodename;
cmlcp->visability = visability;
@@ -337,13 +415,13 @@ schedule_con_monitor_link_cleanup(ErtsMonLnkDist *dist,
esdp = erts_get_scheduler_data();
ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL);
erts_schedule_misc_aux_work((int) esdp->no,
- con_monitor_link_cleanup,
+ con_monitor_link_seq_cleanup,
(void *) cmlcp);
}
}
/*
-** A full node name constists of a "n@h"
+** A full node name consists of a "n@h"
**
** n must be a valid node name: string of ([a-z][A-Z][0-9]_-)+
**
@@ -560,6 +638,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
}
else { /* Call from distribution controller (port/process) */
ErtsMonLnkDist *mld;
+ DistSeqNode *sequences;
ErtsAtomCache *cache;
ErtsProcList *suspendees;
ErtsDistOutputBuf *obuf;
@@ -589,6 +668,9 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
mld = dep->mld;
dep->mld = NULL;
+ sequences = dep->sequences;
+ dep->sequences = NULL;
+
nodename = dep->sysname;
flags = dep->flags;
@@ -612,14 +694,15 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
erts_de_rwunlock(dep);
- schedule_con_monitor_link_cleanup(mld,
- nodename,
- (flags & DFLAG_PUBLISHED
- ? am_visible
- : am_hidden),
- (reason == am_normal
- ? am_connection_closed
- : reason));
+ schedule_con_monitor_link_seq_cleanup(mld,
+ sequences,
+ nodename,
+ (flags & DFLAG_PUBLISHED
+ ? am_visible
+ : am_hidden),
+ (reason == am_normal
+ ? am_connection_closed
+ : reason));
erts_resume_processes(suspendees);
@@ -653,6 +736,16 @@ void init_dist(void)
{
init_nodes_monitors();
+#ifdef ERTS_DIST_MSG_DBG_FILE
+ {
+ char buff[255];
+ sprintf(buff, ERTS_DIST_MSG_DBG_FILE, getpid());
+ dbg_file = fopen(buff,"w+");
+ }
+#elif defined (ERTS_DIST_MSG_DBG)
+ dbg_file = stderr;
+#endif
+
nodedown.reason = NIL;
nodedown.bp = NULL;
@@ -676,21 +769,27 @@ void init_dist(void)
}
}
-#define ErtsDistOutputBuf2Binary(OB) \
- ((Binary *) (((char *) (OB)) - offsetof(Binary, orig_bytes)))
+#define ErtsDistOutputBuf2Binary(OB) OB->bin
static ERTS_INLINE ErtsDistOutputBuf *
-alloc_dist_obuf(Uint size)
+alloc_dist_obuf(Uint size, Uint headers)
{
+ int i;
ErtsDistOutputBuf *obuf;
- Uint obuf_size = sizeof(ErtsDistOutputBuf)+sizeof(byte)*(size-1);
+ Uint obuf_size = sizeof(ErtsDistOutputBuf)*(headers) +
+ sizeof(byte)*size;
Binary *bin = erts_bin_drv_alloc(obuf_size);
- obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0];
+ obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[size];
+ erts_refc_add(&bin->intern.refc, headers - 1, 1);
+ for (i = 0; i < headers; i++) {
+ obuf[i].bin = bin;
+ obuf[i].extp = (byte *)&bin->orig_bytes[0];
#ifdef DEBUG
- obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN;
- obuf->alloc_endp = obuf->data + size;
- ASSERT(bin == ErtsDistOutputBuf2Binary(obuf));
+ obuf[i].dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN;
+ obuf[i].alloc_endp = obuf->extp + size;
+ ASSERT(bin == ErtsDistOutputBuf2Binary(obuf));
#endif
+ }
return obuf;
}
@@ -705,8 +804,8 @@ free_dist_obuf(ErtsDistOutputBuf *obuf)
static ERTS_INLINE Sint
size_obuf(ErtsDistOutputBuf *obuf)
{
- Binary *bin = ErtsDistOutputBuf2Binary(obuf);
- return bin->orig_size;
+ return sizeof(ErtsDistOutputBuf) + (obuf->ext_endp - obuf->ext_start)
+ + (obuf->hdr_endp - obuf->hdrp);
}
static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep)
@@ -771,7 +870,9 @@ int erts_dsend_context_dtor(Binary* ctx_bin)
default:;
}
if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) {
- free_dist_obuf(ctx->dss.obuf);
+ int i;
+ for (i = 0; i < ctx->dss.fragments; i++)
+ free_dist_obuf(&ctx->dss.obuf[i]);
}
if (ctx->deref_dep)
erts_deref_dist_entry(ctx->dep);
@@ -845,8 +946,8 @@ erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote)
/* A local process that's being monitored by a remote one exits. We send:
{DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason} */
int
-erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
- Eterm ref, Eterm reason)
+erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm from, Eterm watcher, Eterm watched,
+ Eterm ref, Eterm reason)
{
Eterm ctl, msg;
DeclareTmpHeapNoproc(ctl_heap,6);
@@ -871,7 +972,7 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched,
msg = THE_NON_VALUE;
}
- res = dsig_send_exit(dsdp, ctl, msg, 1);
+ res = dsig_send_exit(dsdp, ctl, msg, from, 1);
UnUseTmpHeapNoproc(6);
return res;
}
@@ -1031,8 +1132,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx)
}
int
-erts_dsig_send_reg_msg(Eterm remote_name, Eterm message,
- ErtsSendContext* ctx)
+erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, ErtsSendContext* ctx)
{
Eterm ctl;
Eterm token = NIL;
@@ -1149,13 +1249,13 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote,
#endif
DTRACE7(process_exit_signal_remote, sender_name, node_name,
remote_name, reason_str, tok_label, tok_lastcnt, tok_serial);
- res = dsig_send_exit(dsdp, ctl, msg, 1);
+ res = dsig_send_exit(dsdp, ctl, msg, local, 1);
UnUseTmpHeapNoproc(6);
return res;
}
int
-erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
+erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm from, Eterm local, Eterm remote, Eterm reason)
{
DeclareTmpHeapNoproc(ctl_heap,5);
int res;
@@ -1170,7 +1270,7 @@ erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason)
ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason);
msg = THE_NON_VALUE;
}
- res = dsig_send_exit(dsdp, ctl, msg, 1);
+ res = dsig_send_exit(dsdp, ctl, msg, from, 1);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -1193,7 +1293,7 @@ erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason
msg = THE_NON_VALUE;
}
- res = dsig_send_exit(dsdp, ctl, msg, 0);
+ res = dsig_send_exit(dsdp, ctl, msg, local, 0);
UnUseTmpHeapNoproc(5);
return res;
}
@@ -1215,17 +1315,123 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote)
return res;
}
-#if defined(PURIFY)
-# define PURIFY_MSG(msg) \
- purify_printf("%s, line %d: %s", __FILE__, __LINE__, msg)
-#elif defined(VALGRIND)
-#include <valgrind/valgrind.h>
-#include <valgrind/memcheck.h>
-# define PURIFY_MSG(msg) \
- VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg)
-#else
-# define PURIFY_MSG(msg)
-#endif
+struct dist_sequences {
+ ErlHeapFragment hfrag;
+ struct dist_sequences *parent;
+ struct dist_sequences *left;
+ struct dist_sequences *right;
+ char is_red;
+
+ Uint64 seq_id;
+ int cnt;
+ Sint ctl_len;
+};
+
+#define ERTS_RBT_PREFIX dist_seq
+#define ERTS_RBT_T DistSeqNode
+#define ERTS_RBT_KEY_T Uint
+#define ERTS_RBT_FLAGS_T int
+#define ERTS_RBT_INIT_EMPTY_TNODE(T) \
+ do { \
+ (T)->parent = NULL; \
+ (T)->left = NULL; \
+ (T)->right = NULL; \
+ (T)->is_red = 0; \
+ } while(0)
+#define ERTS_RBT_IS_RED(T) ((T)->is_red)
+#define ERTS_RBT_SET_RED(T) ((T)->is_red = 1)
+#define ERTS_RBT_IS_BLACK(T) (!ERTS_RBT_IS_RED(T))
+#define ERTS_RBT_SET_BLACK(T) ((T)->is_red = 0)
+#define ERTS_RBT_GET_FLAGS(T) ((T)->is_red)
+#define ERTS_RBT_SET_FLAGS(T, F) ((T)->is_red = F)
+#define ERTS_RBT_GET_PARENT(T) ((T)->parent)
+#define ERTS_RBT_SET_PARENT(T, P) ((T)->parent = P)
+#define ERTS_RBT_GET_RIGHT(T) ((T)->right)
+#define ERTS_RBT_SET_RIGHT(T, R) ((T)->right = (R))
+#define ERTS_RBT_GET_LEFT(T) ((T)->left)
+#define ERTS_RBT_SET_LEFT(T, L) ((T)->left = (L))
+#define ERTS_RBT_GET_KEY(T) ((T)->seq_id)
+#define ERTS_RBT_IS_LT(KX, KY) (KX < KY)
+#define ERTS_RBT_IS_EQ(KX, KY) (KX == KY)
+#define ERTS_RBT_WANT_DELETE
+#define ERTS_RBT_WANT_LOOKUP_INSERT
+#define ERTS_RBT_WANT_LOOKUP
+#define ERTS_RBT_WANT_FOREACH
+#define ERTS_RBT_WANT_FOREACH_DESTROY_YIELDING
+
+#include "erl_rbtree.h"
+
+struct erts_dist_seq_tree_foreach_iter_arg {
+ int (*func)(ErtsDistExternal *, void *, Sint);
+ void *arg;
+};
+
+static int
+erts_dist_seq_tree_foreach_iter(DistSeqNode *seq, void *arg, Sint reds)
+{
+ struct erts_dist_seq_tree_foreach_iter_arg *state = arg;
+ return state->func(erts_get_dist_ext(&seq->hfrag), state->arg, reds);
+}
+
+void
+erts_dist_seq_tree_foreach(DistEntry *dep, int (*func)(ErtsDistExternal *, void *, Sint), void *arg)
+{
+ struct erts_dist_seq_tree_foreach_iter_arg state;
+ state.func = func;
+ state.arg = arg;
+ dist_seq_rbt_foreach(dep->sequences, erts_dist_seq_tree_foreach_iter, &state);
+}
+
+static int dist_seq_cleanup(DistSeqNode *seq, void *unused, Sint reds)
+{
+ erts_free_dist_ext_copy(erts_get_dist_ext(&seq->hfrag));
+ free_message_buffer(&seq->hfrag);
+ return 1;
+}
+
+typedef struct {
+ DistSeqNode *root;
+ dist_seq_rbt_yield_state_t rbt_ystate;
+} DistSeqNodeYieldState;
+
+int
+erts_dist_seq_tree_foreach_delete_yielding(DistSeqNode **root,
+ void **vyspp,
+ Sint limit)
+{
+ DistSeqNodeYieldState ys = {*root, ERTS_RBT_YIELD_STAT_INITER};
+ DistSeqNodeYieldState *ysp;
+ int res;
+
+ ysp = (DistSeqNodeYieldState *) *vyspp;
+ if (!ysp) {
+ *root = NULL;
+ ysp = &ys;
+ }
+ res = dist_seq_rbt_foreach_destroy_yielding(&ysp->root,
+ dist_seq_cleanup,
+ NULL,
+ &ysp->rbt_ystate,
+ limit);
+ if (res > 0) {
+ if (ysp != &ys)
+ erts_free(ERTS_ALC_T_ML_YIELD_STATE, ysp);
+ *vyspp = NULL;
+ }
+ else {
+
+ if (ysp == &ys) {
+ ysp = erts_alloc(ERTS_ALC_T_SEQ_YIELD_STATE,
+ sizeof(DistSeqNodeYieldState));
+ sys_memcpy((void *) ysp, (void *) &ys,
+ sizeof(DistSeqNodeYieldState));
+ }
+
+ *vyspp = (void *) ysp;
+ }
+
+ return res;
+}
/*
** Input from distribution port.
@@ -1246,7 +1452,9 @@ int erts_net_message(Port *prt,
byte *buf,
ErlDrvSizeT len)
{
- ErtsDistExternal ede;
+ ErtsDistExternal ede, *edep = &ede;
+ ErtsDistExternalData ede_data;
+ ErlHeapFragment *ede_hfrag = NULL;
Sint ctl_len;
Eterm arg;
Eterm from, to;
@@ -1285,10 +1493,12 @@ int erts_net_message(Port *prt,
}
#ifdef ERTS_RAW_DIST_MSG_DBG
- erts_fprintf(stderr, "<< ");
+ erts_fprintf(dbg_file, "RECV: ");
bw(buf, len);
#endif
+ ede.data = &ede_data;
+
res = erts_prepare_dist_ext(&ede, buf, len, bin, dep, conn_id, dep->cache);
switch (res) {
@@ -1296,51 +1506,164 @@ int erts_net_message(Port *prt,
return 0; /* Connection not alive; ignore signal... */
case ERTS_PREP_DIST_EXT_FAILED:
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n");
+ erts_fprintf(dbg_file, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n");
bw(buf, orig_len);
#endif
goto data_error;
case ERTS_PREP_DIST_EXT_SUCCESS:
- ctl_len = erts_decode_dist_ext_size(&ede);
+ ctl_len = erts_decode_dist_ext_size(&ede, 1);
if (ctl_len < 0) {
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n");
+ erts_fprintf(dbg_file, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n");
bw(buf, orig_len);
#endif
PURIFY_MSG("data error");
goto data_error;
}
+
+ /* A non-fragmented message */
+ if (!ede.data->seq_id) {
+ if (ctl_len > DIST_CTL_DEFAULT_SIZE) {
+ ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm));
+ }
+
+ erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF);
+ break;
+ } else {
+ DistSeqNode *seq;
+ Uint sz = erts_dist_ext_size(&ede);
+ Uint used_sz = ctl_len * sizeof(Eterm);
+
+ /* We calculate the size of the heap fragment to be allocated.
+ The used_size part has to be larger that the ctl data and the
+ DistSeqNode. */
+ if (used_sz + (sizeof(ErlHeapFragment) - sizeof(Eterm)) < sizeof(DistSeqNode))
+ used_sz = sizeof(DistSeqNode) - (sizeof(ErlHeapFragment) - sizeof(Eterm));
+
+ seq = (DistSeqNode *)new_message_buffer((sz + used_sz) / sizeof(Eterm));
+ seq->hfrag.used_size = used_sz / sizeof(Eterm);
+
+ seq->ctl_len = ctl_len;
+ seq->seq_id = ede.data->seq_id;
+ seq->cnt = ede.data->frag_id;
+ if (dist_seq_rbt_lookup_insert(&dep->sequences, seq) != NULL) {
+ free_message_buffer(&seq->hfrag);
+ goto data_error;
+ }
+
+ erts_make_dist_ext_copy(&ede, erts_get_dist_ext(&seq->hfrag));
+
+ if (ede.data->frag_id > 1) {
+ seq->cnt--;
+ return 0;
+ }
+ }
+
+ /* fall through, the first fragment in the sequence was the last fragment */
+ case ERTS_PREP_DIST_EXT_FRAG_CONT: {
+ DistSeqNode *seq = dist_seq_rbt_lookup(dep->sequences, ede.data->seq_id);
+
+ if (!seq)
+ goto data_error;
+
+ /* If we did a fall-though we already did this */
+ if (res == ERTS_PREP_DIST_EXT_FRAG_CONT)
+ erts_dist_ext_frag(&ede_data, erts_get_dist_ext(&seq->hfrag));
+
+ /* Verify that the fragments have arrived in the correct order */
+ if (seq->cnt != ede.data->frag_id)
+ goto data_error;
+
+ seq->cnt--;
+
+ /* Check if this was the last fragment */
+ if (ede.data->frag_id > 1)
+ return 0;
+
+ /* Last fragment arrived, time to dispatch the signal */
+ dist_seq_rbt_delete(&dep->sequences, seq);
+ ctl_len = seq->ctl_len;
+
+ /* Now that we no longer need the DistSeqNode we re-use the heapfragment
+ to decode the ctl msg into. We don't need the ctl message to be in
+ the heapfragment, but we decode into the heapfragment speculatively
+ in case there is a trace token that we need. */
+ erts_factory_heap_frag_init(&factory, &seq->hfrag);
+ edep = erts_get_dist_ext(&seq->hfrag);
+ ede_hfrag = &seq->hfrag;
+
+ /* If the sequence consisted of more than 1 fragment we create one large
+ binary out of all of the fragments. This because erts_decode_ext
+ cannot handle a segmented buffer.
+ TODO: Move this copy to as late as possible, preferably in in the
+ erts_decode_dist_ext in the receiving process.
+ */
+ if (edep->data->frag_id > 1) {
+ Uint sz = 0;
+ Binary *bin;
+ int i;
+ byte *ep;
+
+ for (i = 0; i < edep->data->frag_id; i++)
+ sz += edep->data[i].ext_endp - edep->data[i].extp;
+
+ bin = erts_bin_nrml_alloc(sz);
+ ep = (byte*)bin->orig_bytes;
+
+ for (i = 0; i < edep->data->frag_id; i++) {
+ sys_memcpy(ep, edep->data[i].extp, edep->data[i].ext_endp - edep->data[i].extp);
+ ep += edep->data[i].ext_endp - edep->data[i].extp;
+ erts_bin_release(edep->data[i].binp);
+ edep->data[i].binp = NULL;
+ edep->data[i].extp = NULL;
+ edep->data[i].ext_endp = NULL;
+ }
+
+ edep->data->frag_id = 1;
+ edep->data->extp = (byte*)bin->orig_bytes;
+ edep->data->ext_endp = ep;
+ edep->data->binp = bin;
+ }
+
break;
+ }
default:
ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()");
break;
}
- if (ctl_len > DIST_CTL_DEFAULT_SIZE) {
- ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm));
- }
-
- erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF);
- arg = erts_decode_dist_ext(&factory, &ede);
+ arg = erts_decode_dist_ext(&factory, edep, 1);
if (is_non_value(arg)) {
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext(CTL) failed:\n");
+ erts_fprintf(dbg_file, "DIST MSG DEBUG: erts_decode_dist_ext(CTL) failed:\n");
bw(buf, orig_len);
#endif
PURIFY_MSG("data error");
goto decode_error;
}
-#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, "<< CTL: %.80T\n", arg);
-#endif
+ /* Fill the unused part of the hfrag with a bignum header */
+ if (ede_hfrag && ede_hfrag->mem + ede_hfrag->used_size > factory.hp) {
+ Uint slot = factory.hp - ede_hfrag->mem;
+ ede_hfrag->mem[slot] = make_pos_bignum_header(ede_hfrag->used_size - slot - 1);
+ }
if (is_not_tuple(arg) ||
(tuple = tuple_val(arg), (tuple_arity = arityval(*tuple)) < 1) ||
is_not_small(tuple[1])) {
+#ifdef ERTS_DIST_MSG_DBG
+ if (is_tuple(arg) && arityval(*tuple) > 1)
+ erts_fprintf(dbg_file, "RECV: CTL: %s: %.80T\n",
+ erts_dop_to_string(unsigned_val(tuple[1])), arg);
+#endif
goto invalid_message;
}
+#ifdef ERTS_DIST_MSG_DBG
+ erts_fprintf(dbg_file, "RECV: CTL: %s: %.80T\n",
+ erts_dop_to_string(unsigned_val(tuple[1])), arg);
+#endif
+
token = NIL;
switch (type = unsigned_val(tuple[1])) {
@@ -1385,7 +1708,7 @@ int erts_net_message(Port *prt,
code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0);
if (code == ERTS_DSIG_PREP_CONNECTED) {
- code = erts_dsig_send_exit(&dsd, to, from, am_noproc);
+ code = erts_dsig_send_exit(&dsd, to, to, from, am_noproc);
ASSERT(code == ERTS_DSIG_SEND_OK);
}
@@ -1475,7 +1798,7 @@ int erts_net_message(Port *prt,
code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0);
if (code == ERTS_DSIG_PREP_CONNECTED) {
- code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref,
+ code = erts_dsig_send_m_exit(&dsd, pid, watcher, watched, ref,
am_noproc);
ASSERT(code == ERTS_DSIG_SEND_OK);
}
@@ -1549,7 +1872,7 @@ int erts_net_message(Port *prt,
}
#ifdef ERTS_DIST_MSG_DBG
- dist_msg_dbg(&ede, "MSG", buf, orig_len);
+ dist_msg_dbg(edep, "MSG", buf, orig_len);
#endif
from = tuple[2];
@@ -1560,7 +1883,6 @@ int erts_net_message(Port *prt,
rp = erts_whereis_process(NULL, 0, to, 0, 0);
if (rp) {
ErtsProcLocks locks = 0;
- ErtsDistExternal *ede_copy;
if (type == DOP_REG_SEND) {
token = NIL;
@@ -1568,13 +1890,14 @@ int erts_net_message(Port *prt,
token = tuple[5];
}
- ede_copy = erts_make_dist_ext_copy(&ede, &token);
-
- erts_queue_dist_message(rp, locks, ede_copy, token, from);
+ erts_queue_dist_message(rp, locks, edep, ede_hfrag, token, from);
if (locks)
erts_proc_unlock(rp, locks);
- }
+ } else if (ede_hfrag) {
+ erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
+ free_message_buffer(ede_hfrag);
+ }
break;
case DOP_SEND_SENDER_TT: {
@@ -1606,7 +1929,7 @@ int erts_net_message(Port *prt,
: tuple[2] == am_Empty);
#ifdef ERTS_DIST_MSG_DBG
- dist_msg_dbg(&ede, "MSG", buf, orig_len);
+ dist_msg_dbg(edep, "MSG", buf, orig_len);
#endif
to = tuple[3];
if (is_not_pid(to)) {
@@ -1615,20 +1938,19 @@ int erts_net_message(Port *prt,
rp = erts_proc_lookup(to);
if (rp) {
ErtsProcLocks locks = 0;
- ErtsDistExternal *ede_copy;
-
- ede_copy = erts_make_dist_ext_copy(&ede, &token);
- erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty);
+ erts_queue_dist_message(rp, locks, edep, ede_hfrag, token, am_Empty);
if (locks)
erts_proc_unlock(rp, locks);
- }
+ } else if (ede_hfrag) {
+ erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
+ free_message_buffer(ede_hfrag);
+ }
break;
}
case DOP_PAYLOAD_MONITOR_P_EXIT:
case DOP_MONITOR_P_EXIT: {
- ErtsDistExternal *ede_copy = NULL;
/* We are monitoring a process on the remote node which dies, we get
{DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */
@@ -1663,19 +1985,18 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
+ if (!erts_proc_lookup(watcher)) break; /* Process not alive */
+
if (reason == THE_NON_VALUE) {
#ifdef ERTS_DIST_MSG_DBG
- dist_msg_dbg(&ede, "MSG", buf, orig_len);
+ dist_msg_dbg(edep, "MSG", buf, orig_len);
#endif
- if (!erts_proc_lookup(watcher)) break; /* Process not alive */
-
- ede_copy = erts_make_dist_ext_copy(&ede, &token);
}
- erts_proc_sig_send_dist_monitor_down(dep, ref, watched,
- watcher, ede_copy, reason);
+ erts_proc_sig_send_dist_monitor_down(
+ dep, ref, watched, watcher, edep, ede_hfrag, reason);
break;
}
@@ -1683,7 +2004,6 @@ int erts_net_message(Port *prt,
case DOP_PAYLOAD_EXIT_TT:
case DOP_EXIT_TT:
case DOP_EXIT: {
- ErtsDistExternal *ede_copy = NULL;
/* 'from', which 'to' is linked to, died */
from = tuple[2];
@@ -1720,19 +2040,16 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- if (reason == THE_NON_VALUE) {
+ if (!erts_proc_lookup(to)) break; /* Process not alive */
+ if (reason == THE_NON_VALUE) {
#ifdef ERTS_DIST_MSG_DBG
- dist_msg_dbg(&ede, "MSG", buf, orig_len);
+ dist_msg_dbg(edep, "MSG", buf, orig_len);
#endif
-
- if (!erts_proc_lookup(to)) break; /* Process not alive */
-
- ede_copy = erts_make_dist_ext_copy(&ede, &token);
}
erts_proc_sig_send_dist_link_exit(dep,
- from, to, ede_copy,
+ from, to, edep, ede_hfrag,
reason, token);
break;
}
@@ -1740,7 +2057,6 @@ int erts_net_message(Port *prt,
case DOP_PAYLOAD_EXIT2:
case DOP_EXIT2_TT:
case DOP_EXIT2: {
- ErtsDistExternal *ede_copy = NULL;
/* 'from' is send an exit signal to 'to' */
from = tuple[2];
@@ -1777,18 +2093,15 @@ int erts_net_message(Port *prt,
goto invalid_message;
}
- if (reason == THE_NON_VALUE) {
+ if (!erts_proc_lookup(to)) break; /* Process not alive */
+ if (reason == THE_NON_VALUE) {
#ifdef ERTS_DIST_MSG_DBG
- dist_msg_dbg(&ede, "MSG", buf, orig_len);
+ dist_msg_dbg(edep, "MSG", buf, orig_len);
#endif
-
- if (!erts_proc_lookup(to)) break; /* Process not alive */
-
- ede_copy = erts_make_dist_ext_copy(&ede, &token);
}
- erts_proc_sig_send_dist_exit(dep, from, to, ede_copy, reason, token);
+ erts_proc_sig_send_dist_exit(dep, from, to, edep, ede_hfrag, reason, token);
break;
}
case DOP_GROUP_LEADER:
@@ -1804,13 +2117,15 @@ int erts_net_message(Port *prt,
(void) erts_proc_sig_send_group_leader(NULL, to, from, NIL);
break;
- default:
+ default:
goto invalid_message;
}
- erts_factory_close(&factory);
- if (ctl != ctl_default) {
- erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
+ if (ede_hfrag == NULL) {
+ erts_factory_close(&factory);
+ if (ctl != ctl_default) {
+ erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
+ }
}
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
ERTS_CHK_NO_PROC_LOCKS;
@@ -1823,9 +2138,14 @@ int erts_net_message(Port *prt,
}
decode_error:
PURIFY_MSG("data error");
- erts_factory_close(&factory);
- if (ctl != ctl_default) {
- erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
+ if (ede_hfrag == NULL) {
+ erts_factory_close(&factory);
+ if (ctl != ctl_default) {
+ erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl);
+ }
+ } else {
+ erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag));
+ free_message_buffer(ede_hfrag);
}
data_error:
UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE);
@@ -1834,18 +2154,17 @@ data_error:
return -1;
}
-static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, int force_busy)
+static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, Eterm from, int force_busy)
{
struct erts_dsig_send_context ctx;
int ret;
ctx.ctl = ctl;
ctx.msg = msg;
+ ctx.from = from;
ctx.force_busy = force_busy;
ctx.force_encode = 1;
ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
-#ifdef DEBUG
ctx.reds = 1; /* provoke assert below (no reduction count with force_encode) */
-#endif
ret = erts_dsig_send(dsdp, &ctx);
ASSERT(ret != ERTS_DSIG_SEND_CONTINUE);
return ret;
@@ -1857,12 +2176,11 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy)
int ret;
ctx.ctl = ctl;
ctx.msg = THE_NON_VALUE;
+ ctx.from = THE_NON_VALUE;
ctx.force_busy = force_busy;
ctx.force_encode = 1;
ctx.phase = ERTS_DSIG_SEND_PHASE_INIT;
-#ifdef DEBUG
ctx.reds = 1; /* provoke assert below (no reduction count without msg) */
-#endif
ret = erts_dsig_send(dsdp, &ctx);
ASSERT(ret != ERTS_DSIG_SEND_CONTINUE);
return ret;
@@ -1926,9 +2244,11 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
}
#ifdef ERTS_DIST_MSG_DBG
- erts_fprintf(stderr, ">> CTL: %.80T\n", ctx->ctl);
+ erts_fprintf(dbg_file, "SEND: CTL: %s: %.80T\n",
+ erts_dop_to_string(unsigned_val(tuple_val(ctx->ctl)[1])),
+ ctx->ctl);
if (is_value(ctx->msg))
- erts_fprintf(stderr, " MSG: %.80T\n", ctx->msg);
+ erts_fprintf(dbg_file, " MSG: %.160T\n", ctx->msg);
#endif
ctx->data_size = ctx->max_finalize_prepend;
@@ -1958,16 +2278,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
case ERTS_DSIG_SEND_PHASE_ALLOC:
erts_finalize_atom_cache_map(ctx->acmp, ctx->flags);
- ctx->dhdr_ext_size = erts_encode_ext_dist_header_size(ctx->acmp);
- ctx->data_size += ctx->dhdr_ext_size;
+ if (ctx->flags & DFLAG_FRAGMENTS && is_value(ctx->msg) && is_not_immed(ctx->msg)) {
+ /* Calculate the max number of fragments that are needed */
+ ASSERT(is_pid(ctx->from) &&
+ "from has to be a pid because it is used as sequence id");
+ ctx->fragments = ctx->data_size / ERTS_DIST_FRAGMENT_SIZE + 1;
+ } else
+ ctx->fragments = 1;
- ctx->obuf = alloc_dist_obuf(ctx->data_size);
- ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size;
+ ctx->dhdr_ext_size = erts_encode_ext_dist_header_size(ctx->acmp, ctx->fragments);
+
+ ctx->obuf = alloc_dist_obuf(
+ ctx->dhdr_ext_size + ctx->data_size +
+ (ctx->fragments-1) * ERTS_DIST_FRAGMENT_HEADER_SIZE,
+ ctx->fragments);
+ ctx->obuf->ext_start = &ctx->obuf->extp[0];
+ ctx->obuf->ext_endp = &ctx->obuf->extp[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size;
/* Encode internal version of dist header */
- ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp);
+ ctx->obuf->extp = erts_encode_ext_dist_header_setup(
+ ctx->obuf->ext_endp, ctx->acmp, ctx->fragments, ctx->from);
/* Encode control message */
erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL);
+
+ ctx->obuf->hdrp = NULL;
+ ctx->obuf->hdr_endp = NULL;
+
if (is_non_value(ctx->msg)) {
ctx->obuf->msg_start = NULL;
ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
@@ -1988,33 +2324,91 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
goto done;
}
} else {
- erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL);
+ erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags,
+ ctx->acmp, NULL, NULL);
}
- ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
+ ctx->phase = ERTS_DSIG_SEND_PHASE_FIN;
case ERTS_DSIG_SEND_PHASE_FIN: {
- DistEntry *dep = dsdp->dep;
- int suspended = 0;
- int resume = 0;
ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp);
- ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend);
- ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size);
+ ASSERT(((byte*)&ctx->obuf->bin->orig_bytes[0]+obuf_list_size) <= ctx->obuf->extp - ctx->max_finalize_prepend);
+ ASSERT(ctx->obuf->ext_endp <= ((byte*)ctx->obuf->bin->orig_bytes+obuf_list_size) + ctx->data_size + ctx->dhdr_ext_size);
ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp;
- if (ctx->data_size > (Uint) INT_MAX) {
- free_dist_obuf(ctx->obuf);
- ctx->obuf = NULL;
- retval = ERTS_DSIG_SEND_TOO_LRG;
- goto done;
- }
ctx->obuf->hopefull_flags = ctx->u.ec.hopefull_flags;
- /*
+
+ if (ctx->fragments > 1) {
+ int fin_fragments;
+ int i;
+ byte *msg = ctx->obuf->msg_start,
+ *msg_end = ctx->obuf->ext_endp,
+ *hdrp = msg_end;
+
+ ASSERT((ctx->obuf->hopefull_flags & ctx->flags) == ctx->obuf->hopefull_flags);
+ ASSERT(get_int64(ctx->obuf->extp + 1 + 1 + 8) == ctx->fragments);
+
+ /* Now that encoding is done we know how large the term will
+ be so we adjust the number of fragments to send. Note that
+ this can mean that only 1 fragment is sent. */
+ fin_fragments = (ctx->obuf->ext_endp - ctx->obuf->msg_start + ERTS_DIST_FRAGMENT_SIZE-1) /
+ ERTS_DIST_FRAGMENT_SIZE - 1;
+
+ /* Update the frag_id in the DIST_FRAG_HEADER */
+ put_int64(fin_fragments+1, ctx->obuf->extp + 1 + 1 + 8);
+
+ if (fin_fragments > 0)
+ msg += ERTS_DIST_FRAGMENT_SIZE;
+ else
+ msg = msg_end;
+ ctx->obuf->next = &ctx->obuf[1];
+ ctx->obuf->ext_endp = msg;
+
+ /* Loop through all fragments, updating the output buffers
+ to be correct and also writing the DIST_FRAG_CONT header. */
+ for (i = 1; i < fin_fragments + 1; i++) {
+ ctx->obuf[i].hopefull_flags = 0;
+ ctx->obuf[i].extp = msg;
+ ctx->obuf[i].ext_start = msg;
+ if (msg + ERTS_DIST_FRAGMENT_SIZE > msg_end)
+ ctx->obuf[i].ext_endp = msg_end;
+ else {
+ msg += ERTS_DIST_FRAGMENT_SIZE;
+ ctx->obuf[i].ext_endp = msg;
+ }
+ ASSERT(ctx->obuf[i].ext_endp > ctx->obuf[i].extp);
+ ctx->obuf[i].hdrp = erts_encode_ext_dist_header_fragment(
+ &hdrp, fin_fragments - i + 1, ctx->from);
+ ctx->obuf[i].hdr_endp = hdrp;
+ ctx->obuf[i].next = &ctx->obuf[i+1];
+ }
+ /* If the initial fragment calculation was incorrect we free the
+ remaining output buffers. */
+ for (; i < ctx->fragments; i++) {
+ free_dist_obuf(&ctx->obuf[i]);
+ }
+ if (!ctx->force_encode && !ctx->force_busy)
+ ctx->reds -= ctx->fragments;
+ ctx->fragments = fin_fragments + 1;
+ }
+
+ ctx->phase = ERTS_DSIG_SEND_PHASE_SEND;
+
+ if (ctx->reds <= 0) {
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ goto done;
+ }
+ }
+ case ERTS_DSIG_SEND_PHASE_SEND: {
+ /*
* Signal encoded; now verify that the connection still exists,
* and if so enqueue the signal and schedule it for send.
*/
- ctx->obuf->next = NULL;
+ DistEntry *dep = dsdp->dep;
+ int suspended = 0;
+ int resume = 0;
+ int i;
erts_de_rlock(dep);
cid = dep->cid;
if (dep->state == ERTS_DE_STATE_EXITING
@@ -2022,14 +2416,38 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
|| dep->connection_id != dsdp->connection_id) {
/* Not the same connection as when we started; drop message... */
erts_de_runlock(dep);
- free_dist_obuf(ctx->obuf);
+ for (i = 0; i < ctx->fragments; i++)
+ free_dist_obuf(&ctx->obuf[i]);
+ ctx->fragments = 0;
}
else {
- Sint qsize;
+ Sint qsize = erts_atomic_read_nob(&dep->qsize);
erts_aint32_t qflgs;
ErtsProcList *plp = NULL;
Eterm notify_proc = NIL;
- Sint obsz = size_obuf(ctx->obuf);
+ Sint obsz;
+ int fragments;
+
+ /* Calculate how many fragments to send. This depends on
+ the available space in the distr queue and the amount
+ of remaining reductions. */
+ for (fragments = 0, obsz = 0;
+ fragments < ctx->fragments &&
+ ((ctx->reds > 0 && (qsize + obsz) < erts_dist_buf_busy_limit) ||
+ ctx->force_encode || ctx->force_busy);
+ fragments++) {
+#ifdef DEBUG
+ int reds = 100;
+#else
+ int reds = 10;
+#endif
+ if (!ctx->force_encode && !ctx->force_busy)
+ ctx->reds -= reds;
+ obsz += size_obuf(&ctx->obuf[fragments]);
+ }
+
+ ASSERT(fragments == ctx->fragments ||
+ (!ctx->force_encode && !ctx->force_busy));
erts_mtx_lock(&dep->qlock);
qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz);
@@ -2059,12 +2477,25 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
erts_mtx_lock(&dep->qlock);
}
- /* Enqueue obuf on dist entry */
- if (dep->out_queue.last)
- dep->out_queue.last->next = ctx->obuf;
- else
- dep->out_queue.first = ctx->obuf;
- dep->out_queue.last = ctx->obuf;
+ if (fragments > 1) {
+ if (!ctx->obuf->hdrp) {
+ ASSERT(get_int64(ctx->obuf->extp + 10) == ctx->fragments);
+ } else {
+ ASSERT(get_int64(ctx->obuf->hdrp + 10) == ctx->fragments);
+ }
+ }
+
+ if (fragments) {
+ ctx->obuf[fragments-1].next = NULL;
+ if (dep->out_queue.last)
+ dep->out_queue.last->next = ctx->obuf;
+ else
+ dep->out_queue.first = ctx->obuf;
+ dep->out_queue.last = &ctx->obuf[fragments-1];
+
+ ctx->fragments -= fragments;
+ ctx->obuf = &ctx->obuf[fragments];
+ }
if (!ctx->force_busy) {
qflgs = erts_atomic32_read_nob(&dep->qflgs);
@@ -2113,6 +2544,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx)
* erroneously scheduled when it shouldn't be.
*/
}
+ /* More fragments left to be sent, yield and re-schedule */
+ if (ctx->fragments) {
+ retval = ERTS_DSIG_SEND_CONTINUE;
+ if (!resume && erts_system_monitor_flags.busy_dist_port)
+ monitor_generic(ctx->c_p, am_busy_dist_port, cid);
+ goto done;
+ }
}
ctx->obuf = NULL;
@@ -2197,9 +2635,9 @@ static Uint
dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
{
int fpe_was_unmasked;
- ErlDrvSizeT size;
- SysIOVec iov[2];
- ErlDrvBinary* bv[2];
+ ErlDrvSizeT size = 0;
+ SysIOVec iov[3];
+ ErlDrvBinary* bv[3];
ErlIOVec eiov;
ERTS_CHK_NO_PROC_LOCKS;
@@ -2214,12 +2652,31 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf)
eiov.vsize = 1;
}
else {
- size = obuf->ext_endp - obuf->extp;
+ int i = 1;
eiov.vsize = 2;
- iov[1].iov_base = obuf->extp;
- iov[1].iov_len = size;
- bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf));
+ if (obuf->hdrp) {
+ eiov.vsize = 3;
+ iov[i].iov_base = obuf->hdrp;
+ iov[i].iov_len = obuf->hdr_endp - obuf->hdrp;
+ size += iov[i].iov_len;
+ bv[i] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf));
+#ifdef ERTS_RAW_DIST_MSG_DBG
+ erts_fprintf(dbg_file, "SEND: ");
+ bw(iov[i].iov_base, iov[i].iov_len);
+#endif
+ i++;
+
+ }
+
+ iov[i].iov_base = obuf->extp;
+ iov[i].iov_len = obuf->ext_endp - obuf->extp;
+#ifdef ERTS_RAW_DIST_MSG_DBG
+ erts_fprintf(dbg_file, "SEND: ");
+ bw(iov[i].iov_base, iov[i].iov_len);
+#endif
+ size += iov[i].iov_len;
+ bv[i] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf));
}
eiov.size = size;
@@ -2326,6 +2783,23 @@ erts_dist_command(Port *prt, int initial_reds)
dep->finalized_out_queue.first = NULL;
dep->finalized_out_queue.last = NULL;
+#ifdef DEBUG
+ {
+ Uint sz = 0;
+ ErtsDistOutputBuf *curr = oq.first;
+ while (curr) {
+ sz += size_obuf(curr);
+ curr = curr->next;
+ }
+ curr = foq.first;
+ while (curr) {
+ sz += size_obuf(curr);
+ curr = curr->next;
+ }
+ ASSERT(sz <= erts_atomic_read_nob(&dep->qsize));
+ }
+#endif
+
sched_flags = erts_atomic32_read_nob(&prt->sched.flags);
if (reds < 0)
@@ -2339,10 +2813,6 @@ erts_dist_command(Port *prt, int initial_reds)
size = (*send)(prt, foq.first);
erts_atomic64_inc_nob(&dep->out);
esdp->io.out += (Uint64) size;
-#ifdef ERTS_RAW_DIST_MSG_DBG
- erts_fprintf(stderr, ">> ");
- bw(foq.first->extp, size);
-#endif
reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size);
fob = foq.first;
obufsize += size_obuf(fob);
@@ -2411,15 +2881,11 @@ erts_dist_command(Port *prt, int initial_reds)
preempt = 1;
break;
}
- ASSERT(&oq.first->data[0] <= oq.first->extp
- && oq.first->extp <= oq.first->ext_endp);
+ ASSERT(oq.first->bin->orig_bytes <= (char*)oq.first->extp
+ && oq.first->extp <= oq.first->ext_endp);
size = (*send)(prt, oq.first);
erts_atomic64_inc_nob(&dep->out);
esdp->io.out += (Uint64) size;
-#ifdef ERTS_RAW_DIST_MSG_DBG
- erts_fprintf(stderr, ">> ");
- bw(oq.first->extp, size);
-#endif
reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size);
fob = oq.first;
obufsize += size_obuf(fob);
@@ -2556,100 +3022,6 @@ erts_dist_command(Port *prt, int initial_reds)
goto done;
}
-#if 0
-
-int
-dist_data_finalize(Process *c_p, int reds_limit)
-{
- int reds = 5;
- DistEntry *dep = ;
- ErtsDistOutputQueue oq, foq;
- ErtsDistOutputBuf *ob;
- int preempt;
-
-
- erts_mtx_lock(&dep->qlock);
- flags = dep->flags;
- oq.first = dep->out_queue.first;
- oq.last = dep->out_queue.last;
- dep->out_queue.first = NULL;
- dep->out_queue.last = NULL;
- erts_mtx_unlock(&dep->qlock);
-
- if (!oq.first) {
- ASSERT(!oq.last);
- oq.first = dep->tmp_out_queue.first;
- oq.last = dep->tmp_out_queue.last;
- }
- else {
- ErtsDistOutputBuf *f, *l;
- ASSERT(oq.last);
- if (dep->tmp_out_queue.last) {
- dep->tmp_out_queue.last->next = oq.first;
- oq.first = dep->tmp_out_queue.first;
- }
- }
-
- if (!oq.first) {
- /* Nothing to do... */
- ASSERT(!oq.last);
- return reds;
- }
-
- foq.first = dep->finalized_out_queue.first;
- foq.last = dep->finalized_out_queue.last;
-
- preempt = 0;
- ob = oq.first;
- ASSERT(ob);
-
- do {
- ob->extp = erts_encode_ext_dist_header_finalize(ob->extp,
- dep->cache,
- flags);
- if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE))
- *--ob->extp = PASS_THROUGH; /* Old node; 'pass through'
- needed */
- ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp);
- reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE;
- preempt = reds > reds_limit;
- if (preempt)
- break;
- ob = ob->next;
- } while (ob);
- /*
- * At least one buffer was finalized; if we got preempted,
- * ob points to the last buffer that we finalized.
- */
- if (foq.last)
- foq.last->next = oq.first;
- else
- foq.first = oq.first;
- if (!preempt) {
- /* All buffers finalized */
- foq.last = oq.last;
- oq.first = oq.last = NULL;
- }
- else {
- /* Not all buffers finalized; split oq. */
- foq.last = ob;
- oq.first = ob->next;
- if (oq.first)
- ob->next = NULL;
- else
- oq.last = NULL;
- }
-
- dep->finalized_out_queue.first = foq.first;
- dep->finalized_out_queue.last = foq.last;
- dep->tmp_out_queue.first = oq.first;
- dep->tmp_out_queue.last = oq.last;
-
- return reds;
-}
-
-#endif
-
BIF_RETTYPE
dist_ctrl_get_data_notification_1(BIF_ALIST_1)
{
@@ -2998,6 +3370,10 @@ static void kill_connection(DistEntry *dep)
void
erts_kill_dist_connection(DistEntry *dep, Uint32 conn_id)
{
+#ifdef ERTS_DIST_MSG_DBG
+ erts_fprintf(dbg_file, "INTR: kill dist conn to %T:%u\n",
+ dep->sysname, conn_id);
+#endif
erts_de_rwlock(dep);
if (conn_id == dep->connection_id
&& dep->state == ERTS_DE_STATE_CONNECTED) {
@@ -3659,8 +4035,9 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep)
erts_set_dist_entry_not_connected(dep);
erts_de_rwunlock(dep);
- schedule_con_monitor_link_cleanup(mld, THE_NON_VALUE,
- THE_NON_VALUE, THE_NON_VALUE);
+ schedule_con_monitor_link_seq_cleanup(
+ mld, NULL, THE_NON_VALUE,
+ THE_NON_VALUE, THE_NON_VALUE);
if (resume_procs) {
int resumed = erts_resume_processes(resume_procs);
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index 81a09bc69c..034b75df38 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -48,6 +48,7 @@
#define DFLAG_BIG_SEQTRACE_LABELS 0x100000
#define DFLAG_NO_MAGIC 0x200000 /* internal for pending connection */
#define DFLAG_EXIT_PAYLOAD 0x400000
+#define DFLAG_FRAGMENTS 0x800000
/* Mandatory flags for distribution */
#define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \
@@ -77,7 +78,8 @@
| DFLAG_BIG_CREATION \
| DFLAG_SEND_SENDER \
| DFLAG_BIG_SEQTRACE_LABELS \
- | DFLAG_EXIT_PAYLOAD)
+ | DFLAG_EXIT_PAYLOAD \
+ | DFLAG_FRAGMENTS)
/* Flags addable by local distr implementations */
#define DFLAG_DIST_ADDABLE DFLAG_DIST_DEFAULT
@@ -101,7 +103,7 @@
| DFLAG_BIG_CREATION)
/* opcodes used in distribution messages */
-enum {
+enum dop {
DOP_LINK = 1,
DOP_SEND = 2,
DOP_EXIT = 3,
@@ -150,6 +152,16 @@ typedef struct {
Uint32 flags;
} ErtsDSigData;
+/* Must be larger or equal to 16 */
+#ifdef DEBUG
+#define ERTS_DIST_FRAGMENT_SIZE 16
+#else
+/* This should be made configurable */
+#define ERTS_DIST_FRAGMENT_SIZE (64 * 1024)
+#endif
+
+#define ERTS_DIST_FRAGMENT_HEADER_SIZE (1 + 1 + 8 + 8) /* magic, header, seq id, frag id*/
+
#define ERTS_DE_BUSY_LIMIT (1024*1024)
extern int erts_dist_buf_busy_limit;
extern int erts_is_alive;
@@ -347,7 +359,8 @@ enum erts_dsig_send_phase {
ERTS_DSIG_SEND_PHASE_MSG_SIZE,
ERTS_DSIG_SEND_PHASE_ALLOC,
ERTS_DSIG_SEND_PHASE_MSG_ENCODE,
- ERTS_DSIG_SEND_PHASE_FIN
+ ERTS_DSIG_SEND_PHASE_FIN,
+ ERTS_DSIG_SEND_PHASE_SEND
};
struct erts_dsig_send_context {
@@ -356,12 +369,14 @@ struct erts_dsig_send_context {
Eterm ctl;
Eterm msg;
+ Eterm from;
int force_busy;
int force_encode;
Uint32 max_finalize_prepend;
Uint data_size, dhdr_ext_size;
ErtsAtomCacheMap *acmp;
ErtsDistOutputBuf *obuf;
+ Uint fragments;
Uint32 flags;
Process *c_p;
union {
@@ -383,6 +398,7 @@ typedef struct {
Eterm return_term;
}ErtsSendContext;
+typedef struct dist_sequences DistSeqNode;
/*
* erts_dsig_send_* return values.
@@ -398,11 +414,11 @@ extern int erts_dsig_send_exit_tt(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm);
extern int erts_dsig_send_unlink(ErtsDSigData *, Eterm, Eterm);
extern int erts_dsig_send_reg_msg(Eterm, Eterm, ErtsSendContext*);
extern int erts_dsig_send_group_leader(ErtsDSigData *, Eterm, Eterm);
-extern int erts_dsig_send_exit(ErtsDSigData *, Eterm, Eterm, Eterm);
+extern int erts_dsig_send_exit(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm);
extern int erts_dsig_send_exit2(ErtsDSigData *, Eterm, Eterm, Eterm);
extern int erts_dsig_send_demonitor(ErtsDSigData *, Eterm, Eterm, Eterm, int);
extern int erts_dsig_send_monitor(ErtsDSigData *, Eterm, Eterm, Eterm);
-extern int erts_dsig_send_m_exit(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm);
+extern int erts_dsig_send_m_exit(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm, Eterm);
extern int erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx);
extern int erts_dsend_context_dtor(Binary*);
@@ -416,4 +432,8 @@ extern Uint erts_dist_cache_size(void);
extern Sint erts_abort_connection_rwunlock(DistEntry *dep);
+extern void erts_dist_seq_tree_foreach(
+ DistEntry *dep,
+ int (*func)(ErtsDistExternal *, void*, Sint), void *args);
+
#endif
diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types
index 31f19d1b88..490a033b8a 100644
--- a/erts/emulator/beam/erl_alloc.types
+++ b/erts/emulator/beam/erl_alloc.types
@@ -276,6 +276,7 @@ type PF3_ARGS SHORT_LIVED PROCESSES process_flag_3_arguments
type SETUP_CONN_ARG SHORT_LIVED PROCESSES setup_connection_argument
type LIST_TRAP SHORT_LIVED PROCESSES list_bif_trap_state
type CONT_EXIT_TRAP SHORT_LIVED PROCESSES continue_exit_trap_state
+type SEQ_YIELD_STATE SHORT_LIVED SYSTEM dist_seq_yield_state
type ENVIRONMENT SYSTEM SYSTEM environment
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index cd7bce8bf4..e12a51a0ae 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -219,6 +219,10 @@ erts_cleanup_message(ErtsMessage *mp)
if (ERTS_SIG_IS_MSG(mp) && mp->data.attached != ERTS_MSG_COMBINED_HFRAG) {
bp = mp->data.heap_frag;
} else {
+ /* All non msg signals are combined HFRAG messages,
+ but we overwrite the mp->data field with the
+ nm_signal queue ptr so have to fix that here
+ before freeing it. */
mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
bp = mp->hfrag.next;
erts_cleanup_offheap(&mp->hfrag.off_heap);
@@ -234,7 +238,7 @@ erts_cleanup_messages(ErtsMessage *msgp)
ErtsMessage *mp = msgp;
while (mp) {
ErtsMessage *fmp;
- erts_cleanup_message(mp);
+ erts_cleanup_message(mp);
fmp = mp;
mp = mp->next;
erts_free_message(fmp);
@@ -266,6 +270,7 @@ void
erts_queue_dist_message(Process *rcvr,
ErtsProcLocks rcvr_locks,
ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
Eterm token,
Eterm from)
{
@@ -274,8 +279,26 @@ erts_queue_dist_message(Process *rcvr,
ERTS_LC_ASSERT(rcvr_locks == erts_proc_lc_my_proc_locks(rcvr));
- mp = erts_alloc_message(0, NULL);
- mp->data.dist_ext = dist_ext;
+ if (hfrag) {
+ /* Fragmented message, allocate a message reference */
+ mp = erts_alloc_message(0, NULL);
+ mp->data.heap_frag = hfrag;
+ } else {
+ /* Un-fragmented message, allocate space for
+ token and dist_ext in message. */
+ Uint dist_ext_sz = erts_dist_ext_size(dist_ext) / sizeof(Eterm);
+ Uint token_sz = size_object(token);
+ Uint sz = token_sz + dist_ext_sz;
+ Eterm *hp;
+
+ mp = erts_alloc_message(sz, &hp);
+ mp->data.heap_frag = &mp->hfrag;
+ mp->hfrag.used_size = token_sz;
+
+ erts_make_dist_ext_copy(dist_ext, erts_get_dist_ext(mp->data.heap_frag));
+
+ token = copy_struct(token, token_sz, &hp, &mp->data.heap_frag->off_heap);
+ }
ERL_MESSAGE_FROM(mp) = dist_ext->dep->sysname;
ERL_MESSAGE_TERM(mp) = THE_NON_VALUE;
@@ -499,25 +522,27 @@ Uint
erts_msg_attached_data_size_aux(ErtsMessage *msg)
{
Sint sz;
- ASSERT(is_non_value(ERL_MESSAGE_TERM(msg)));
- ASSERT(msg->data.dist_ext);
- ASSERT(msg->data.dist_ext->heap_size < 0);
-
- sz = erts_decode_dist_ext_size(msg->data.dist_ext);
- if (sz < 0) {
- /* Bad external
- * We leave the message intact in this case as it's not worth the trouble
- * to make all callers remove it from queue. It will be detected again
- * and removed from message queue later anyway.
- */
- return 0;
- }
+ ErtsDistExternal *edep = erts_get_dist_ext(msg->data.heap_frag);
+ ASSERT(ERTS_SIG_IS_EXTERNAL_MSG(msg));
+
+ if (edep->heap_size < 0) {
- msg->data.dist_ext->heap_size = sz;
- if (is_not_nil(msg->m[1])) {
- ErlHeapFragment *heap_frag;
- heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
- sz += heap_frag->used_size;
+ sz = erts_decode_dist_ext_size(edep, 1);
+ if (sz < 0) {
+ /* Bad external
+ * We leave the message intact in this case as it's not worth the trouble
+ * to make all callers remove it from queue. It will be detected again
+ * and removed from message queue later anyway.
+ */
+ return 0;
+ }
+
+ edep->heap_size = sz;
+ } else {
+ sz = edep->heap_size;
+ }
+ if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) {
+ sz += msg->data.heap_frag->used_size;
}
return sz;
}
@@ -1165,7 +1190,7 @@ erts_factory_message_create(ErtsHeapFactory* factory,
int on_heap;
erts_aint32_t state;
- state = proc ? erts_atomic32_read_nob(&proc->state) : 0;
+ state = proc ? erts_atomic32_read_nob(&proc->state) : ERTS_PSFLG_OFF_HEAP_MSGQ;
if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) {
msgp = erts_alloc_message(sz, &hp);
@@ -1398,8 +1423,8 @@ void erts_factory_close(ErtsHeapFactory* factory)
else
factory->message->data.heap_frag = factory->heap_frags;
- /* Fall through */
- case FACTORY_HEAP_FRAGS:
+ /* Fall through */
+ case FACTORY_HEAP_FRAGS:
bp = factory->heap_frags;
}
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h
index 58294648b4..4c2674394e 100644
--- a/erts/emulator/beam/erl_message.h
+++ b/erts/emulator/beam/erl_message.h
@@ -138,7 +138,7 @@ typedef struct erl_heap_fragment ErlHeapFragment;
struct erl_heap_fragment {
ErlHeapFragment* next; /* Next heap fragment */
ErlOffHeap off_heap; /* Offset heap data. */
- Uint alloc_size; /* Size in (half)words of mem */
+ Uint alloc_size; /* Size in words of mem */
Uint used_size; /* With terms to be moved to heap by GC */
Eterm mem[1]; /* Data */
};
@@ -167,7 +167,6 @@ struct erl_heap_fragment {
#define ERL_MESSAGE_REF_FIELDS__ \
ErtsMessage *next; /* Next message */ \
union { \
- ErtsDistExternal *dist_ext; \
ErlHeapFragment *heap_frag; \
void *attached; \
} data; \
@@ -438,7 +437,8 @@ ErlHeapFragment* new_message_buffer(Uint);
ErlHeapFragment* erts_resize_message_buffer(ErlHeapFragment *, Uint,
Eterm *, Uint);
void free_message_buffer(ErlHeapFragment *);
-void erts_queue_dist_message(Process*, ErtsProcLocks, ErtsDistExternal *, Eterm, Eterm);
+void erts_queue_dist_message(Process*, ErtsProcLocks, ErtsDistExternal *,
+ ErlHeapFragment *, Eterm, Eterm);
void erts_queue_message(Process*, ErtsProcLocks,ErtsMessage*, Eterm, Eterm);
void erts_queue_proc_message(Process* from,Process* to, ErtsProcLocks,ErtsMessage*, Eterm);
void erts_queue_proc_messages(Process* from, Process* to, ErtsProcLocks,
@@ -583,22 +583,11 @@ ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment* bp)
ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErtsMessage *msg)
{
ASSERT(msg->data.attached);
- if (is_value(ERL_MESSAGE_TERM(msg))) {
- ErlHeapFragment *bp;
- bp = erts_message_to_heap_frag(msg);
- return erts_used_frag_sz(bp);
- }
- else if (msg->data.dist_ext->heap_size < 0)
- return erts_msg_attached_data_size_aux(msg);
- else {
- Uint sz = msg->data.dist_ext->heap_size;
- if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) {
- ErlHeapFragment *heap_frag;
- heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
- sz += heap_frag->used_size;
- }
- return sz;
- }
+
+ if (ERTS_SIG_IS_INTERNAL_MSG(msg))
+ return erts_used_frag_sz(erts_message_to_heap_frag(msg));
+
+ return erts_msg_attached_data_size_aux(msg);
}
#endif
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index b7dbe625a2..ede9f092e9 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -201,6 +201,7 @@ dist_table_alloc(void *dep_tmpl)
dep->send = NULL;
dep->cache = NULL;
dep->transcode_ctx = NULL;
+ dep->sequences = NULL;
/* Link in */
@@ -1177,6 +1178,7 @@ static Eterm AM_system;
static Eterm AM_timer;
static Eterm AM_delayed_delete_timer;
static Eterm AM_thread_progress_delete_timer;
+static Eterm AM_sequence;
static Eterm AM_signal;
static void setup_reference_table(void);
@@ -1218,6 +1220,7 @@ typedef struct dist_referrer_ {
int ctrl_ref;
int system_ref;
int signal_ref;
+ int sequence_ref;
Eterm id;
Uint creation;
Uint id_heap[ID_HEAP_SIZE];
@@ -1272,6 +1275,7 @@ erts_get_node_and_dist_references(struct process *proc)
INIT_AM(delayed_delete_timer);
INIT_AM(thread_progress_delete_timer);
INIT_AM(signal);
+ INIT_AM(sequence);
references_atoms_need_init = 0;
}
@@ -1309,8 +1313,9 @@ erts_get_node_and_dist_references(struct process *proc)
#define TIMER_REF 8
#define SYSTEM_REF 9
#define SIGNAL_REF 10
+#define SEQUENCE_REF 11
-#define INC_TAB_SZ 10
+#define INC_TAB_SZ 11
static void
insert_dist_referrer(ReferredDist *referred_dist,
@@ -1344,6 +1349,7 @@ insert_dist_referrer(ReferredDist *referred_dist,
drp->ctrl_ref = 0;
drp->system_ref = 0;
drp->signal_ref = 0;
+ drp->sequence_ref = 0;
}
switch (type) {
@@ -1353,6 +1359,7 @@ insert_dist_referrer(ReferredDist *referred_dist,
case ETS_REF: drp->ets_ref++; break;
case SYSTEM_REF: drp->system_ref++; break;
case SIGNAL_REF: drp->signal_ref++; break;
+ case SEQUENCE_REF: drp->sequence_ref++; break;
default: ASSERT(0);
}
}
@@ -1583,6 +1590,20 @@ insert_dist_monitors(DistEntry *dep)
}
}
+
+static int
+insert_sequence(ErtsDistExternal *edep, void *arg, Sint reds)
+{
+ insert_dist_entry(edep->dep, SEQUENCE_REF, *(Eterm*)arg, 0);
+ return 1;
+}
+
+static void
+insert_dist_sequences(DistEntry *dep)
+{
+ erts_dist_seq_tree_foreach(dep, insert_sequence, (void *) &dep->sysname);
+}
+
static void
clear_visited_p_monitors(ErtsPTabElementCommon *p)
{
@@ -1774,11 +1795,9 @@ insert_message(ErtsMessage *msg, int type, Process *proc)
else if (ERTS_SIG_IS_INTERNAL_MSG(msg))
heap_frag = msg->data.heap_frag;
else {
- if (msg->data.dist_ext->dep)
- insert_dist_entry(msg->data.dist_ext->dep,
- type, proc->common.id, 0);
- if (is_not_nil(ERL_MESSAGE_TOKEN(msg)))
- heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
+ heap_frag = msg->data.heap_frag;
+ insert_dist_entry(erts_get_dist_ext(heap_frag)->dep,
+ type, proc->common.id, 0);
}
}
while (heap_frag) {
@@ -1819,6 +1838,13 @@ insert_sig_link(ErtsLink *lnk, void *arg, Sint reds)
}
static void
+insert_sig_ext(ErtsDistExternal *edep, void *arg)
+{
+ Process *proc = arg;
+ insert_dist_entry(edep->dep, SIGNAL_REF, proc->common.id, 0);
+}
+
+static void
setup_reference_table(void)
{
ErlHeapFragment *hfp;
@@ -1898,6 +1924,7 @@ setup_reference_table(void)
insert_sig_offheap,
insert_sig_monitor,
insert_sig_link,
+ insert_sig_ext,
(void *) proc);
/* Insert links */
@@ -1989,16 +2016,19 @@ setup_reference_table(void)
for(dep = erts_visible_dist_entries; dep; dep = dep->next) {
insert_dist_links(dep);
insert_dist_monitors(dep);
+ insert_dist_sequences(dep);
}
for(dep = erts_hidden_dist_entries; dep; dep = dep->next) {
insert_dist_links(dep);
insert_dist_monitors(dep);
+ insert_dist_sequences(dep);
}
for(dep = erts_pending_dist_entries; dep; dep = dep->next) {
insert_dist_links(dep);
insert_dist_monitors(dep);
+ insert_dist_sequences(dep);
}
/* Not connected dist entries should not have any links,
@@ -2006,6 +2036,7 @@ setup_reference_table(void)
for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) {
insert_dist_links(dep);
insert_dist_monitors(dep);
+ insert_dist_sequences(dep);
}
/* Insert all ets tables */
@@ -2179,6 +2210,10 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp)
tup = MK_2TUP(AM_system, MK_UINT(drp->system_ref));
drl = MK_CONS(tup, drl);
}
+ if(drp->sequence_ref) {
+ tup = MK_2TUP(AM_sequence, MK_UINT(drp->sequence_ref));
+ drl = MK_CONS(tup, drl);
+ }
if(drp->signal_ref) {
tup = MK_2TUP(AM_signal, MK_UINT(drp->signal_ref));
drl = MK_CONS(tup, drl);
@@ -2253,6 +2288,12 @@ static void noop_sig_offheap(ErlOffHeap *oh, void *arg)
}
+static void noop_sig_ext(ErtsDistExternal *ext, void *arg)
+{
+
+}
+
+
static void
delete_reference_table(void)
{
@@ -2303,6 +2344,7 @@ delete_reference_table(void)
noop_sig_offheap,
clear_visited_monitor,
clear_visited_link,
+ noop_sig_ext,
(void *) proc);
}
}
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index c44f1f8991..cd5d69a302 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -98,11 +98,22 @@ struct ErtsDistOutputBuf_ {
byte *alloc_endp;
#endif
ErtsDistOutputBuf *next;
- Uint hopefull_flags;
+ Binary *bin;
+ /* Pointers to the distribution header,
+ if NULL the distr header is in the extp */
+ byte *hdrp;
+ byte *hdr_endp;
+ /* Pointers to the ctl + payload */
byte *extp;
byte *ext_endp;
+ /* Start of payload and hopefull_flags, used by transcode */
+ Uint hopefull_flags;
byte *msg_start;
- byte data[1];
+ /* start of the ext buffer, this is not always the same as extp
+ as the atom cache handling can use less then the allotted buffer.
+ This value is needed to calculate the size of this output buffer.*/
+ byte *ext_start;
+
};
typedef struct {
@@ -161,6 +172,8 @@ struct dist_entry_ {
ErtsThrPrgrLaterOp later_op;
struct transcode_context* transcode_ctx;
+
+ struct dist_sequences *sequences; /* Ongoing distribution sequences */
};
typedef struct erl_node_ {
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 1bd219b6d7..8113abebde 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -36,6 +36,7 @@
#include "erl_port_task.h"
#include "erl_trace.h"
#include "beam_bp.h"
+#include "erl_binary.h"
#include "big.h"
#include "erl_gc.h"
#include "bif.h"
@@ -80,7 +81,10 @@
#define ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO \
ERTS_SIG_Q_TYPE_MAX
-#define ERTS_SIG_IS_EXTERNAL(sig) is_non_value(get_exit_signal_data(sig)->reason)
+#define ERTS_SIG_IS_GEN_EXIT(sig) \
+ (ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT)
+#define ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig) \
+ (ASSERT(ERTS_SIG_IS_GEN_EXIT(sig)),is_non_value(get_exit_signal_data(sig)->reason))
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler);
Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_high);
@@ -114,8 +118,6 @@ typedef struct {
Eterm message;
Eterm from;
Eterm reason;
- struct erl_dist_external *dist_ext;
- ErlHeapFragment *heap_frag;
union {
Eterm ref;
int normal_kills;
@@ -945,13 +947,15 @@ erts_proc_sig_get_external(ErtsMessage *msgp)
{
if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) {
return erts_get_dist_ext(msgp->data.heap_frag);
- } else if (ERTS_SIG_IS_NON_MSG(msgp) && ERTS_SIG_IS_EXTERNAL(msgp)) {
+ } else if (ERTS_SIG_IS_NON_MSG(msgp) &&
+ ERTS_SIG_IS_GEN_EXIT(msgp) &&
+ ERTS_SIG_IS_GEN_EXIT_EXTERNAL(msgp)) {
ErtsDistExternal *edep;
ErtsExitSignalData *xsigd = get_exit_signal_data(msgp);
ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT);
ASSERT(is_non_value(xsigd->reason));
if (msgp->hfrag.next == NULL)
- edep = (ErtsDistExternal*)((void*)xsigd) + sizeof(ErtsExitSignalData);
+ edep = (ErtsDistExternal*)(xsigd + 1);
else
edep = erts_get_dist_ext(msgp->hfrag.next);
return edep;
@@ -965,6 +969,7 @@ static void
send_gen_exit_signal(Process *c_p, Eterm from_tag,
Eterm from, Eterm to,
Sint16 op, Eterm reason, ErtsDistExternal *dist_ext,
+ ErlHeapFragment *dist_ext_hfrag,
Eterm ref, Eterm token, int normal_kills)
{
ErtsExitSignalData *xsigd;
@@ -972,7 +977,7 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
ErtsMessage *mp;
ErlHeapFragment *hfrag;
ErlOffHeap *ohp;
- Uint hsz, from_sz, reason_sz, ref_sz, token_sz;
+ Uint hsz, from_sz, reason_sz, ref_sz, token_sz, dist_ext_sz;
int seq_trace;
#ifdef USE_VM_PROBES
Eterm s_utag, utag;
@@ -984,7 +989,7 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
ASSERT(is_immed(from_tag));
- hsz = sizeof(ErtsExitSignalData)/sizeof(Uint);
+ hsz = sizeof(ErtsExitSignalData)/sizeof(Eterm);
seq_trace = c_p && have_seqtrace(token);
if (seq_trace)
@@ -1012,6 +1017,8 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
ref_sz = size_object(ref);
hsz += ref_sz;
+ /* The reason was part of the control message,
+ just use copy it into the xsigd */
if (is_value(reason)) {
reason_sz = size_object(reason);
hsz += reason_sz;
@@ -1032,6 +1039,11 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
ERTS_INTERNAL_ERROR("Invalid exit signal op");
break;
}
+ } else if (dist_ext != NULL && dist_ext_hfrag == NULL) {
+ /* The message was not fragmented so we need to create space
+ for a single dist_ext element */
+ dist_ext_sz = erts_dist_ext_size(dist_ext) / sizeof(Eterm);
+ hsz += dist_ext_sz;
}
/*
@@ -1087,13 +1099,12 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
hfrag->used_size = hp - start_hp;
- xsigd = (ErtsExitSignalData *) (char *) hp;
+ xsigd = (ErtsExitSignalData *) hp;
xsigd->message = s_message;
xsigd->from = s_from;
xsigd->reason = s_reason;
- xsigd->dist_ext = dist_ext;
- xsigd->heap_frag = NULL;
+ hfrag->next = dist_ext_hfrag;
if (is_nil(s_ref))
xsigd->u.normal_kills = normal_kills;
@@ -1102,6 +1113,15 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag,
xsigd->u.ref = s_ref;
}
+ hp += sizeof(ErtsExitSignalData)/sizeof(Eterm);
+
+ if (dist_ext != NULL && dist_ext_hfrag == NULL && is_non_value(reason)) {
+ erts_make_dist_ext_copy(dist_ext, (ErtsDistExternal *) hp);
+ hp += dist_ext_sz;
+ }
+
+ ASSERT(hp == mp->hfrag.mem + mp->hfrag.alloc_size);
+
if (seq_trace)
do_seq_trace_output(to, s_token, s_message);
@@ -1234,17 +1254,18 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to,
from_tag = dep->sysname;
}
send_gen_exit_signal(c_p, from_tag, from, to, ERTS_SIG_Q_OP_EXIT,
- reason, NULL, NIL, token, normal_kills);
+ reason, NULL, NULL, NIL, token, normal_kills);
}
void
erts_proc_sig_send_dist_exit(DistEntry *dep,
Eterm from, Eterm to,
ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
Eterm reason, Eterm token)
{
send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT,
- reason, dist_ext, NIL, token, 0);
+ reason, dist_ext, hfrag, NIL, token, 0);
}
@@ -1259,7 +1280,7 @@ erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk,
if (is_not_immed(reason) || is_not_nil(token)) {
ASSERT(is_internal_pid(from) || is_internal_port(from));
send_gen_exit_signal(c_p, from, from, to, ERTS_SIG_Q_OP_EXIT_LINKED,
- reason, NULL, NIL, token, 0);
+ reason, NULL, NULL, NIL, token, 0);
}
else {
/* Pass signal using old link structure... */
@@ -1315,10 +1336,11 @@ void
erts_proc_sig_send_dist_link_exit(DistEntry *dep,
Eterm from, Eterm to,
ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
Eterm reason, Eterm token)
{
send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT_LINKED,
- reason, dist_ext, NIL, token, 0);
+ reason, dist_ext, hfrag, NIL, token, 0);
}
@@ -1342,6 +1364,7 @@ void
erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
Eterm from, Eterm to,
ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
Eterm reason)
{
Eterm monitored, heap[3];
@@ -1351,7 +1374,7 @@ erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
monitored = from;
send_gen_exit_signal(NULL, dep->sysname, monitored,
to, ERTS_SIG_Q_OP_MONITOR_DOWN,
- reason, dist_ext, ref, NIL, 0);
+ reason, dist_ext, hfrag, ref, NIL, 0);
}
void
@@ -1422,7 +1445,7 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason)
}
send_gen_exit_signal(NULL, from_tag, monitored,
to, ERTS_SIG_Q_OP_MONITOR_DOWN,
- reason, NULL, mdp->ref, NIL, 0);
+ reason, NULL, NULL, mdp->ref, NIL, 0);
}
erts_monitor_release(mon);
}
@@ -2105,9 +2128,9 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
}
/* This GEN_EXIT was received from another node, decode the exit reason */
- if (ERTS_SIG_IS_EXTERNAL(sig))
+ if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig))
erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1);
-
+
reason = xsigd->reason;
if (is_non_value(reason)) {
@@ -2115,17 +2138,14 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing,
ignore = !0;
destroy = !0;
}
-
+
if (!ignore) {
if ((op != ERTS_SIG_Q_OP_EXIT || reason != am_kill)
&& (c_p->flags & F_TRAP_EXIT)) {
convert_prepared_sig_to_msg(c_p, sig,
xsigd->message, next_nm_sig);
- ASSERT(sig->hfrag.next == NULL);
- sig->hfrag.next = xsigd->heap_frag;
conv_msg = sig;
-
}
else if (reason == am_normal && !xsigd->u.normal_kills) {
/* Ignore it... */
@@ -2991,50 +3011,29 @@ erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
ErtsMessage *msgp, int force_off_heap)
{
ErtsHeapFactory factory;
+ ErlHeapFragment *hfrag;
Eterm msg;
- ErlHeapFragment *bp;
Sint need;
- int decode_in_heap_frag;
- ErtsDistExternal *dist_ext;
+ ErtsDistExternal *edep;
ErtsExitSignalData *xsigd = NULL;
- decode_in_heap_frag = (force_off_heap
- || !(proc_locks & ERTS_PROC_LOCK_MAIN)
- || (proc->flags & F_OFF_HEAP_MSGQ));
-
- if (ERTS_SIG_IS_EXTERNAL_MSG(msgp))
- dist_ext = msgp->data.dist_ext;
- else {
+ edep = erts_proc_sig_get_external(msgp);
+ if (!ERTS_SIG_IS_EXTERNAL_MSG(msgp))
xsigd = get_exit_signal_data(msgp);
- ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT);
- ASSERT(is_non_value(xsigd->reason));
- dist_ext = xsigd->dist_ext;
- }
- if (dist_ext->heap_size >= 0)
- need = dist_ext->heap_size;
+ if (edep->heap_size >= 0)
+ need = edep->heap_size;
else {
- need = erts_decode_dist_ext_size(dist_ext);
+ need = erts_decode_dist_ext_size(edep, 1);
if (need < 0) {
- /* bad msg; remove it... */
- if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
- bp = erts_dist_ext_trailer(dist_ext);
- erts_cleanup_offheap(&bp->off_heap);
- }
- erts_free_dist_ext_copy(dist_ext);
- dist_ext = NULL;
+ /* bad signal; remove it... */
return 0;
}
- dist_ext->heap_size = need;
+ edep->heap_size = need;
}
- if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
- bp = erts_dist_ext_trailer(dist_ext);
- need += bp->used_size;
- }
-
- if (xsigd) {
+ if (ERTS_SIG_IS_NON_MSG(msgp)) {
switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) {
case ERTS_SIG_Q_OP_EXIT:
case ERTS_SIG_Q_OP_EXIT_LINKED:
@@ -3051,26 +3050,22 @@ erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
}
}
- if (decode_in_heap_frag)
- erts_factory_heap_frag_init(&factory, new_message_buffer(need));
- else
- erts_factory_proc_prealloc_init(&factory, proc, need);
-
- ASSERT(dist_ext->heap_size >= 0);
- if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) {
- ErlHeapFragment *heap_frag;
- heap_frag = erts_dist_ext_trailer(dist_ext);
- ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp),
- heap_frag->used_size,
- &factory.hp,
- factory.off_heap);
- erts_cleanup_offheap(&heap_frag->off_heap);
+ hfrag = new_message_buffer(need);
+ erts_factory_heap_frag_init(&factory, hfrag);
+
+ ASSERT(edep->heap_size >= 0);
+
+ msg = erts_decode_dist_ext(&factory, edep, 1);
+
+ if (is_non_value(msg)) {
+ erts_factory_undo(&factory);
+ return 0;
}
- msg = erts_decode_dist_ext(&factory, dist_ext);
- if (!xsigd) {
+ if (ERTS_SIG_IS_MSG(msgp)) {
ERL_MESSAGE_TERM(msgp) = msg;
- msgp->data.attached = NULL;
+ if (msgp->data.heap_frag == &msgp->hfrag)
+ msgp->data.heap_frag = ERTS_MSG_COMBINED_HFRAG;
} else {
switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) {
case ERTS_SIG_Q_OP_EXIT:
@@ -3089,22 +3084,21 @@ erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks,
}
xsigd->reason = msg;
}
- erts_free_dist_ext_copy(dist_ext);
- if (is_non_value(msg)) {
- erts_factory_undo(&factory);
- return 0;
- }
+ erts_free_dist_ext_copy(edep);
erts_factory_close(&factory);
- ASSERT(!msgp->data.heap_frag || xsigd);
+ hfrag = factory.heap_frags;
+ while (hfrag->next)
+ hfrag = hfrag->next;
- if (decode_in_heap_frag) {
- if (!xsigd)
- msgp->data.heap_frag = factory.heap_frags;
- else
- xsigd->heap_frag = factory.heap_frags;
+ if (ERTS_SIG_IS_MSG(msgp) && msgp->data.heap_frag != ERTS_MSG_COMBINED_HFRAG) {
+ hfrag->next = msgp->data.heap_frag;
+ msgp->data.heap_frag = factory.heap_frags;
+ } else {
+ hfrag->next = msgp->hfrag.next;
+ msgp->hfrag.next = factory.heap_frags;
}
return 1;
@@ -3274,8 +3268,9 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
xsigd = get_exit_signal_data(sig);
/* This GEN_EXIT was received from another node, decode the exit reason */
- if (ERTS_SIG_IS_EXTERNAL(sig))
- erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1);
+ if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig))
+ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1))
+ break; /* Decode failed, just remove signal */
omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p),
xsigd->u.ref);
@@ -4375,11 +4370,13 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing,
return -1; /* Yield... */
}
if (ERTS_SIG_IS_EXTERNAL_MSG(sig)) {
- cnt++;
+ cnt += 50; /* Decode is expensive... */
if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN,
sig, 0)) {
/* Bad dist message; remove it... */
remove_mq_m_sig(c_p, sig, next_sig, next_nm_sig);
+ sig->next = NULL;
+ erts_cleanup_messages(sig);
sig = *next_sig;
continue;
}
@@ -4451,18 +4448,12 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) {
/* decode it... */
- if (mp->data.attached)
- erts_proc_sig_decode_dist(rp, rp_locks, mp, !0);
-
- msg = ERL_MESSAGE_TERM(mp);
-
- if (is_non_value(msg)) {
+ if (!erts_proc_sig_decode_dist(rp, rp_locks, mp, !0)) {
ErtsMessage *bad_mp = mp;
/*
* Bad distribution message; remove
* it from the queue...
*/
- ASSERT(!mp->data.attached);
ASSERT(*mpp == bad_mp);
@@ -4474,6 +4465,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p,
erts_cleanup_messages(bad_mp);
continue;
}
+
+ msg = ERL_MESSAGE_TERM(mp);
}
ASSERT(is_value(msg));
@@ -4632,12 +4625,21 @@ debug_foreach_sig_fake_oh(Eterm term,
}
+static void
+debug_foreach_sig_external(ErtsMessage *msgp,
+ void (*ext_func)(ErtsDistExternal *, void *),
+ void *arg)
+{
+ ext_func(erts_proc_sig_get_external(msgp), arg);
+}
+
void
erts_proc_sig_debug_foreach_sig(Process *c_p,
void (*msg_func)(ErtsMessage *, void *),
void (*oh_func)(ErlOffHeap *, void *),
ErtsMonitorFunc mon_func,
ErtsLinkFunc lnk_func,
+ void (*ext_func)(ErtsDistExternal *, void *),
void *arg)
{
ErtsMessage *queue[] = {c_p->sig_qs.first, c_p->sig_qs.cont, c_p->sig_inq.first};
@@ -4646,10 +4648,10 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
for (qix = 0; qix < sizeof(queue)/sizeof(queue[0]); qix++) {
ErtsMessage *sig;
for (sig = queue[qix]; sig; sig = sig->next) {
-
- if (ERTS_SIG_IS_MSG(sig))
+
+ if (ERTS_SIG_IS_MSG(sig)) {
msg_func(sig, arg);
- else {
+ } else {
Eterm tag;
Uint16 type;
int op;
@@ -4668,7 +4670,10 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
case ERTS_SIG_Q_OP_MONITOR_DOWN:
switch (type) {
case ERTS_SIG_Q_TYPE_GEN_EXIT:
- debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg);
+ if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig))
+ debug_foreach_sig_external(sig, ext_func, arg);
+ else
+ debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg);
break;
case ERTS_LNK_TYPE_PORT:
case ERTS_LNK_TYPE_PROC:
diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h
index 39cee6f230..2b055e73bc 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.h
+++ b/erts/emulator/beam/erl_proc_sig_queue.h
@@ -228,6 +228,9 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to,
*
* @param[in] dist_ext The exit reason in external term format
*
+ * @param[in] hfrag Heap frag with trace token and dist_ext
+ * iff available, otherwise NULL.
+ *
* @param[in] reason Exit reason.
*
* @param[in] token Seq trace token.
@@ -237,6 +240,7 @@ void
erts_proc_sig_send_dist_exit(DistEntry *dep,
Eterm from, Eterm to,
ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
Eterm reason, Eterm token);
/**
@@ -322,6 +326,9 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk);
*
* @param[in] dist_ext The exit reason in external term format
*
+ * @param[in] hfrag Heap frag with trace token and dist_ext
+ * iff available, otherwise NULL.
+ *
* @param[in] reason Exit reason.
*
* @param[in] token Seq trace token.
@@ -331,6 +338,7 @@ void
erts_proc_sig_send_dist_link_exit(struct dist_entry_ *dep,
Eterm from, Eterm to,
ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
Eterm reason, Eterm token);
/**
@@ -426,6 +434,9 @@ erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to);
*
* @param[in] dist_ext The exit reason in external term format
*
+ * @param[in] hfrag Heap frag with trace token and dist_ext
+ * iff available, otherwise NULL.
+ *
* @param[in] reason Exit reason.
*
*/
@@ -433,6 +444,7 @@ void
erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref,
Eterm from, Eterm to,
ErtsDistExternal *dist_ext,
+ ErlHeapFragment *hfrag,
Eterm reason);
/**
@@ -1035,6 +1047,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
void (*oh_func)(ErlOffHeap *, void *),
ErtsMonitorFunc mon_func,
ErtsLinkFunc lnk_func,
+ void (*ext_func)(ErtsDistExternal *, void *),
void *arg);
extern Process *erts_dirty_process_signal_handler;
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 66af9574b8..837b3f4ace 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -12111,6 +12111,7 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds)
case ERTS_DSIG_PREP_PENDING:
if (dist->connection_id == dsd.connection_id) {
code = erts_dsig_send_m_exit(&dsd,
+ c_p->common.id,
watcher,
watched,
mdp->ref,
diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c
index 46e460582c..a164ed543e 100644
--- a/erts/emulator/beam/erl_process_dump.c
+++ b/erts/emulator/beam/erl_process_dump.c
@@ -191,11 +191,11 @@ static ERTS_INLINE void
dump_msg(fmtfn_t to, void *to_arg, ErtsMessage *mp)
{
if (ERTS_SIG_IS_MSG((ErtsSignal *) mp)) {
- Eterm mesg = ERL_MESSAGE_TERM(mp);
- if (is_value(mesg))
- dump_element(to, to_arg, mesg);
+ Eterm mesg;
+ if (ERTS_SIG_IS_INTERNAL_MSG(mp))
+ dump_element(to, to_arg, ERL_MESSAGE_TERM(mp));
else
- dump_dist_ext(to, to_arg, mp->data.dist_ext);
+ dump_dist_ext(to, to_arg, erts_get_dist_ext(mp->data.heap_frag));
mesg = ERL_MESSAGE_TOKEN(mp);
erts_print(to, to_arg, ":");
dump_element(to, to_arg, mesg);
@@ -267,6 +267,7 @@ dump_dist_ext(fmtfn_t to, void *to_arg, ErtsDistExternal *edep)
else {
byte *e;
size_t sz;
+ int i;
if (!(edep->flags & ERTS_DIST_EXT_ATOM_TRANS_TAB))
erts_print(to, to_arg, "D0:");
@@ -276,8 +277,8 @@ dump_dist_ext(fmtfn_t to, void *to_arg, ErtsDistExternal *edep)
for (i = 0; i < edep->attab.size; i++)
dump_element(to, to_arg, edep->attab.atom[i]);
}
- sz = edep->ext_endp - edep->extp;
- e = edep->extp;
+ sz = edep->data->ext_endp - edep->data->extp;
+ e = edep->data->extp;
if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) {
ASSERT(*e != VERSION_MAGIC);
sz++;
@@ -288,15 +289,19 @@ dump_dist_ext(fmtfn_t to, void *to_arg, ErtsDistExternal *edep)
erts_print(to, to_arg, "E%X:", sz);
if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) {
byte sbuf[3];
- int i = 0;
+
+ i = 0;
sbuf[i++] = VERSION_MAGIC;
- while (i < sizeof(sbuf) && e < edep->ext_endp) {
+ while (i < sizeof(sbuf) && e < edep->data->ext_endp) {
sbuf[i++] = *e++;
}
erts_print_base64(to, to_arg, sbuf, i);
}
- erts_print_base64(to, to_arg, e, edep->ext_endp - e);
+ erts_print_base64(to, to_arg, e, edep->data->ext_endp - e);
+ for (i = 1; i < edep->data->frag_id; i++)
+ erts_print_base64(to, to_arg, edep->data[i].extp,
+ edep->data[i].ext_endp - edep->data[i].extp);
}
}
diff --git a/erts/emulator/beam/erl_ptab.h b/erts/emulator/beam/erl_ptab.h
index 94f0247492..c30a684002 100644
--- a/erts/emulator/beam/erl_ptab.h
+++ b/erts/emulator/beam/erl_ptab.h
@@ -68,8 +68,11 @@ typedef struct {
Uint64 started_interval;
struct reg_proc *reg;
ErtsLink *links;
- ErtsMonitor *monitors;
+ /* Local target monitors, double linked list
+ contains the remote part of local monitors */
ErtsMonitor *lt_monitors;
+ /* other monitors, rb tree */
+ ErtsMonitor *monitors;
} alive;
/* --- While being released --- */
diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c
index 640ca338d9..84e6e0d6fd 100644
--- a/erts/emulator/beam/external.c
+++ b/erts/emulator/beam/external.c
@@ -262,19 +262,12 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags)
if (acmp) {
int long_atoms = 0; /* !0 if one or more atoms are longer than 255. */
int i;
- int sz;
- int fix_sz
- = 1 /* VERSION_MAGIC */
- + 1 /* DIST_HEADER */
- + 1 /* dist header flags */
- + 1 /* number of internal cache entries */
- ;
+ int sz = 0;
int min_sz;
ASSERT(dflags & DFLAG_UTF8_ATOMS);
ASSERT(acmp->hdr_sz < 0);
/* Make sure cache update instructions fit */
- min_sz = fix_sz+(2+4)*acmp->sz;
- sz = fix_sz;
+ min_sz = (2+4)*acmp->sz;
for (i = 0; i < acmp->sz; i++) {
Atom *a;
Eterm atom;
@@ -302,17 +295,28 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags)
}
Uint
-erts_encode_ext_dist_header_size(ErtsAtomCacheMap *acmp)
+erts_encode_ext_dist_header_size(ErtsAtomCacheMap *acmp, Uint fragments)
{
if (!acmp)
return 0;
else {
+ int fix_sz
+ = 1 /* VERSION_MAGIC */
+ + 1 /* DIST_HEADER */
+ + 1 /* dist header flags */
+ + 1 /* number of internal cache entries */
+ ;
ASSERT(acmp->hdr_sz >= 0);
- return acmp->hdr_sz;
+ if (fragments > 1)
+ fix_sz += 8 /* sequence id */
+ + 8 /* number of fragments */
+ ;
+ return fix_sz + acmp->hdr_sz;
}
}
-byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp)
+byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp,
+ Uint fragments, Eterm from)
{
/* Maximum number of atom must be less than the maximum of a 32 bits
unsigned integer. Check is done in erl_init.c, erl_start function. */
@@ -346,12 +350,37 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp)
put_int8(acmp->sz, ep);
--ep;
put_int8(dist_hdr_flags, ep);
- *--ep = DIST_HEADER;
- *--ep = VERSION_MAGIC;
+ if (fragments > 1) {
+ ASSERT(is_pid(from));
+ ep -= 8;
+ put_int64(fragments, ep);
+ ep -= 8;
+ put_int64(from, ep);
+ *--ep = DIST_FRAG_HEADER;
+ } else {
+ *--ep = DIST_HEADER;
+ }
+ *--ep = VERSION_MAGIC;
return ep;
}
}
+byte *erts_encode_ext_dist_header_fragment(byte **hdrpp,
+ Uint fragment,
+ Eterm from)
+{
+ byte *ep = *hdrpp, *start = ep;
+ ASSERT(is_pid(from));
+ *ep++ = VERSION_MAGIC;
+ *ep++ = DIST_FRAG_CONT;
+ put_int64(from, ep);
+ ep += 8;
+ put_int64(fragment, ep);
+ ep += 8;
+ *hdrpp = ep;
+ return start;
+}
+
#define PASS_THROUGH 'p'
@@ -365,7 +394,8 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
int ci, sz;
byte dist_hdr_flags;
int long_atoms;
- register byte *ep = ob->extp;
+ Uint64 seq_id = 0, frag_id = 0;
+ register byte *ep = ob->hdrp ? ob->hdrp : ob->extp;
ASSERT(dflags & DFLAG_UTF8_ATOMS);
/*
@@ -416,7 +446,7 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
}
goto done;
}
- else if (ep[1] != DIST_HEADER) {
+ else if (ep[1] != DIST_HEADER && ep[1] != DIST_FRAG_HEADER && ep[1] != DIST_FRAG_CONT) {
ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT);
ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE));
/* Node without atom cache, 'pass through' needed */
@@ -424,6 +454,17 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
goto done;
}
+ if (ep[1] == DIST_FRAG_CONT) {
+ ep = ob->extp;
+ goto done;
+ } else if (ep[1] == DIST_FRAG_HEADER) {
+ /* skip the seq id and frag id */
+ seq_id = get_int64(&ep[2]);
+ ep += 8;
+ frag_id = get_int64(&ep[2]);
+ ep += 8;
+ }
+
dist_hdr_flags = ep[2];
long_atoms = ERTS_DIST_HDR_LONG_ATOMS_FLG & ((int) dist_hdr_flags);
@@ -546,11 +587,19 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob,
}
--ep;
put_int8(ci, ep);
- *--ep = DIST_HEADER;
+ if (seq_id) {
+ ep -= 8;
+ put_int64(frag_id, ep);
+ ep -= 8;
+ put_int64(seq_id, ep);
+ *--ep = DIST_FRAG_HEADER;
+ } else {
+ *--ep = DIST_HEADER;
+ }
*--ep = VERSION_MAGIC;
done:
ob->extp = ep;
- ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp);
+ ASSERT((byte*)ob->bin->orig_bytes <= ob->extp && ob->extp < ob->ext_endp);
return reds < 0 ? 0 : reds;
}
@@ -635,72 +684,90 @@ byte* erts_encode_ext_ets(Eterm term, byte *ep, struct erl_off_heap_header** off
off_heap);
}
-ErtsDistExternal *
-erts_make_dist_ext_copy(ErtsDistExternal *edep, Eterm *token)
+
+static Uint
+dist_ext_size(ErtsDistExternal *edep)
{
- size_t align_sz;
- size_t dist_ext_sz;
- size_t ext_sz = 0;
- size_t token_sz = 0;
- Eterm token_size;
- byte *ep;
- ErtsDistExternal *new_edep;
+ Uint sz = sizeof(ErtsDistExternal);
- dist_ext_sz = ERTS_DIST_EXT_SIZE(edep);
- ASSERT(edep->ext_endp && edep->extp);
- ASSERT(edep->ext_endp >= edep->extp);
- if (edep->binp == NULL)
- ext_sz = edep->ext_endp - edep->extp;
+ ASSERT(edep->data->ext_endp && edep->data->extp);
+ ASSERT(edep->data->ext_endp >= edep->data->extp);
- align_sz = ERTS_EXTRA_DATA_ALIGN_SZ(dist_ext_sz + ext_sz);
+ if (edep->flags & ERTS_DIST_EXT_ATOM_TRANS_TAB) {
+ ASSERT(0 <= edep->attab.size \
+ && edep->attab.size <= ERTS_ATOM_CACHE_SIZE);
+ sz -= sizeof(Eterm)*(ERTS_ATOM_CACHE_SIZE - edep->attab.size);
+ } else {
+ sz -= sizeof(ErtsAtomTranslationTable);
+ }
+ return sz;
+}
- token_size = size_object(*token);
- if (token_size)
- token_sz = ERTS_HEAP_FRAG_SIZE(token_size);
+Uint
+erts_dist_ext_size(ErtsDistExternal *edep)
+{
+ Uint sz = dist_ext_size(edep);
+ sz += edep->data[0].frag_id * sizeof(ErtsDistExternalData);
+ return sz + ERTS_EXTRA_DATA_ALIGN_SZ(sz);
+}
- new_edep = erts_alloc(ERTS_ALC_T_EXT_TERM_DATA,
- dist_ext_sz + ext_sz + align_sz + token_sz);
+Uint
+erts_dist_ext_data_size(ErtsDistExternal *edep)
+{
+ Uint sz = 0, i;
+ for (i = 0; i < edep->data->frag_id; i++)
+ sz += edep->data[i].ext_endp - edep->data[i].extp;
+ return sz;
+}
- ep = (byte *) new_edep;
- sys_memcpy((void *) ep, (void *) edep, dist_ext_sz);
- if (new_edep->dep)
- erts_ref_dist_entry(new_edep->dep);
- new_edep->heap_size = -1;
-
- if (ext_sz) {
- ep += dist_ext_sz;
- new_edep->extp = ep;
- new_edep->ext_endp = ep + ext_sz;
- sys_memcpy((void *) ep, (void *) edep->extp, ext_sz);
+void
+erts_dist_ext_frag(ErtsDistExternalData *ede_datap, ErtsDistExternal *edep)
+{
+ ErtsDistExternalData *new_ede_datap = &edep->data[edep->data->frag_id - ede_datap->frag_id];
+ sys_memcpy(new_ede_datap, ede_datap, sizeof(ErtsDistExternalData));
+
+ /* If the data is not backed by a binary, we create one here to keep
+ things simple. Only custom distribution drivers should use lists. */
+ if (new_ede_datap->binp == NULL) {
+ size_t ext_sz = ede_datap->ext_endp - ede_datap->extp;
+ new_ede_datap->binp = erts_bin_nrml_alloc(ext_sz);
+ sys_memcpy(new_ede_datap->binp->orig_bytes, (void *) ede_datap->extp, ext_sz);
+ new_ede_datap->extp = (byte*)new_ede_datap->binp->orig_bytes;
+ new_ede_datap->ext_endp = (byte*)new_ede_datap->binp->orig_bytes + ext_sz;
} else {
- erts_refc_inc(&new_edep->binp->intern.refc, 2);
+ erts_refc_inc(&new_ede_datap->binp->intern.refc, 2);
}
+}
- /* Copy the seq_trace token */
- if (is_not_nil(*token)) {
- ErlHeapFragment *heap_frag;
- ErlOffHeap *ohp;
- Eterm *hp;
- heap_frag = erts_dist_ext_trailer(new_edep);
- ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size);
- hp = heap_frag->mem;
- ohp = &heap_frag->off_heap;
- *token = copy_struct(*token, token_size, &hp, ohp);
- }
- return new_edep;
+void
+erts_make_dist_ext_copy(ErtsDistExternal *edep, ErtsDistExternal *new_edep)
+{
+ size_t dist_ext_sz = dist_ext_size(edep);
+ byte *ep;
+
+ ep = (byte *) new_edep;
+ sys_memcpy((void *) ep, (void *) edep, dist_ext_sz);
+ erts_ref_dist_entry(new_edep->dep);
+
+ ep += dist_ext_sz;
+
+ new_edep->data = (ErtsDistExternalData*)ep;
+ sys_memzero(new_edep->data, sizeof(ErtsDistExternalData) * edep->data->frag_id);
+ new_edep->data->frag_id = edep->data->frag_id;
+ erts_dist_ext_frag(edep->data, new_edep);
}
void
erts_free_dist_ext_copy(ErtsDistExternal *edep)
{
- if (edep->dep)
- erts_deref_dist_entry(edep->dep);
- if (edep->binp)
- erts_bin_release(edep->binp);
- erts_free(ERTS_ALC_T_EXT_TERM_DATA, edep);
+ int i;
+ erts_deref_dist_entry(edep->dep);
+ for (i = 0; i < edep->data->frag_id; i++)
+ if (edep->data[i].binp)
+ erts_bin_release(edep->data[i].binp);
}
-int
+ErtsPrepDistExtRes
erts_prepare_dist_ext(ErtsDistExternal *edep,
byte *ext,
Uint size,
@@ -711,10 +778,6 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
{
register byte *ep;
- edep->heap_size = -1;
- edep->flags = 0;
- edep->dep = dep;
-
ASSERT(dep);
erts_de_rlock(dep);
@@ -734,10 +797,6 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
size--;
}
- edep->heap_size = -1;
- edep->ext_endp = ext + size;
- edep->binp = binp;
-
ep = ext;
if (size < 2)
@@ -753,16 +812,33 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
goto fail;
}
+ edep->heap_size = -1;
+ edep->flags = 0;
+ edep->dep = dep;
+ edep->connection_id = conn_id;
+ edep->data->ext_endp = ext+size;
+ edep->data->binp = binp;
+ edep->data->seq_id = 0;
+ edep->data->frag_id = 1;
+
if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE)
edep->flags |= ERTS_DIST_EXT_DFLAG_HDR;
- edep->connection_id = dep->connection_id;
-
- if (ep[1] != DIST_HEADER) {
+ if (ep[1] != DIST_HEADER && ep[1] != DIST_FRAG_HEADER && ep[1] != DIST_FRAG_CONT) {
if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR)
goto bad_hdr;
edep->attab.size = 0;
- edep->extp = ext;
+ edep->data->extp = ext;
+ }
+ else if (ep[1] == DIST_FRAG_CONT) {
+ if (!(dep->flags & DFLAG_FRAGMENTS))
+ goto bad_hdr;
+ edep->attab.size = 0;
+ edep->data->extp = ext + 1 + 1 + 8 + 8;
+ edep->data->seq_id = get_int64(&ep[2]);
+ edep->data->frag_id = get_int64(&ep[2+8]);
+ erts_de_runlock(dep);
+ return ERTS_PREP_DIST_EXT_FRAG_CONT;
}
else {
int tix;
@@ -771,9 +847,17 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
if (!(edep->flags & ERTS_DIST_EXT_DFLAG_HDR))
goto bad_hdr;
+ if (ep[1] == DIST_FRAG_HEADER) {
+ if (!(dep->flags & DFLAG_FRAGMENTS))
+ goto bad_hdr;
+ edep->data->seq_id = get_int64(&ep[2]);
+ edep->data->frag_id = get_int64(&ep[2+8]);
+ ep += 16;
+ }
+
#undef CHKSIZE
#define CHKSIZE(SZ) \
- do { if ((SZ) > edep->ext_endp - ep) goto bad_hdr; } while(0)
+ do { if ((SZ) > edep->data->ext_endp - ep) goto bad_hdr; } while(0)
CHKSIZE(1+1+1);
ep += 2;
@@ -903,7 +987,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
#endif
}
}
- edep->extp = ep;
+ edep->data->extp = ep;
#ifdef ERTS_DEBUG_USE_DIST_SEP
if (*ep != VERSION_MAGIC)
goto bad_hdr;
@@ -928,7 +1012,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep,
erts_this_node->sysname,
edep->dep->sysname,
dist_entry_channel_no(edep->dep));
- for (ep = ext; ep < edep->ext_endp; ep++)
+ for (ep = ext; ep < edep->data->ext_endp; ep++)
erts_dsprintf(dsbufp, ep != ext ? ",%b8u" : "<<%b8u", *ep);
erts_dsprintf(dsbufp, ">>");
erts_send_warning_to_logger_nogl(dsbufp);
@@ -953,9 +1037,9 @@ bad_dist_ext(ErtsDistExternal *edep)
erts_this_node->sysname,
dep->sysname,
dist_entry_channel_no(dep));
- for (ep = edep->extp; ep < edep->ext_endp; ep++)
+ for (ep = edep->data->extp; ep < edep->data->ext_endp; ep++)
erts_dsprintf(dsbufp,
- ep != edep->extp ? ",%b8u" : "<<...,%b8u",
+ ep != edep->data->extp ? ",%b8u" : "<<...,%b8u",
*ep);
erts_dsprintf(dsbufp, ">>\n");
erts_dsprintf(dsbufp, "ATOM_CACHE_REF translations: ");
@@ -973,30 +1057,32 @@ bad_dist_ext(ErtsDistExternal *edep)
}
Sint
-erts_decode_dist_ext_size(ErtsDistExternal *edep)
+erts_decode_dist_ext_size(ErtsDistExternal *edep, int kill_connection)
{
Sint res;
byte *ep;
- if (edep->extp >= edep->ext_endp)
+
+ if (edep->data->extp >= edep->data->ext_endp)
goto fail;
#ifndef ERTS_DEBUG_USE_DIST_SEP
if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) {
- if (*edep->extp == VERSION_MAGIC)
+ if (*edep->data->extp == VERSION_MAGIC)
goto fail;
- ep = edep->extp;
+ ep = edep->data->extp;
}
else
#endif
{
- if (*edep->extp != VERSION_MAGIC)
+ if (*edep->data->extp != VERSION_MAGIC)
goto fail;
- ep = edep->extp+1;
+ ep = edep->data->extp+1;
}
- res = decoded_size(ep, edep->ext_endp, 0, NULL);
+ res = decoded_size(ep, edep->data->ext_endp, 0, NULL);
if (res >= 0)
return res;
fail:
- bad_dist_ext(edep);
+ if (kill_connection)
+ bad_dist_ext(edep);
return -1;
}
@@ -1022,12 +1108,15 @@ Sint erts_decode_ext_size_ets(byte *ext, Uint size)
*/
Eterm
erts_decode_dist_ext(ErtsHeapFactory* factory,
- ErtsDistExternal *edep)
+ ErtsDistExternal *edep,
+ int kill_connection)
{
Eterm obj;
- byte* ep = edep->extp;
+ byte* ep;
+
+ ep = edep->data->extp;
- if (ep >= edep->ext_endp)
+ if (ep >= edep->data->ext_endp)
goto error;
#ifndef ERTS_DEBUG_USE_DIST_SEP
if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) {
@@ -1045,14 +1134,15 @@ erts_decode_dist_ext(ErtsHeapFactory* factory,
if (!ep)
goto error;
- edep->extp = ep;
+ edep->data->extp = ep;
return obj;
error:
erts_factory_undo(factory);
- bad_dist_ext(edep);
+ if (kill_connection)
+ bad_dist_ext(edep);
return THE_NON_VALUE;
}
@@ -1097,6 +1187,7 @@ BIF_RETTYPE erts_debug_dist_ext_to_term_2(BIF_ALIST_2)
Eterm res;
Sint hsz;
ErtsDistExternal ede;
+ ErtsDistExternalData ede_data;
Eterm *tp;
Eterm real_bin;
Uint offset;
@@ -1109,7 +1200,8 @@ BIF_RETTYPE erts_debug_dist_ext_to_term_2(BIF_ALIST_2)
ede.flags = ERTS_DIST_EXT_ATOM_TRANS_TAB;
ede.dep = NULL;
ede.heap_size = -1;
-
+ ede.data = &ede_data;
+
if (is_not_tuple(BIF_ARG_1))
goto badarg;
tp = tuple_val(BIF_ARG_1);
@@ -1134,15 +1226,15 @@ BIF_RETTYPE erts_debug_dist_ext_to_term_2(BIF_ALIST_2)
if (bitsize != 0)
goto badarg;
- ede.extp = binary_bytes(real_bin)+offset;
- ede.ext_endp = ede.extp + size;
+ ede.data->extp = binary_bytes(real_bin)+offset;
+ ede.data->ext_endp = ede.data->extp + size;
- hsz = erts_decode_dist_ext_size(&ede);
+ hsz = erts_decode_dist_ext_size(&ede, 1);
if (hsz < 0)
goto badarg;
erts_factory_proc_prealloc_init(&factory, BIF_P, hsz);
- res = erts_decode_dist_ext(&factory, &ede);
+ res = erts_decode_dist_ext(&factory, &ede, 1);
erts_factory_close(&factory);
if (is_value(res))
diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h
index 91c028c9bd..396cd9f802 100644
--- a/erts/emulator/beam/external.h
+++ b/erts/emulator/beam/external.h
@@ -58,6 +58,8 @@
#define SMALL_ATOM_UTF8_EXT 'w'
#define DIST_HEADER 'D'
+#define DIST_FRAG_HEADER 'E'
+#define DIST_FRAG_CONT 'F'
#define ATOM_CACHE_REF 'R'
#define ATOM_INTERNAL_REF2 'I'
#define ATOM_INTERNAL_REF3 'K'
@@ -123,15 +125,22 @@ typedef struct {
#define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff) /* also in net_kernel.erl */
struct binary;
+typedef struct erl_dist_external_data ErtsDistExternalData;
-typedef struct erl_dist_external {
- DistEntry *dep;
+struct erl_dist_external_data {
+ Uint64 seq_id;
+ Uint64 frag_id;
byte *extp;
byte *ext_endp;
struct binary *binp;
+};
+
+typedef struct erl_dist_external {
Sint heap_size;
- Uint32 connection_id;
+ DistEntry *dep;
Uint32 flags;
+ Uint32 connection_id;
+ ErtsDistExternalData *data;
ErtsAtomTranslationTable attab;
} ErtsDistExternal;
@@ -158,8 +167,9 @@ void erts_reset_atom_cache_map(ErtsAtomCacheMap *);
void erts_destroy_atom_cache_map(ErtsAtomCacheMap *);
void erts_finalize_atom_cache_map(ErtsAtomCacheMap *, Uint32);
-Uint erts_encode_ext_dist_header_size(ErtsAtomCacheMap *);
-byte *erts_encode_ext_dist_header_setup(byte *, ErtsAtomCacheMap *);
+Uint erts_encode_ext_dist_header_size(ErtsAtomCacheMap *, Uint);
+byte *erts_encode_ext_dist_header_setup(byte *, ErtsAtomCacheMap *, Uint, Eterm);
+byte *erts_encode_ext_dist_header_fragment(byte **, Uint, Eterm);
Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, DistEntry *, Uint32 dflags, Sint reds);
struct erts_dsig_send_context;
int erts_encode_dist_ext_size(Eterm, Uint32, ErtsAtomCacheMap*, Uint* szp);
@@ -174,20 +184,24 @@ Uint erts_encode_ext_size_ets(Eterm);
void erts_encode_ext(Eterm, byte **);
byte* erts_encode_ext_ets(Eterm, byte *, struct erl_off_heap_header** ext_off_heap);
+Uint erts_dist_ext_size(ErtsDistExternal *);
+Uint erts_dist_ext_data_size(ErtsDistExternal *);
void erts_free_dist_ext_copy(ErtsDistExternal *);
-ERTS_GLB_INLINE void *erts_dist_ext_trailer(ErtsDistExternal *);
-ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Eterm *);
-void *erts_dist_ext_trailer(ErtsDistExternal *);
-void erts_destroy_dist_ext_copy(ErtsDistExternal *);
-
-#define ERTS_PREP_DIST_EXT_FAILED (-1)
-#define ERTS_PREP_DIST_EXT_SUCCESS (0)
-#define ERTS_PREP_DIST_EXT_CLOSED (1)
-
-int erts_prepare_dist_ext(ErtsDistExternal *, byte *, Uint, struct binary *,
- DistEntry *, Uint32, ErtsAtomCache *);
-Sint erts_decode_dist_ext_size(ErtsDistExternal *);
-Eterm erts_decode_dist_ext(ErtsHeapFactory* factory, ErtsDistExternal *);
+void erts_make_dist_ext_copy(ErtsDistExternal *, ErtsDistExternal *);
+void erts_dist_ext_frag(ErtsDistExternalData *, ErtsDistExternal *);
+#define erts_get_dist_ext(HFRAG) ((ErtsDistExternal*)((HFRAG)->mem + (HFRAG)->used_size))
+
+typedef enum {
+ ERTS_PREP_DIST_EXT_FAILED,
+ ERTS_PREP_DIST_EXT_SUCCESS,
+ ERTS_PREP_DIST_EXT_FRAG_CONT,
+ ERTS_PREP_DIST_EXT_CLOSED
+} ErtsPrepDistExtRes;
+
+ErtsPrepDistExtRes erts_prepare_dist_ext(ErtsDistExternal *, byte *, Uint, struct binary *,
+ DistEntry *, Uint32, ErtsAtomCache *);
+Sint erts_decode_dist_ext_size(ErtsDistExternal *, int);
+Eterm erts_decode_dist_ext(ErtsHeapFactory*, ErtsDistExternal *, int);
Sint erts_decode_ext_size(byte*, Uint);
Sint erts_decode_ext_size_ets(byte*, Uint);
@@ -203,17 +217,4 @@ int erts_debug_max_atom_out_cache_index(void);
int erts_debug_atom_to_out_cache_index(Eterm);
void transcode_free_ctx(DistEntry* dep);
-#if ERTS_GLB_INLINE_INCL_FUNC_DEF
-
-ERTS_GLB_INLINE void *
-erts_dist_ext_trailer(ErtsDistExternal *edep)
-{
- void *res = (void *) (edep->ext_endp
- + ERTS_EXTRA_DATA_ALIGN_SZ(edep->ext_endp));
- ASSERT((((UWord) res) % sizeof(Uint)) == 0);
- return res;
-}
-
-#endif
-
#endif /* ERL_EXTERNAL_H__ */