diff options
Diffstat (limited to 'erts/emulator/beam')
-rw-r--r-- | erts/emulator/beam/bif.c | 3 | ||||
-rw-r--r-- | erts/emulator/beam/dist.c | 929 | ||||
-rw-r--r-- | erts/emulator/beam/dist.h | 30 | ||||
-rw-r--r-- | erts/emulator/beam/erl_alloc.types | 1 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.c | 73 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.h | 27 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.c | 54 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.h | 17 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 195 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.h | 13 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 1 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process_dump.c | 23 | ||||
-rw-r--r-- | erts/emulator/beam/erl_ptab.h | 5 | ||||
-rw-r--r-- | erts/emulator/beam/external.c | 298 | ||||
-rw-r--r-- | erts/emulator/beam/external.h | 63 |
15 files changed, 1161 insertions, 571 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index ff7db0e742..a71907505c 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -225,6 +225,7 @@ BIF_RETTYPE link_1(BIF_ALIST_1) code = erts_dsig_send_link(&dsd, BIF_P->common.id, BIF_ARG_1); if (code == ERTS_DSIG_SEND_YIELD) ERTS_BIF_YIELD_RETURN(BIF_P, am_true); + ASSERT(code == ERTS_DSIG_SEND_OK); BIF_RET(am_true); break; } @@ -2094,6 +2095,7 @@ BIF_RETTYPE send_3(BIF_ALIST_3) ctx->return_term = am_ok; ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR); ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT; + ctx->dss.from = BIF_P->common.id; while (is_list(l)) { if (CAR(list_val(l)) == am_noconnect) { @@ -2246,6 +2248,7 @@ Eterm erl_send(Process *p, Eterm to, Eterm msg) ctx->return_term = msg; ctx->dss.reds = (Sint) (ERTS_BIF_REDS_LEFT(p) * TERM_TO_BINARY_LOOP_FACTOR); ctx->dss.phase = ERTS_DSIG_SEND_PHASE_INIT; + ctx->dss.from = p->common.id; result = do_send(p, to, msg, &ref, ctx); diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 91a9481d84..78674848a1 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -55,15 +55,21 @@ */ #if 0 #define ERTS_DIST_MSG_DBG +FILE *dbg_file; #endif #if 0 +/* Enable this to print the dist debug messages to a file instead */ +#define ERTS_DIST_MSG_DBG_FILE "/tmp/dist_dbg.%d" +#endif +#if 0 +/* Enable this to print the raw bytes sent and received */ #define ERTS_RAW_DIST_MSG_DBG #endif #if defined(ERTS_DIST_MSG_DBG) || defined(ERTS_RAW_DIST_MSG_DBG) static void bw(byte *buf, ErlDrvSizeT sz) { - bin_write(ERTS_PRINT_STDERR, NULL, buf, sz); + bin_write(ERTS_PRINT_FILE, dbg_file, buf, sz); } #endif @@ -71,37 +77,93 @@ static void bw(byte *buf, ErlDrvSizeT sz) static void dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) { - ErtsHeapFactory factory; - byte *extp = edep->extp; + byte *extp = edep->data->extp; Eterm msg; Sint ctl_len; - Sint size = ctl_len = erts_decode_dist_ext_size(edep); + Sint size = ctl_len = erts_decode_dist_ext_size(edep, 0); if (size < 0) { - erts_fprintf(stderr, + erts_fprintf(dbg_file, "DIST MSG DEBUG: erts_decode_dist_ext_size(%s) failed:\n", what); bw(buf, sz); } else { - ErlHeapFragment *mbuf = new_message_buffer(size); - erts_factory_static_init(&factory, mbuf->mem, ctl_len, &mbuf->off_heap); - msg = erts_decode_dist_ext(&factory, edep); + ErtsHeapFactory factory; + ErtsMessage *mbuf = erts_factory_message_create(&factory, NULL, 0, ctl_len); + /* Set mbuf msg to NIL as erts_factory_undo will fail otherwise */ + ERL_MESSAGE_TERM(mbuf) = NIL; + msg = erts_decode_dist_ext(&factory, edep, 0); if (is_value(msg)) - erts_fprintf(stderr, " %s: %.80T\n", what, msg); + erts_fprintf(dbg_file, " %s: %.80T\n", what, msg); else { - erts_fprintf(stderr, + erts_fprintf(dbg_file, "DIST MSG DEBUG: erts_decode_dist_ext(%s) failed:\n", what); bw(buf, sz); } - free_message_buffer(mbuf); - edep->extp = extp; + erts_factory_undo(&factory); + edep->data->extp = extp; } } +static char *erts_dop_to_string(enum dop dop) { + if (dop == DOP_LINK) + return "LINK"; + if (dop == DOP_SEND) + return "SEND"; + if (dop == DOP_EXIT) + return "EXIT"; + if (dop == DOP_UNLINK) + return "UNLINK"; + if (dop == DOP_REG_SEND) + return "REG_SEND"; + if (dop == DOP_GROUP_LEADER) + return "GROUP_LEADER"; + if (dop == DOP_EXIT2) + return "EXIT2"; + if (dop == DOP_SEND_TT) + return "SEND_TT"; + if (dop == DOP_EXIT_TT) + return "EXIT_TT"; + if (dop == DOP_REG_SEND_TT) + return "REG_SEND_TT"; + if (dop == DOP_EXIT2_TT) + return "EXIT2_TT"; + if (dop == DOP_MONITOR_P) + return "MONITOR_P"; + if (dop == DOP_DEMONITOR_P) + return "DEMONITOR_P"; + if (dop == DOP_MONITOR_P_EXIT) + return "MONITOR_P_EXIT"; + if (dop == DOP_SEND_SENDER) + return "SEND_SENDER"; + if (dop == DOP_SEND_SENDER_TT) + return "SEND_SENDER_TT"; + if (dop == DOP_PAYLOAD_EXIT) + return "PAYLOAD_EXIT"; + if (dop == DOP_PAYLOAD_EXIT_TT) + return "PAYLOAD_EXIT_TT"; + if (dop == DOP_PAYLOAD_EXIT2) + return "PAYLOAD_EXIT2"; + if (dop == DOP_PAYLOAD_EXIT2_TT) + return "PAYLOAD_EXIT2_TT"; + if (dop == DOP_PAYLOAD_MONITOR_P_EXIT) + return "PAYLOAD_MONITOR_P_EXIT"; + ASSERT(0); + return "UNKNOWN"; +} + #endif +#if defined(VALGRIND) +#include <valgrind/valgrind.h> +#include <valgrind/memcheck.h> +# define PURIFY_MSG(msg) \ + VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg) +#else +# define PURIFY_MSG(msg) +#endif int erts_is_alive; /* System must be blocked on change */ int erts_dist_buf_busy_limit; @@ -115,13 +177,16 @@ static Export *dist_ctrl_put_data_trap; /* forward declarations */ -static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, int force_busy); +static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, Eterm from, int force_busy); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); static Sint abort_connection(DistEntry* dep, Uint32 conn_id); static ErtsDistOutputBuf* clear_de_out_queues(DistEntry*); static void free_de_out_queues(DistEntry*, ErtsDistOutputBuf*); +int erts_dist_seq_tree_foreach_delete_yielding(DistSeqNode **root, + void **vyspp, + Sint limit); static erts_atomic_t no_caches; static erts_atomic_t no_nodes; @@ -142,7 +207,6 @@ delete_cache(ErtsAtomCache *cache) } } - static void create_cache(DistEntry *dep) { @@ -207,12 +271,14 @@ typedef enum { ERTS_CML_CLEANUP_STATE_LINKS, ERTS_CML_CLEANUP_STATE_MONITORS, ERTS_CML_CLEANUP_STATE_ONAME_MONITORS, + ERTS_CML_CLEANUP_STATE_SEQUENCES, ERTS_CML_CLEANUP_STATE_NODE_MONITORS -} ErtsConMonLnkCleaupState; +} ErtsConMonLnkSeqCleanupState; typedef struct { - ErtsConMonLnkCleaupState state; + ErtsConMonLnkSeqCleanupState state; ErtsMonLnkDist *dist; + DistSeqNode *seq; void *yield_state; int trigger_node_monitors; Eterm nodename; @@ -220,12 +286,12 @@ typedef struct { Eterm reason; ErlOffHeap oh; Eterm heap[1]; -} ErtsConMonLnkCleanup; +} ErtsConMonLnkSeqCleanup; static void -con_monitor_link_cleanup(void *vcmlcp) +con_monitor_link_seq_cleanup(void *vcmlcp) { - ErtsConMonLnkCleanup *cmlcp = vcmlcp; + ErtsConMonLnkSeqCleanup *cmlcp = vcmlcp; ErtsMonLnkDist *dist = cmlcp->dist; ErtsSchedulerData *esdp; int reds = CONTEXT_REDS; @@ -263,6 +329,15 @@ con_monitor_link_cleanup(void *vcmlcp) erts_mon_link_dist_dec_refc(dist); ASSERT(!cmlcp->yield_state); + cmlcp->state = ERTS_CML_CLEANUP_STATE_SEQUENCES; + case ERTS_CML_CLEANUP_STATE_SEQUENCES: + reds = erts_dist_seq_tree_foreach_delete_yielding(&cmlcp->seq, + &cmlcp->yield_state, + reds); + if (reds <= 0) + break; + + ASSERT(!cmlcp->yield_state); cmlcp->state = ERTS_CML_CLEANUP_STATE_NODE_MONITORS; case ERTS_CML_CLEANUP_STATE_NODE_MONITORS: if (cmlcp->trigger_node_monitors) { @@ -282,22 +357,23 @@ con_monitor_link_cleanup(void *vcmlcp) esdp = erts_get_scheduler_data(); ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL); erts_schedule_misc_aux_work((int) esdp->no, - con_monitor_link_cleanup, + con_monitor_link_seq_cleanup, (void *) cmlcp); } static void -schedule_con_monitor_link_cleanup(ErtsMonLnkDist *dist, - Eterm nodename, - Eterm visability, - Eterm reason) +schedule_con_monitor_link_seq_cleanup(ErtsMonLnkDist *dist, + DistSeqNode *seq, + Eterm nodename, + Eterm visability, + Eterm reason) { - if (dist || is_value(nodename)) { + if (dist || is_value(nodename) || seq) { ErtsSchedulerData *esdp; - ErtsConMonLnkCleanup *cmlcp; + ErtsConMonLnkSeqCleanup *cmlcp; Uint rsz, size; - size = sizeof(ErtsConMonLnkCleanup); + size = sizeof(ErtsConMonLnkSeqCleanup); if (is_non_value(reason) || is_immed(reason)) { rsz = 0; @@ -324,6 +400,8 @@ schedule_con_monitor_link_cleanup(ErtsMonLnkDist *dist, erts_mtx_unlock(&dist->mtx); } + cmlcp->seq = seq; + cmlcp->trigger_node_monitors = is_value(nodename); cmlcp->nodename = nodename; cmlcp->visability = visability; @@ -337,13 +415,13 @@ schedule_con_monitor_link_cleanup(ErtsMonLnkDist *dist, esdp = erts_get_scheduler_data(); ASSERT(esdp && esdp->type == ERTS_SCHED_NORMAL); erts_schedule_misc_aux_work((int) esdp->no, - con_monitor_link_cleanup, + con_monitor_link_seq_cleanup, (void *) cmlcp); } } /* -** A full node name constists of a "n@h" +** A full node name consists of a "n@h" ** ** n must be a valid node name: string of ([a-z][A-Z][0-9]_-)+ ** @@ -560,6 +638,7 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) } else { /* Call from distribution controller (port/process) */ ErtsMonLnkDist *mld; + DistSeqNode *sequences; ErtsAtomCache *cache; ErtsProcList *suspendees; ErtsDistOutputBuf *obuf; @@ -589,6 +668,9 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) mld = dep->mld; dep->mld = NULL; + sequences = dep->sequences; + dep->sequences = NULL; + nodename = dep->sysname; flags = dep->flags; @@ -612,14 +694,15 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) erts_de_rwunlock(dep); - schedule_con_monitor_link_cleanup(mld, - nodename, - (flags & DFLAG_PUBLISHED - ? am_visible - : am_hidden), - (reason == am_normal - ? am_connection_closed - : reason)); + schedule_con_monitor_link_seq_cleanup(mld, + sequences, + nodename, + (flags & DFLAG_PUBLISHED + ? am_visible + : am_hidden), + (reason == am_normal + ? am_connection_closed + : reason)); erts_resume_processes(suspendees); @@ -653,6 +736,16 @@ void init_dist(void) { init_nodes_monitors(); +#ifdef ERTS_DIST_MSG_DBG_FILE + { + char buff[255]; + sprintf(buff, ERTS_DIST_MSG_DBG_FILE, getpid()); + dbg_file = fopen(buff,"w+"); + } +#elif defined (ERTS_DIST_MSG_DBG) + dbg_file = stderr; +#endif + nodedown.reason = NIL; nodedown.bp = NULL; @@ -676,21 +769,27 @@ void init_dist(void) } } -#define ErtsDistOutputBuf2Binary(OB) \ - ((Binary *) (((char *) (OB)) - offsetof(Binary, orig_bytes))) +#define ErtsDistOutputBuf2Binary(OB) OB->bin static ERTS_INLINE ErtsDistOutputBuf * -alloc_dist_obuf(Uint size) +alloc_dist_obuf(Uint size, Uint headers) { + int i; ErtsDistOutputBuf *obuf; - Uint obuf_size = sizeof(ErtsDistOutputBuf)+sizeof(byte)*(size-1); + Uint obuf_size = sizeof(ErtsDistOutputBuf)*(headers) + + sizeof(byte)*size; Binary *bin = erts_bin_drv_alloc(obuf_size); - obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[0]; + obuf = (ErtsDistOutputBuf *) &bin->orig_bytes[size]; + erts_refc_add(&bin->intern.refc, headers - 1, 1); + for (i = 0; i < headers; i++) { + obuf[i].bin = bin; + obuf[i].extp = (byte *)&bin->orig_bytes[0]; #ifdef DEBUG - obuf->dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; - obuf->alloc_endp = obuf->data + size; - ASSERT(bin == ErtsDistOutputBuf2Binary(obuf)); + obuf[i].dbg_pattern = ERTS_DIST_OUTPUT_BUF_DBG_PATTERN; + obuf[i].alloc_endp = obuf->extp + size; + ASSERT(bin == ErtsDistOutputBuf2Binary(obuf)); #endif + } return obuf; } @@ -705,8 +804,8 @@ free_dist_obuf(ErtsDistOutputBuf *obuf) static ERTS_INLINE Sint size_obuf(ErtsDistOutputBuf *obuf) { - Binary *bin = ErtsDistOutputBuf2Binary(obuf); - return bin->orig_size; + return sizeof(ErtsDistOutputBuf) + (obuf->ext_endp - obuf->ext_start) + + (obuf->hdr_endp - obuf->hdrp); } static ErtsDistOutputBuf* clear_de_out_queues(DistEntry* dep) @@ -771,7 +870,9 @@ int erts_dsend_context_dtor(Binary* ctx_bin) default:; } if (ctx->dss.phase >= ERTS_DSIG_SEND_PHASE_ALLOC && ctx->dss.obuf) { - free_dist_obuf(ctx->dss.obuf); + int i; + for (i = 0; i < ctx->dss.fragments; i++) + free_dist_obuf(&ctx->dss.obuf[i]); } if (ctx->deref_dep) erts_deref_dist_entry(ctx->dep); @@ -845,8 +946,8 @@ erts_dsig_send_unlink(ErtsDSigData *dsdp, Eterm local, Eterm remote) /* A local process that's being monitored by a remote one exits. We send: {DOP_MONITOR_P_EXIT, Local pid or name, Remote pid, ref, reason} */ int -erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, - Eterm ref, Eterm reason) +erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm from, Eterm watcher, Eterm watched, + Eterm ref, Eterm reason) { Eterm ctl, msg; DeclareTmpHeapNoproc(ctl_heap,6); @@ -871,7 +972,7 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, msg = THE_NON_VALUE; } - res = dsig_send_exit(dsdp, ctl, msg, 1); + res = dsig_send_exit(dsdp, ctl, msg, from, 1); UnUseTmpHeapNoproc(6); return res; } @@ -1031,8 +1132,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) } int -erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, - ErtsSendContext* ctx) +erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, ErtsSendContext* ctx) { Eterm ctl; Eterm token = NIL; @@ -1149,13 +1249,13 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, #endif DTRACE7(process_exit_signal_remote, sender_name, node_name, remote_name, reason_str, tok_label, tok_lastcnt, tok_serial); - res = dsig_send_exit(dsdp, ctl, msg, 1); + res = dsig_send_exit(dsdp, ctl, msg, local, 1); UnUseTmpHeapNoproc(6); return res; } int -erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason) +erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm from, Eterm local, Eterm remote, Eterm reason) { DeclareTmpHeapNoproc(ctl_heap,5); int res; @@ -1170,7 +1270,7 @@ erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason) ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); msg = THE_NON_VALUE; } - res = dsig_send_exit(dsdp, ctl, msg, 1); + res = dsig_send_exit(dsdp, ctl, msg, from, 1); UnUseTmpHeapNoproc(5); return res; } @@ -1193,7 +1293,7 @@ erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason msg = THE_NON_VALUE; } - res = dsig_send_exit(dsdp, ctl, msg, 0); + res = dsig_send_exit(dsdp, ctl, msg, local, 0); UnUseTmpHeapNoproc(5); return res; } @@ -1215,17 +1315,123 @@ erts_dsig_send_group_leader(ErtsDSigData *dsdp, Eterm leader, Eterm remote) return res; } -#if defined(PURIFY) -# define PURIFY_MSG(msg) \ - purify_printf("%s, line %d: %s", __FILE__, __LINE__, msg) -#elif defined(VALGRIND) -#include <valgrind/valgrind.h> -#include <valgrind/memcheck.h> -# define PURIFY_MSG(msg) \ - VALGRIND_PRINTF("%s, line %d: %s", __FILE__, __LINE__, msg) -#else -# define PURIFY_MSG(msg) -#endif +struct dist_sequences { + ErlHeapFragment hfrag; + struct dist_sequences *parent; + struct dist_sequences *left; + struct dist_sequences *right; + char is_red; + + Uint64 seq_id; + int cnt; + Sint ctl_len; +}; + +#define ERTS_RBT_PREFIX dist_seq +#define ERTS_RBT_T DistSeqNode +#define ERTS_RBT_KEY_T Uint +#define ERTS_RBT_FLAGS_T int +#define ERTS_RBT_INIT_EMPTY_TNODE(T) \ + do { \ + (T)->parent = NULL; \ + (T)->left = NULL; \ + (T)->right = NULL; \ + (T)->is_red = 0; \ + } while(0) +#define ERTS_RBT_IS_RED(T) ((T)->is_red) +#define ERTS_RBT_SET_RED(T) ((T)->is_red = 1) +#define ERTS_RBT_IS_BLACK(T) (!ERTS_RBT_IS_RED(T)) +#define ERTS_RBT_SET_BLACK(T) ((T)->is_red = 0) +#define ERTS_RBT_GET_FLAGS(T) ((T)->is_red) +#define ERTS_RBT_SET_FLAGS(T, F) ((T)->is_red = F) +#define ERTS_RBT_GET_PARENT(T) ((T)->parent) +#define ERTS_RBT_SET_PARENT(T, P) ((T)->parent = P) +#define ERTS_RBT_GET_RIGHT(T) ((T)->right) +#define ERTS_RBT_SET_RIGHT(T, R) ((T)->right = (R)) +#define ERTS_RBT_GET_LEFT(T) ((T)->left) +#define ERTS_RBT_SET_LEFT(T, L) ((T)->left = (L)) +#define ERTS_RBT_GET_KEY(T) ((T)->seq_id) +#define ERTS_RBT_IS_LT(KX, KY) (KX < KY) +#define ERTS_RBT_IS_EQ(KX, KY) (KX == KY) +#define ERTS_RBT_WANT_DELETE +#define ERTS_RBT_WANT_LOOKUP_INSERT +#define ERTS_RBT_WANT_LOOKUP +#define ERTS_RBT_WANT_FOREACH +#define ERTS_RBT_WANT_FOREACH_DESTROY_YIELDING + +#include "erl_rbtree.h" + +struct erts_dist_seq_tree_foreach_iter_arg { + int (*func)(ErtsDistExternal *, void *, Sint); + void *arg; +}; + +static int +erts_dist_seq_tree_foreach_iter(DistSeqNode *seq, void *arg, Sint reds) +{ + struct erts_dist_seq_tree_foreach_iter_arg *state = arg; + return state->func(erts_get_dist_ext(&seq->hfrag), state->arg, reds); +} + +void +erts_dist_seq_tree_foreach(DistEntry *dep, int (*func)(ErtsDistExternal *, void *, Sint), void *arg) +{ + struct erts_dist_seq_tree_foreach_iter_arg state; + state.func = func; + state.arg = arg; + dist_seq_rbt_foreach(dep->sequences, erts_dist_seq_tree_foreach_iter, &state); +} + +static int dist_seq_cleanup(DistSeqNode *seq, void *unused, Sint reds) +{ + erts_free_dist_ext_copy(erts_get_dist_ext(&seq->hfrag)); + free_message_buffer(&seq->hfrag); + return 1; +} + +typedef struct { + DistSeqNode *root; + dist_seq_rbt_yield_state_t rbt_ystate; +} DistSeqNodeYieldState; + +int +erts_dist_seq_tree_foreach_delete_yielding(DistSeqNode **root, + void **vyspp, + Sint limit) +{ + DistSeqNodeYieldState ys = {*root, ERTS_RBT_YIELD_STAT_INITER}; + DistSeqNodeYieldState *ysp; + int res; + + ysp = (DistSeqNodeYieldState *) *vyspp; + if (!ysp) { + *root = NULL; + ysp = &ys; + } + res = dist_seq_rbt_foreach_destroy_yielding(&ysp->root, + dist_seq_cleanup, + NULL, + &ysp->rbt_ystate, + limit); + if (res > 0) { + if (ysp != &ys) + erts_free(ERTS_ALC_T_ML_YIELD_STATE, ysp); + *vyspp = NULL; + } + else { + + if (ysp == &ys) { + ysp = erts_alloc(ERTS_ALC_T_SEQ_YIELD_STATE, + sizeof(DistSeqNodeYieldState)); + sys_memcpy((void *) ysp, (void *) &ys, + sizeof(DistSeqNodeYieldState)); + } + + *vyspp = (void *) ysp; + } + + return res; +} /* ** Input from distribution port. @@ -1246,7 +1452,9 @@ int erts_net_message(Port *prt, byte *buf, ErlDrvSizeT len) { - ErtsDistExternal ede; + ErtsDistExternal ede, *edep = &ede; + ErtsDistExternalData ede_data; + ErlHeapFragment *ede_hfrag = NULL; Sint ctl_len; Eterm arg; Eterm from, to; @@ -1285,10 +1493,12 @@ int erts_net_message(Port *prt, } #ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, "<< "); + erts_fprintf(dbg_file, "RECV: "); bw(buf, len); #endif + ede.data = &ede_data; + res = erts_prepare_dist_ext(&ede, buf, len, bin, dep, conn_id, dep->cache); switch (res) { @@ -1296,51 +1506,164 @@ int erts_net_message(Port *prt, return 0; /* Connection not alive; ignore signal... */ case ERTS_PREP_DIST_EXT_FAILED: #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n"); + erts_fprintf(dbg_file, "DIST MSG DEBUG: erts_prepare_dist_ext() failed:\n"); bw(buf, orig_len); #endif goto data_error; case ERTS_PREP_DIST_EXT_SUCCESS: - ctl_len = erts_decode_dist_ext_size(&ede); + ctl_len = erts_decode_dist_ext_size(&ede, 1); if (ctl_len < 0) { #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); + erts_fprintf(dbg_file, "DIST MSG DEBUG: erts_decode_dist_ext_size(CTL) failed:\n"); bw(buf, orig_len); #endif PURIFY_MSG("data error"); goto data_error; } + + /* A non-fragmented message */ + if (!ede.data->seq_id) { + if (ctl_len > DIST_CTL_DEFAULT_SIZE) { + ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm)); + } + + erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF); + break; + } else { + DistSeqNode *seq; + Uint sz = erts_dist_ext_size(&ede); + Uint used_sz = ctl_len * sizeof(Eterm); + + /* We calculate the size of the heap fragment to be allocated. + The used_size part has to be larger that the ctl data and the + DistSeqNode. */ + if (used_sz + (sizeof(ErlHeapFragment) - sizeof(Eterm)) < sizeof(DistSeqNode)) + used_sz = sizeof(DistSeqNode) - (sizeof(ErlHeapFragment) - sizeof(Eterm)); + + seq = (DistSeqNode *)new_message_buffer((sz + used_sz) / sizeof(Eterm)); + seq->hfrag.used_size = used_sz / sizeof(Eterm); + + seq->ctl_len = ctl_len; + seq->seq_id = ede.data->seq_id; + seq->cnt = ede.data->frag_id; + if (dist_seq_rbt_lookup_insert(&dep->sequences, seq) != NULL) { + free_message_buffer(&seq->hfrag); + goto data_error; + } + + erts_make_dist_ext_copy(&ede, erts_get_dist_ext(&seq->hfrag)); + + if (ede.data->frag_id > 1) { + seq->cnt--; + return 0; + } + } + + /* fall through, the first fragment in the sequence was the last fragment */ + case ERTS_PREP_DIST_EXT_FRAG_CONT: { + DistSeqNode *seq = dist_seq_rbt_lookup(dep->sequences, ede.data->seq_id); + + if (!seq) + goto data_error; + + /* If we did a fall-though we already did this */ + if (res == ERTS_PREP_DIST_EXT_FRAG_CONT) + erts_dist_ext_frag(&ede_data, erts_get_dist_ext(&seq->hfrag)); + + /* Verify that the fragments have arrived in the correct order */ + if (seq->cnt != ede.data->frag_id) + goto data_error; + + seq->cnt--; + + /* Check if this was the last fragment */ + if (ede.data->frag_id > 1) + return 0; + + /* Last fragment arrived, time to dispatch the signal */ + dist_seq_rbt_delete(&dep->sequences, seq); + ctl_len = seq->ctl_len; + + /* Now that we no longer need the DistSeqNode we re-use the heapfragment + to decode the ctl msg into. We don't need the ctl message to be in + the heapfragment, but we decode into the heapfragment speculatively + in case there is a trace token that we need. */ + erts_factory_heap_frag_init(&factory, &seq->hfrag); + edep = erts_get_dist_ext(&seq->hfrag); + ede_hfrag = &seq->hfrag; + + /* If the sequence consisted of more than 1 fragment we create one large + binary out of all of the fragments. This because erts_decode_ext + cannot handle a segmented buffer. + TODO: Move this copy to as late as possible, preferably in in the + erts_decode_dist_ext in the receiving process. + */ + if (edep->data->frag_id > 1) { + Uint sz = 0; + Binary *bin; + int i; + byte *ep; + + for (i = 0; i < edep->data->frag_id; i++) + sz += edep->data[i].ext_endp - edep->data[i].extp; + + bin = erts_bin_nrml_alloc(sz); + ep = (byte*)bin->orig_bytes; + + for (i = 0; i < edep->data->frag_id; i++) { + sys_memcpy(ep, edep->data[i].extp, edep->data[i].ext_endp - edep->data[i].extp); + ep += edep->data[i].ext_endp - edep->data[i].extp; + erts_bin_release(edep->data[i].binp); + edep->data[i].binp = NULL; + edep->data[i].extp = NULL; + edep->data[i].ext_endp = NULL; + } + + edep->data->frag_id = 1; + edep->data->extp = (byte*)bin->orig_bytes; + edep->data->ext_endp = ep; + edep->data->binp = bin; + } + break; + } default: ERTS_INTERNAL_ERROR("Unexpected result from erts_prepare_dist_ext()"); break; } - if (ctl_len > DIST_CTL_DEFAULT_SIZE) { - ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm)); - } - - erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF); - arg = erts_decode_dist_ext(&factory, &ede); + arg = erts_decode_dist_ext(&factory, edep, 1); if (is_non_value(arg)) { #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "DIST MSG DEBUG: erts_decode_dist_ext(CTL) failed:\n"); + erts_fprintf(dbg_file, "DIST MSG DEBUG: erts_decode_dist_ext(CTL) failed:\n"); bw(buf, orig_len); #endif PURIFY_MSG("data error"); goto decode_error; } -#ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, "<< CTL: %.80T\n", arg); -#endif + /* Fill the unused part of the hfrag with a bignum header */ + if (ede_hfrag && ede_hfrag->mem + ede_hfrag->used_size > factory.hp) { + Uint slot = factory.hp - ede_hfrag->mem; + ede_hfrag->mem[slot] = make_pos_bignum_header(ede_hfrag->used_size - slot - 1); + } if (is_not_tuple(arg) || (tuple = tuple_val(arg), (tuple_arity = arityval(*tuple)) < 1) || is_not_small(tuple[1])) { +#ifdef ERTS_DIST_MSG_DBG + if (is_tuple(arg) && arityval(*tuple) > 1) + erts_fprintf(dbg_file, "RECV: CTL: %s: %.80T\n", + erts_dop_to_string(unsigned_val(tuple[1])), arg); +#endif goto invalid_message; } +#ifdef ERTS_DIST_MSG_DBG + erts_fprintf(dbg_file, "RECV: CTL: %s: %.80T\n", + erts_dop_to_string(unsigned_val(tuple[1])), arg); +#endif + token = NIL; switch (type = unsigned_val(tuple[1])) { @@ -1385,7 +1708,7 @@ int erts_net_message(Port *prt, code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { - code = erts_dsig_send_exit(&dsd, to, from, am_noproc); + code = erts_dsig_send_exit(&dsd, to, to, from, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); } @@ -1475,7 +1798,7 @@ int erts_net_message(Port *prt, code = erts_dsig_prepare(&dsd, dep, NULL, 0, ERTS_DSP_NO_LOCK, 0, 0); if (code == ERTS_DSIG_PREP_CONNECTED) { - code = erts_dsig_send_m_exit(&dsd, watcher, watched, ref, + code = erts_dsig_send_m_exit(&dsd, pid, watcher, watched, ref, am_noproc); ASSERT(code == ERTS_DSIG_SEND_OK); } @@ -1549,7 +1872,7 @@ int erts_net_message(Port *prt, } #ifdef ERTS_DIST_MSG_DBG - dist_msg_dbg(&ede, "MSG", buf, orig_len); + dist_msg_dbg(edep, "MSG", buf, orig_len); #endif from = tuple[2]; @@ -1560,7 +1883,6 @@ int erts_net_message(Port *prt, rp = erts_whereis_process(NULL, 0, to, 0, 0); if (rp) { ErtsProcLocks locks = 0; - ErtsDistExternal *ede_copy; if (type == DOP_REG_SEND) { token = NIL; @@ -1568,13 +1890,14 @@ int erts_net_message(Port *prt, token = tuple[5]; } - ede_copy = erts_make_dist_ext_copy(&ede, &token); - - erts_queue_dist_message(rp, locks, ede_copy, token, from); + erts_queue_dist_message(rp, locks, edep, ede_hfrag, token, from); if (locks) erts_proc_unlock(rp, locks); - } + } else if (ede_hfrag) { + erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag)); + free_message_buffer(ede_hfrag); + } break; case DOP_SEND_SENDER_TT: { @@ -1606,7 +1929,7 @@ int erts_net_message(Port *prt, : tuple[2] == am_Empty); #ifdef ERTS_DIST_MSG_DBG - dist_msg_dbg(&ede, "MSG", buf, orig_len); + dist_msg_dbg(edep, "MSG", buf, orig_len); #endif to = tuple[3]; if (is_not_pid(to)) { @@ -1615,20 +1938,19 @@ int erts_net_message(Port *prt, rp = erts_proc_lookup(to); if (rp) { ErtsProcLocks locks = 0; - ErtsDistExternal *ede_copy; - - ede_copy = erts_make_dist_ext_copy(&ede, &token); - erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty); + erts_queue_dist_message(rp, locks, edep, ede_hfrag, token, am_Empty); if (locks) erts_proc_unlock(rp, locks); - } + } else if (ede_hfrag) { + erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag)); + free_message_buffer(ede_hfrag); + } break; } case DOP_PAYLOAD_MONITOR_P_EXIT: case DOP_MONITOR_P_EXIT: { - ErtsDistExternal *ede_copy = NULL; /* We are monitoring a process on the remote node which dies, we get {DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */ @@ -1663,19 +1985,18 @@ int erts_net_message(Port *prt, goto invalid_message; } + if (!erts_proc_lookup(watcher)) break; /* Process not alive */ + if (reason == THE_NON_VALUE) { #ifdef ERTS_DIST_MSG_DBG - dist_msg_dbg(&ede, "MSG", buf, orig_len); + dist_msg_dbg(edep, "MSG", buf, orig_len); #endif - if (!erts_proc_lookup(watcher)) break; /* Process not alive */ - - ede_copy = erts_make_dist_ext_copy(&ede, &token); } - erts_proc_sig_send_dist_monitor_down(dep, ref, watched, - watcher, ede_copy, reason); + erts_proc_sig_send_dist_monitor_down( + dep, ref, watched, watcher, edep, ede_hfrag, reason); break; } @@ -1683,7 +2004,6 @@ int erts_net_message(Port *prt, case DOP_PAYLOAD_EXIT_TT: case DOP_EXIT_TT: case DOP_EXIT: { - ErtsDistExternal *ede_copy = NULL; /* 'from', which 'to' is linked to, died */ from = tuple[2]; @@ -1720,19 +2040,16 @@ int erts_net_message(Port *prt, goto invalid_message; } - if (reason == THE_NON_VALUE) { + if (!erts_proc_lookup(to)) break; /* Process not alive */ + if (reason == THE_NON_VALUE) { #ifdef ERTS_DIST_MSG_DBG - dist_msg_dbg(&ede, "MSG", buf, orig_len); + dist_msg_dbg(edep, "MSG", buf, orig_len); #endif - - if (!erts_proc_lookup(to)) break; /* Process not alive */ - - ede_copy = erts_make_dist_ext_copy(&ede, &token); } erts_proc_sig_send_dist_link_exit(dep, - from, to, ede_copy, + from, to, edep, ede_hfrag, reason, token); break; } @@ -1740,7 +2057,6 @@ int erts_net_message(Port *prt, case DOP_PAYLOAD_EXIT2: case DOP_EXIT2_TT: case DOP_EXIT2: { - ErtsDistExternal *ede_copy = NULL; /* 'from' is send an exit signal to 'to' */ from = tuple[2]; @@ -1777,18 +2093,15 @@ int erts_net_message(Port *prt, goto invalid_message; } - if (reason == THE_NON_VALUE) { + if (!erts_proc_lookup(to)) break; /* Process not alive */ + if (reason == THE_NON_VALUE) { #ifdef ERTS_DIST_MSG_DBG - dist_msg_dbg(&ede, "MSG", buf, orig_len); + dist_msg_dbg(edep, "MSG", buf, orig_len); #endif - - if (!erts_proc_lookup(to)) break; /* Process not alive */ - - ede_copy = erts_make_dist_ext_copy(&ede, &token); } - erts_proc_sig_send_dist_exit(dep, from, to, ede_copy, reason, token); + erts_proc_sig_send_dist_exit(dep, from, to, edep, ede_hfrag, reason, token); break; } case DOP_GROUP_LEADER: @@ -1804,13 +2117,15 @@ int erts_net_message(Port *prt, (void) erts_proc_sig_send_group_leader(NULL, to, from, NIL); break; - default: + default: goto invalid_message; } - erts_factory_close(&factory); - if (ctl != ctl_default) { - erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); + if (ede_hfrag == NULL) { + erts_factory_close(&factory); + if (ctl != ctl_default) { + erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); + } } UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); ERTS_CHK_NO_PROC_LOCKS; @@ -1823,9 +2138,14 @@ int erts_net_message(Port *prt, } decode_error: PURIFY_MSG("data error"); - erts_factory_close(&factory); - if (ctl != ctl_default) { - erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); + if (ede_hfrag == NULL) { + erts_factory_close(&factory); + if (ctl != ctl_default) { + erts_free(ERTS_ALC_T_DCTRL_BUF, (void *) ctl); + } + } else { + erts_free_dist_ext_copy(erts_get_dist_ext(ede_hfrag)); + free_message_buffer(ede_hfrag); } data_error: UnUseTmpHeapNoproc(DIST_CTL_DEFAULT_SIZE); @@ -1834,18 +2154,17 @@ data_error: return -1; } -static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, int force_busy) +static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, Eterm from, int force_busy) { struct erts_dsig_send_context ctx; int ret; ctx.ctl = ctl; ctx.msg = msg; + ctx.from = from; ctx.force_busy = force_busy; ctx.force_encode = 1; ctx.phase = ERTS_DSIG_SEND_PHASE_INIT; -#ifdef DEBUG ctx.reds = 1; /* provoke assert below (no reduction count with force_encode) */ -#endif ret = erts_dsig_send(dsdp, &ctx); ASSERT(ret != ERTS_DSIG_SEND_CONTINUE); return ret; @@ -1857,12 +2176,11 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy) int ret; ctx.ctl = ctl; ctx.msg = THE_NON_VALUE; + ctx.from = THE_NON_VALUE; ctx.force_busy = force_busy; ctx.force_encode = 1; ctx.phase = ERTS_DSIG_SEND_PHASE_INIT; -#ifdef DEBUG ctx.reds = 1; /* provoke assert below (no reduction count without msg) */ -#endif ret = erts_dsig_send(dsdp, &ctx); ASSERT(ret != ERTS_DSIG_SEND_CONTINUE); return ret; @@ -1926,9 +2244,11 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) } #ifdef ERTS_DIST_MSG_DBG - erts_fprintf(stderr, ">> CTL: %.80T\n", ctx->ctl); + erts_fprintf(dbg_file, "SEND: CTL: %s: %.80T\n", + erts_dop_to_string(unsigned_val(tuple_val(ctx->ctl)[1])), + ctx->ctl); if (is_value(ctx->msg)) - erts_fprintf(stderr, " MSG: %.80T\n", ctx->msg); + erts_fprintf(dbg_file, " MSG: %.160T\n", ctx->msg); #endif ctx->data_size = ctx->max_finalize_prepend; @@ -1958,16 +2278,32 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) case ERTS_DSIG_SEND_PHASE_ALLOC: erts_finalize_atom_cache_map(ctx->acmp, ctx->flags); - ctx->dhdr_ext_size = erts_encode_ext_dist_header_size(ctx->acmp); - ctx->data_size += ctx->dhdr_ext_size; + if (ctx->flags & DFLAG_FRAGMENTS && is_value(ctx->msg) && is_not_immed(ctx->msg)) { + /* Calculate the max number of fragments that are needed */ + ASSERT(is_pid(ctx->from) && + "from has to be a pid because it is used as sequence id"); + ctx->fragments = ctx->data_size / ERTS_DIST_FRAGMENT_SIZE + 1; + } else + ctx->fragments = 1; - ctx->obuf = alloc_dist_obuf(ctx->data_size); - ctx->obuf->ext_endp = &ctx->obuf->data[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size; + ctx->dhdr_ext_size = erts_encode_ext_dist_header_size(ctx->acmp, ctx->fragments); + + ctx->obuf = alloc_dist_obuf( + ctx->dhdr_ext_size + ctx->data_size + + (ctx->fragments-1) * ERTS_DIST_FRAGMENT_HEADER_SIZE, + ctx->fragments); + ctx->obuf->ext_start = &ctx->obuf->extp[0]; + ctx->obuf->ext_endp = &ctx->obuf->extp[0] + ctx->max_finalize_prepend + ctx->dhdr_ext_size; /* Encode internal version of dist header */ - ctx->obuf->extp = erts_encode_ext_dist_header_setup(ctx->obuf->ext_endp, ctx->acmp); + ctx->obuf->extp = erts_encode_ext_dist_header_setup( + ctx->obuf->ext_endp, ctx->acmp, ctx->fragments, ctx->from); /* Encode control message */ erts_encode_dist_ext(ctx->ctl, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL); + + ctx->obuf->hdrp = NULL; + ctx->obuf->hdr_endp = NULL; + if (is_non_value(ctx->msg)) { ctx->obuf->msg_start = NULL; ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; @@ -1988,33 +2324,91 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) goto done; } } else { - erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL); + erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, + ctx->acmp, NULL, NULL); } - ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; + ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; case ERTS_DSIG_SEND_PHASE_FIN: { - DistEntry *dep = dsdp->dep; - int suspended = 0; - int resume = 0; ASSERT(ctx->obuf->extp < ctx->obuf->ext_endp); - ASSERT(&ctx->obuf->data[0] <= ctx->obuf->extp - ctx->max_finalize_prepend); - ASSERT(ctx->obuf->ext_endp <= &ctx->obuf->data[0] + ctx->data_size); + ASSERT(((byte*)&ctx->obuf->bin->orig_bytes[0]+obuf_list_size) <= ctx->obuf->extp - ctx->max_finalize_prepend); + ASSERT(ctx->obuf->ext_endp <= ((byte*)ctx->obuf->bin->orig_bytes+obuf_list_size) + ctx->data_size + ctx->dhdr_ext_size); ctx->data_size = ctx->obuf->ext_endp - ctx->obuf->extp; - if (ctx->data_size > (Uint) INT_MAX) { - free_dist_obuf(ctx->obuf); - ctx->obuf = NULL; - retval = ERTS_DSIG_SEND_TOO_LRG; - goto done; - } ctx->obuf->hopefull_flags = ctx->u.ec.hopefull_flags; - /* + + if (ctx->fragments > 1) { + int fin_fragments; + int i; + byte *msg = ctx->obuf->msg_start, + *msg_end = ctx->obuf->ext_endp, + *hdrp = msg_end; + + ASSERT((ctx->obuf->hopefull_flags & ctx->flags) == ctx->obuf->hopefull_flags); + ASSERT(get_int64(ctx->obuf->extp + 1 + 1 + 8) == ctx->fragments); + + /* Now that encoding is done we know how large the term will + be so we adjust the number of fragments to send. Note that + this can mean that only 1 fragment is sent. */ + fin_fragments = (ctx->obuf->ext_endp - ctx->obuf->msg_start + ERTS_DIST_FRAGMENT_SIZE-1) / + ERTS_DIST_FRAGMENT_SIZE - 1; + + /* Update the frag_id in the DIST_FRAG_HEADER */ + put_int64(fin_fragments+1, ctx->obuf->extp + 1 + 1 + 8); + + if (fin_fragments > 0) + msg += ERTS_DIST_FRAGMENT_SIZE; + else + msg = msg_end; + ctx->obuf->next = &ctx->obuf[1]; + ctx->obuf->ext_endp = msg; + + /* Loop through all fragments, updating the output buffers + to be correct and also writing the DIST_FRAG_CONT header. */ + for (i = 1; i < fin_fragments + 1; i++) { + ctx->obuf[i].hopefull_flags = 0; + ctx->obuf[i].extp = msg; + ctx->obuf[i].ext_start = msg; + if (msg + ERTS_DIST_FRAGMENT_SIZE > msg_end) + ctx->obuf[i].ext_endp = msg_end; + else { + msg += ERTS_DIST_FRAGMENT_SIZE; + ctx->obuf[i].ext_endp = msg; + } + ASSERT(ctx->obuf[i].ext_endp > ctx->obuf[i].extp); + ctx->obuf[i].hdrp = erts_encode_ext_dist_header_fragment( + &hdrp, fin_fragments - i + 1, ctx->from); + ctx->obuf[i].hdr_endp = hdrp; + ctx->obuf[i].next = &ctx->obuf[i+1]; + } + /* If the initial fragment calculation was incorrect we free the + remaining output buffers. */ + for (; i < ctx->fragments; i++) { + free_dist_obuf(&ctx->obuf[i]); + } + if (!ctx->force_encode && !ctx->force_busy) + ctx->reds -= ctx->fragments; + ctx->fragments = fin_fragments + 1; + } + + ctx->phase = ERTS_DSIG_SEND_PHASE_SEND; + + if (ctx->reds <= 0) { + retval = ERTS_DSIG_SEND_CONTINUE; + goto done; + } + } + case ERTS_DSIG_SEND_PHASE_SEND: { + /* * Signal encoded; now verify that the connection still exists, * and if so enqueue the signal and schedule it for send. */ - ctx->obuf->next = NULL; + DistEntry *dep = dsdp->dep; + int suspended = 0; + int resume = 0; + int i; erts_de_rlock(dep); cid = dep->cid; if (dep->state == ERTS_DE_STATE_EXITING @@ -2022,14 +2416,38 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) || dep->connection_id != dsdp->connection_id) { /* Not the same connection as when we started; drop message... */ erts_de_runlock(dep); - free_dist_obuf(ctx->obuf); + for (i = 0; i < ctx->fragments; i++) + free_dist_obuf(&ctx->obuf[i]); + ctx->fragments = 0; } else { - Sint qsize; + Sint qsize = erts_atomic_read_nob(&dep->qsize); erts_aint32_t qflgs; ErtsProcList *plp = NULL; Eterm notify_proc = NIL; - Sint obsz = size_obuf(ctx->obuf); + Sint obsz; + int fragments; + + /* Calculate how many fragments to send. This depends on + the available space in the distr queue and the amount + of remaining reductions. */ + for (fragments = 0, obsz = 0; + fragments < ctx->fragments && + ((ctx->reds > 0 && (qsize + obsz) < erts_dist_buf_busy_limit) || + ctx->force_encode || ctx->force_busy); + fragments++) { +#ifdef DEBUG + int reds = 100; +#else + int reds = 10; +#endif + if (!ctx->force_encode && !ctx->force_busy) + ctx->reds -= reds; + obsz += size_obuf(&ctx->obuf[fragments]); + } + + ASSERT(fragments == ctx->fragments || + (!ctx->force_encode && !ctx->force_busy)); erts_mtx_lock(&dep->qlock); qsize = erts_atomic_add_read_nob(&dep->qsize, (erts_aint_t) obsz); @@ -2059,12 +2477,25 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) erts_mtx_lock(&dep->qlock); } - /* Enqueue obuf on dist entry */ - if (dep->out_queue.last) - dep->out_queue.last->next = ctx->obuf; - else - dep->out_queue.first = ctx->obuf; - dep->out_queue.last = ctx->obuf; + if (fragments > 1) { + if (!ctx->obuf->hdrp) { + ASSERT(get_int64(ctx->obuf->extp + 10) == ctx->fragments); + } else { + ASSERT(get_int64(ctx->obuf->hdrp + 10) == ctx->fragments); + } + } + + if (fragments) { + ctx->obuf[fragments-1].next = NULL; + if (dep->out_queue.last) + dep->out_queue.last->next = ctx->obuf; + else + dep->out_queue.first = ctx->obuf; + dep->out_queue.last = &ctx->obuf[fragments-1]; + + ctx->fragments -= fragments; + ctx->obuf = &ctx->obuf[fragments]; + } if (!ctx->force_busy) { qflgs = erts_atomic32_read_nob(&dep->qflgs); @@ -2113,6 +2544,13 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) * erroneously scheduled when it shouldn't be. */ } + /* More fragments left to be sent, yield and re-schedule */ + if (ctx->fragments) { + retval = ERTS_DSIG_SEND_CONTINUE; + if (!resume && erts_system_monitor_flags.busy_dist_port) + monitor_generic(ctx->c_p, am_busy_dist_port, cid); + goto done; + } } ctx->obuf = NULL; @@ -2197,9 +2635,9 @@ static Uint dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) { int fpe_was_unmasked; - ErlDrvSizeT size; - SysIOVec iov[2]; - ErlDrvBinary* bv[2]; + ErlDrvSizeT size = 0; + SysIOVec iov[3]; + ErlDrvBinary* bv[3]; ErlIOVec eiov; ERTS_CHK_NO_PROC_LOCKS; @@ -2214,12 +2652,31 @@ dist_port_commandv(Port *prt, ErtsDistOutputBuf *obuf) eiov.vsize = 1; } else { - size = obuf->ext_endp - obuf->extp; + int i = 1; eiov.vsize = 2; - iov[1].iov_base = obuf->extp; - iov[1].iov_len = size; - bv[1] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); + if (obuf->hdrp) { + eiov.vsize = 3; + iov[i].iov_base = obuf->hdrp; + iov[i].iov_len = obuf->hdr_endp - obuf->hdrp; + size += iov[i].iov_len; + bv[i] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); +#ifdef ERTS_RAW_DIST_MSG_DBG + erts_fprintf(dbg_file, "SEND: "); + bw(iov[i].iov_base, iov[i].iov_len); +#endif + i++; + + } + + iov[i].iov_base = obuf->extp; + iov[i].iov_len = obuf->ext_endp - obuf->extp; +#ifdef ERTS_RAW_DIST_MSG_DBG + erts_fprintf(dbg_file, "SEND: "); + bw(iov[i].iov_base, iov[i].iov_len); +#endif + size += iov[i].iov_len; + bv[i] = Binary2ErlDrvBinary(ErtsDistOutputBuf2Binary(obuf)); } eiov.size = size; @@ -2326,6 +2783,23 @@ erts_dist_command(Port *prt, int initial_reds) dep->finalized_out_queue.first = NULL; dep->finalized_out_queue.last = NULL; +#ifdef DEBUG + { + Uint sz = 0; + ErtsDistOutputBuf *curr = oq.first; + while (curr) { + sz += size_obuf(curr); + curr = curr->next; + } + curr = foq.first; + while (curr) { + sz += size_obuf(curr); + curr = curr->next; + } + ASSERT(sz <= erts_atomic_read_nob(&dep->qsize)); + } +#endif + sched_flags = erts_atomic32_read_nob(&prt->sched.flags); if (reds < 0) @@ -2339,10 +2813,6 @@ erts_dist_command(Port *prt, int initial_reds) size = (*send)(prt, foq.first); erts_atomic64_inc_nob(&dep->out); esdp->io.out += (Uint64) size; -#ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, ">> "); - bw(foq.first->extp, size); -#endif reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); fob = foq.first; obufsize += size_obuf(fob); @@ -2411,15 +2881,11 @@ erts_dist_command(Port *prt, int initial_reds) preempt = 1; break; } - ASSERT(&oq.first->data[0] <= oq.first->extp - && oq.first->extp <= oq.first->ext_endp); + ASSERT(oq.first->bin->orig_bytes <= (char*)oq.first->extp + && oq.first->extp <= oq.first->ext_endp); size = (*send)(prt, oq.first); erts_atomic64_inc_nob(&dep->out); esdp->io.out += (Uint64) size; -#ifdef ERTS_RAW_DIST_MSG_DBG - erts_fprintf(stderr, ">> "); - bw(oq.first->extp, size); -#endif reds -= ERTS_PORT_REDS_DIST_CMD_DATA(size); fob = oq.first; obufsize += size_obuf(fob); @@ -2556,100 +3022,6 @@ erts_dist_command(Port *prt, int initial_reds) goto done; } -#if 0 - -int -dist_data_finalize(Process *c_p, int reds_limit) -{ - int reds = 5; - DistEntry *dep = ; - ErtsDistOutputQueue oq, foq; - ErtsDistOutputBuf *ob; - int preempt; - - - erts_mtx_lock(&dep->qlock); - flags = dep->flags; - oq.first = dep->out_queue.first; - oq.last = dep->out_queue.last; - dep->out_queue.first = NULL; - dep->out_queue.last = NULL; - erts_mtx_unlock(&dep->qlock); - - if (!oq.first) { - ASSERT(!oq.last); - oq.first = dep->tmp_out_queue.first; - oq.last = dep->tmp_out_queue.last; - } - else { - ErtsDistOutputBuf *f, *l; - ASSERT(oq.last); - if (dep->tmp_out_queue.last) { - dep->tmp_out_queue.last->next = oq.first; - oq.first = dep->tmp_out_queue.first; - } - } - - if (!oq.first) { - /* Nothing to do... */ - ASSERT(!oq.last); - return reds; - } - - foq.first = dep->finalized_out_queue.first; - foq.last = dep->finalized_out_queue.last; - - preempt = 0; - ob = oq.first; - ASSERT(ob); - - do { - ob->extp = erts_encode_ext_dist_header_finalize(ob->extp, - dep->cache, - flags); - if (!(flags & DFLAG_DIST_HDR_ATOM_CACHE)) - *--ob->extp = PASS_THROUGH; /* Old node; 'pass through' - needed */ - ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); - reds += ERTS_PORT_REDS_DIST_CMD_FINALIZE; - preempt = reds > reds_limit; - if (preempt) - break; - ob = ob->next; - } while (ob); - /* - * At least one buffer was finalized; if we got preempted, - * ob points to the last buffer that we finalized. - */ - if (foq.last) - foq.last->next = oq.first; - else - foq.first = oq.first; - if (!preempt) { - /* All buffers finalized */ - foq.last = oq.last; - oq.first = oq.last = NULL; - } - else { - /* Not all buffers finalized; split oq. */ - foq.last = ob; - oq.first = ob->next; - if (oq.first) - ob->next = NULL; - else - oq.last = NULL; - } - - dep->finalized_out_queue.first = foq.first; - dep->finalized_out_queue.last = foq.last; - dep->tmp_out_queue.first = oq.first; - dep->tmp_out_queue.last = oq.last; - - return reds; -} - -#endif - BIF_RETTYPE dist_ctrl_get_data_notification_1(BIF_ALIST_1) { @@ -2998,6 +3370,10 @@ static void kill_connection(DistEntry *dep) void erts_kill_dist_connection(DistEntry *dep, Uint32 conn_id) { +#ifdef ERTS_DIST_MSG_DBG + erts_fprintf(dbg_file, "INTR: kill dist conn to %T:%u\n", + dep->sysname, conn_id); +#endif erts_de_rwlock(dep); if (conn_id == dep->connection_id && dep->state == ERTS_DE_STATE_CONNECTED) { @@ -3659,8 +4035,9 @@ Sint erts_abort_connection_rwunlock(DistEntry* dep) erts_set_dist_entry_not_connected(dep); erts_de_rwunlock(dep); - schedule_con_monitor_link_cleanup(mld, THE_NON_VALUE, - THE_NON_VALUE, THE_NON_VALUE); + schedule_con_monitor_link_seq_cleanup( + mld, NULL, THE_NON_VALUE, + THE_NON_VALUE, THE_NON_VALUE); if (resume_procs) { int resumed = erts_resume_processes(resume_procs); diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 81a09bc69c..034b75df38 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -48,6 +48,7 @@ #define DFLAG_BIG_SEQTRACE_LABELS 0x100000 #define DFLAG_NO_MAGIC 0x200000 /* internal for pending connection */ #define DFLAG_EXIT_PAYLOAD 0x400000 +#define DFLAG_FRAGMENTS 0x800000 /* Mandatory flags for distribution */ #define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \ @@ -77,7 +78,8 @@ | DFLAG_BIG_CREATION \ | DFLAG_SEND_SENDER \ | DFLAG_BIG_SEQTRACE_LABELS \ - | DFLAG_EXIT_PAYLOAD) + | DFLAG_EXIT_PAYLOAD \ + | DFLAG_FRAGMENTS) /* Flags addable by local distr implementations */ #define DFLAG_DIST_ADDABLE DFLAG_DIST_DEFAULT @@ -101,7 +103,7 @@ | DFLAG_BIG_CREATION) /* opcodes used in distribution messages */ -enum { +enum dop { DOP_LINK = 1, DOP_SEND = 2, DOP_EXIT = 3, @@ -150,6 +152,16 @@ typedef struct { Uint32 flags; } ErtsDSigData; +/* Must be larger or equal to 16 */ +#ifdef DEBUG +#define ERTS_DIST_FRAGMENT_SIZE 16 +#else +/* This should be made configurable */ +#define ERTS_DIST_FRAGMENT_SIZE (64 * 1024) +#endif + +#define ERTS_DIST_FRAGMENT_HEADER_SIZE (1 + 1 + 8 + 8) /* magic, header, seq id, frag id*/ + #define ERTS_DE_BUSY_LIMIT (1024*1024) extern int erts_dist_buf_busy_limit; extern int erts_is_alive; @@ -347,7 +359,8 @@ enum erts_dsig_send_phase { ERTS_DSIG_SEND_PHASE_MSG_SIZE, ERTS_DSIG_SEND_PHASE_ALLOC, ERTS_DSIG_SEND_PHASE_MSG_ENCODE, - ERTS_DSIG_SEND_PHASE_FIN + ERTS_DSIG_SEND_PHASE_FIN, + ERTS_DSIG_SEND_PHASE_SEND }; struct erts_dsig_send_context { @@ -356,12 +369,14 @@ struct erts_dsig_send_context { Eterm ctl; Eterm msg; + Eterm from; int force_busy; int force_encode; Uint32 max_finalize_prepend; Uint data_size, dhdr_ext_size; ErtsAtomCacheMap *acmp; ErtsDistOutputBuf *obuf; + Uint fragments; Uint32 flags; Process *c_p; union { @@ -383,6 +398,7 @@ typedef struct { Eterm return_term; }ErtsSendContext; +typedef struct dist_sequences DistSeqNode; /* * erts_dsig_send_* return values. @@ -398,11 +414,11 @@ extern int erts_dsig_send_exit_tt(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm); extern int erts_dsig_send_unlink(ErtsDSigData *, Eterm, Eterm); extern int erts_dsig_send_reg_msg(Eterm, Eterm, ErtsSendContext*); extern int erts_dsig_send_group_leader(ErtsDSigData *, Eterm, Eterm); -extern int erts_dsig_send_exit(ErtsDSigData *, Eterm, Eterm, Eterm); +extern int erts_dsig_send_exit(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm); extern int erts_dsig_send_exit2(ErtsDSigData *, Eterm, Eterm, Eterm); extern int erts_dsig_send_demonitor(ErtsDSigData *, Eterm, Eterm, Eterm, int); extern int erts_dsig_send_monitor(ErtsDSigData *, Eterm, Eterm, Eterm); -extern int erts_dsig_send_m_exit(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm); +extern int erts_dsig_send_m_exit(ErtsDSigData *, Eterm, Eterm, Eterm, Eterm, Eterm); extern int erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx); extern int erts_dsend_context_dtor(Binary*); @@ -416,4 +432,8 @@ extern Uint erts_dist_cache_size(void); extern Sint erts_abort_connection_rwunlock(DistEntry *dep); +extern void erts_dist_seq_tree_foreach( + DistEntry *dep, + int (*func)(ErtsDistExternal *, void*, Sint), void *args); + #endif diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 31f19d1b88..490a033b8a 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -276,6 +276,7 @@ type PF3_ARGS SHORT_LIVED PROCESSES process_flag_3_arguments type SETUP_CONN_ARG SHORT_LIVED PROCESSES setup_connection_argument type LIST_TRAP SHORT_LIVED PROCESSES list_bif_trap_state type CONT_EXIT_TRAP SHORT_LIVED PROCESSES continue_exit_trap_state +type SEQ_YIELD_STATE SHORT_LIVED SYSTEM dist_seq_yield_state type ENVIRONMENT SYSTEM SYSTEM environment diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index cd7bce8bf4..e12a51a0ae 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -219,6 +219,10 @@ erts_cleanup_message(ErtsMessage *mp) if (ERTS_SIG_IS_MSG(mp) && mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { bp = mp->data.heap_frag; } else { + /* All non msg signals are combined HFRAG messages, + but we overwrite the mp->data field with the + nm_signal queue ptr so have to fix that here + before freeing it. */ mp->data.attached = ERTS_MSG_COMBINED_HFRAG; bp = mp->hfrag.next; erts_cleanup_offheap(&mp->hfrag.off_heap); @@ -234,7 +238,7 @@ erts_cleanup_messages(ErtsMessage *msgp) ErtsMessage *mp = msgp; while (mp) { ErtsMessage *fmp; - erts_cleanup_message(mp); + erts_cleanup_message(mp); fmp = mp; mp = mp->next; erts_free_message(fmp); @@ -266,6 +270,7 @@ void erts_queue_dist_message(Process *rcvr, ErtsProcLocks rcvr_locks, ErtsDistExternal *dist_ext, + ErlHeapFragment *hfrag, Eterm token, Eterm from) { @@ -274,8 +279,26 @@ erts_queue_dist_message(Process *rcvr, ERTS_LC_ASSERT(rcvr_locks == erts_proc_lc_my_proc_locks(rcvr)); - mp = erts_alloc_message(0, NULL); - mp->data.dist_ext = dist_ext; + if (hfrag) { + /* Fragmented message, allocate a message reference */ + mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = hfrag; + } else { + /* Un-fragmented message, allocate space for + token and dist_ext in message. */ + Uint dist_ext_sz = erts_dist_ext_size(dist_ext) / sizeof(Eterm); + Uint token_sz = size_object(token); + Uint sz = token_sz + dist_ext_sz; + Eterm *hp; + + mp = erts_alloc_message(sz, &hp); + mp->data.heap_frag = &mp->hfrag; + mp->hfrag.used_size = token_sz; + + erts_make_dist_ext_copy(dist_ext, erts_get_dist_ext(mp->data.heap_frag)); + + token = copy_struct(token, token_sz, &hp, &mp->data.heap_frag->off_heap); + } ERL_MESSAGE_FROM(mp) = dist_ext->dep->sysname; ERL_MESSAGE_TERM(mp) = THE_NON_VALUE; @@ -499,25 +522,27 @@ Uint erts_msg_attached_data_size_aux(ErtsMessage *msg) { Sint sz; - ASSERT(is_non_value(ERL_MESSAGE_TERM(msg))); - ASSERT(msg->data.dist_ext); - ASSERT(msg->data.dist_ext->heap_size < 0); - - sz = erts_decode_dist_ext_size(msg->data.dist_ext); - if (sz < 0) { - /* Bad external - * We leave the message intact in this case as it's not worth the trouble - * to make all callers remove it from queue. It will be detected again - * and removed from message queue later anyway. - */ - return 0; - } + ErtsDistExternal *edep = erts_get_dist_ext(msg->data.heap_frag); + ASSERT(ERTS_SIG_IS_EXTERNAL_MSG(msg)); + + if (edep->heap_size < 0) { - msg->data.dist_ext->heap_size = sz; - if (is_not_nil(msg->m[1])) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); - sz += heap_frag->used_size; + sz = erts_decode_dist_ext_size(edep, 1); + if (sz < 0) { + /* Bad external + * We leave the message intact in this case as it's not worth the trouble + * to make all callers remove it from queue. It will be detected again + * and removed from message queue later anyway. + */ + return 0; + } + + edep->heap_size = sz; + } else { + sz = edep->heap_size; + } + if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) { + sz += msg->data.heap_frag->used_size; } return sz; } @@ -1165,7 +1190,7 @@ erts_factory_message_create(ErtsHeapFactory* factory, int on_heap; erts_aint32_t state; - state = proc ? erts_atomic32_read_nob(&proc->state) : 0; + state = proc ? erts_atomic32_read_nob(&proc->state) : ERTS_PSFLG_OFF_HEAP_MSGQ; if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) { msgp = erts_alloc_message(sz, &hp); @@ -1398,8 +1423,8 @@ void erts_factory_close(ErtsHeapFactory* factory) else factory->message->data.heap_frag = factory->heap_frags; - /* Fall through */ - case FACTORY_HEAP_FRAGS: + /* Fall through */ + case FACTORY_HEAP_FRAGS: bp = factory->heap_frags; } diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 58294648b4..4c2674394e 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -138,7 +138,7 @@ typedef struct erl_heap_fragment ErlHeapFragment; struct erl_heap_fragment { ErlHeapFragment* next; /* Next heap fragment */ ErlOffHeap off_heap; /* Offset heap data. */ - Uint alloc_size; /* Size in (half)words of mem */ + Uint alloc_size; /* Size in words of mem */ Uint used_size; /* With terms to be moved to heap by GC */ Eterm mem[1]; /* Data */ }; @@ -167,7 +167,6 @@ struct erl_heap_fragment { #define ERL_MESSAGE_REF_FIELDS__ \ ErtsMessage *next; /* Next message */ \ union { \ - ErtsDistExternal *dist_ext; \ ErlHeapFragment *heap_frag; \ void *attached; \ } data; \ @@ -438,7 +437,8 @@ ErlHeapFragment* new_message_buffer(Uint); ErlHeapFragment* erts_resize_message_buffer(ErlHeapFragment *, Uint, Eterm *, Uint); void free_message_buffer(ErlHeapFragment *); -void erts_queue_dist_message(Process*, ErtsProcLocks, ErtsDistExternal *, Eterm, Eterm); +void erts_queue_dist_message(Process*, ErtsProcLocks, ErtsDistExternal *, + ErlHeapFragment *, Eterm, Eterm); void erts_queue_message(Process*, ErtsProcLocks,ErtsMessage*, Eterm, Eterm); void erts_queue_proc_message(Process* from,Process* to, ErtsProcLocks,ErtsMessage*, Eterm); void erts_queue_proc_messages(Process* from, Process* to, ErtsProcLocks, @@ -583,22 +583,11 @@ ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment* bp) ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErtsMessage *msg) { ASSERT(msg->data.attached); - if (is_value(ERL_MESSAGE_TERM(msg))) { - ErlHeapFragment *bp; - bp = erts_message_to_heap_frag(msg); - return erts_used_frag_sz(bp); - } - else if (msg->data.dist_ext->heap_size < 0) - return erts_msg_attached_data_size_aux(msg); - else { - Uint sz = msg->data.dist_ext->heap_size; - if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); - sz += heap_frag->used_size; - } - return sz; - } + + if (ERTS_SIG_IS_INTERNAL_MSG(msg)) + return erts_used_frag_sz(erts_message_to_heap_frag(msg)); + + return erts_msg_attached_data_size_aux(msg); } #endif diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index b7dbe625a2..ede9f092e9 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -201,6 +201,7 @@ dist_table_alloc(void *dep_tmpl) dep->send = NULL; dep->cache = NULL; dep->transcode_ctx = NULL; + dep->sequences = NULL; /* Link in */ @@ -1177,6 +1178,7 @@ static Eterm AM_system; static Eterm AM_timer; static Eterm AM_delayed_delete_timer; static Eterm AM_thread_progress_delete_timer; +static Eterm AM_sequence; static Eterm AM_signal; static void setup_reference_table(void); @@ -1218,6 +1220,7 @@ typedef struct dist_referrer_ { int ctrl_ref; int system_ref; int signal_ref; + int sequence_ref; Eterm id; Uint creation; Uint id_heap[ID_HEAP_SIZE]; @@ -1272,6 +1275,7 @@ erts_get_node_and_dist_references(struct process *proc) INIT_AM(delayed_delete_timer); INIT_AM(thread_progress_delete_timer); INIT_AM(signal); + INIT_AM(sequence); references_atoms_need_init = 0; } @@ -1309,8 +1313,9 @@ erts_get_node_and_dist_references(struct process *proc) #define TIMER_REF 8 #define SYSTEM_REF 9 #define SIGNAL_REF 10 +#define SEQUENCE_REF 11 -#define INC_TAB_SZ 10 +#define INC_TAB_SZ 11 static void insert_dist_referrer(ReferredDist *referred_dist, @@ -1344,6 +1349,7 @@ insert_dist_referrer(ReferredDist *referred_dist, drp->ctrl_ref = 0; drp->system_ref = 0; drp->signal_ref = 0; + drp->sequence_ref = 0; } switch (type) { @@ -1353,6 +1359,7 @@ insert_dist_referrer(ReferredDist *referred_dist, case ETS_REF: drp->ets_ref++; break; case SYSTEM_REF: drp->system_ref++; break; case SIGNAL_REF: drp->signal_ref++; break; + case SEQUENCE_REF: drp->sequence_ref++; break; default: ASSERT(0); } } @@ -1583,6 +1590,20 @@ insert_dist_monitors(DistEntry *dep) } } + +static int +insert_sequence(ErtsDistExternal *edep, void *arg, Sint reds) +{ + insert_dist_entry(edep->dep, SEQUENCE_REF, *(Eterm*)arg, 0); + return 1; +} + +static void +insert_dist_sequences(DistEntry *dep) +{ + erts_dist_seq_tree_foreach(dep, insert_sequence, (void *) &dep->sysname); +} + static void clear_visited_p_monitors(ErtsPTabElementCommon *p) { @@ -1774,11 +1795,9 @@ insert_message(ErtsMessage *msg, int type, Process *proc) else if (ERTS_SIG_IS_INTERNAL_MSG(msg)) heap_frag = msg->data.heap_frag; else { - if (msg->data.dist_ext->dep) - insert_dist_entry(msg->data.dist_ext->dep, - type, proc->common.id, 0); - if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) - heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); + heap_frag = msg->data.heap_frag; + insert_dist_entry(erts_get_dist_ext(heap_frag)->dep, + type, proc->common.id, 0); } } while (heap_frag) { @@ -1819,6 +1838,13 @@ insert_sig_link(ErtsLink *lnk, void *arg, Sint reds) } static void +insert_sig_ext(ErtsDistExternal *edep, void *arg) +{ + Process *proc = arg; + insert_dist_entry(edep->dep, SIGNAL_REF, proc->common.id, 0); +} + +static void setup_reference_table(void) { ErlHeapFragment *hfp; @@ -1898,6 +1924,7 @@ setup_reference_table(void) insert_sig_offheap, insert_sig_monitor, insert_sig_link, + insert_sig_ext, (void *) proc); /* Insert links */ @@ -1989,16 +2016,19 @@ setup_reference_table(void) for(dep = erts_visible_dist_entries; dep; dep = dep->next) { insert_dist_links(dep); insert_dist_monitors(dep); + insert_dist_sequences(dep); } for(dep = erts_hidden_dist_entries; dep; dep = dep->next) { insert_dist_links(dep); insert_dist_monitors(dep); + insert_dist_sequences(dep); } for(dep = erts_pending_dist_entries; dep; dep = dep->next) { insert_dist_links(dep); insert_dist_monitors(dep); + insert_dist_sequences(dep); } /* Not connected dist entries should not have any links, @@ -2006,6 +2036,7 @@ setup_reference_table(void) for(dep = erts_not_connected_dist_entries; dep; dep = dep->next) { insert_dist_links(dep); insert_dist_monitors(dep); + insert_dist_sequences(dep); } /* Insert all ets tables */ @@ -2179,6 +2210,10 @@ reference_table_term(Uint **hpp, ErlOffHeap *ohp, Uint *szp) tup = MK_2TUP(AM_system, MK_UINT(drp->system_ref)); drl = MK_CONS(tup, drl); } + if(drp->sequence_ref) { + tup = MK_2TUP(AM_sequence, MK_UINT(drp->sequence_ref)); + drl = MK_CONS(tup, drl); + } if(drp->signal_ref) { tup = MK_2TUP(AM_signal, MK_UINT(drp->signal_ref)); drl = MK_CONS(tup, drl); @@ -2253,6 +2288,12 @@ static void noop_sig_offheap(ErlOffHeap *oh, void *arg) } +static void noop_sig_ext(ErtsDistExternal *ext, void *arg) +{ + +} + + static void delete_reference_table(void) { @@ -2303,6 +2344,7 @@ delete_reference_table(void) noop_sig_offheap, clear_visited_monitor, clear_visited_link, + noop_sig_ext, (void *) proc); } } diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index c44f1f8991..cd5d69a302 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -98,11 +98,22 @@ struct ErtsDistOutputBuf_ { byte *alloc_endp; #endif ErtsDistOutputBuf *next; - Uint hopefull_flags; + Binary *bin; + /* Pointers to the distribution header, + if NULL the distr header is in the extp */ + byte *hdrp; + byte *hdr_endp; + /* Pointers to the ctl + payload */ byte *extp; byte *ext_endp; + /* Start of payload and hopefull_flags, used by transcode */ + Uint hopefull_flags; byte *msg_start; - byte data[1]; + /* start of the ext buffer, this is not always the same as extp + as the atom cache handling can use less then the allotted buffer. + This value is needed to calculate the size of this output buffer.*/ + byte *ext_start; + }; typedef struct { @@ -161,6 +172,8 @@ struct dist_entry_ { ErtsThrPrgrLaterOp later_op; struct transcode_context* transcode_ctx; + + struct dist_sequences *sequences; /* Ongoing distribution sequences */ }; typedef struct erl_node_ { diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 1bd219b6d7..8113abebde 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -36,6 +36,7 @@ #include "erl_port_task.h" #include "erl_trace.h" #include "beam_bp.h" +#include "erl_binary.h" #include "big.h" #include "erl_gc.h" #include "bif.h" @@ -80,7 +81,10 @@ #define ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO \ ERTS_SIG_Q_TYPE_MAX -#define ERTS_SIG_IS_EXTERNAL(sig) is_non_value(get_exit_signal_data(sig)->reason) +#define ERTS_SIG_IS_GEN_EXIT(sig) \ + (ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT) +#define ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig) \ + (ASSERT(ERTS_SIG_IS_GEN_EXIT(sig)),is_non_value(get_exit_signal_data(sig)->reason)) Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler); Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_high); @@ -114,8 +118,6 @@ typedef struct { Eterm message; Eterm from; Eterm reason; - struct erl_dist_external *dist_ext; - ErlHeapFragment *heap_frag; union { Eterm ref; int normal_kills; @@ -945,13 +947,15 @@ erts_proc_sig_get_external(ErtsMessage *msgp) { if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) { return erts_get_dist_ext(msgp->data.heap_frag); - } else if (ERTS_SIG_IS_NON_MSG(msgp) && ERTS_SIG_IS_EXTERNAL(msgp)) { + } else if (ERTS_SIG_IS_NON_MSG(msgp) && + ERTS_SIG_IS_GEN_EXIT(msgp) && + ERTS_SIG_IS_GEN_EXIT_EXTERNAL(msgp)) { ErtsDistExternal *edep; ErtsExitSignalData *xsigd = get_exit_signal_data(msgp); ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT); ASSERT(is_non_value(xsigd->reason)); if (msgp->hfrag.next == NULL) - edep = (ErtsDistExternal*)((void*)xsigd) + sizeof(ErtsExitSignalData); + edep = (ErtsDistExternal*)(xsigd + 1); else edep = erts_get_dist_ext(msgp->hfrag.next); return edep; @@ -965,6 +969,7 @@ static void send_gen_exit_signal(Process *c_p, Eterm from_tag, Eterm from, Eterm to, Sint16 op, Eterm reason, ErtsDistExternal *dist_ext, + ErlHeapFragment *dist_ext_hfrag, Eterm ref, Eterm token, int normal_kills) { ErtsExitSignalData *xsigd; @@ -972,7 +977,7 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, ErtsMessage *mp; ErlHeapFragment *hfrag; ErlOffHeap *ohp; - Uint hsz, from_sz, reason_sz, ref_sz, token_sz; + Uint hsz, from_sz, reason_sz, ref_sz, token_sz, dist_ext_sz; int seq_trace; #ifdef USE_VM_PROBES Eterm s_utag, utag; @@ -984,7 +989,7 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, ASSERT(is_immed(from_tag)); - hsz = sizeof(ErtsExitSignalData)/sizeof(Uint); + hsz = sizeof(ErtsExitSignalData)/sizeof(Eterm); seq_trace = c_p && have_seqtrace(token); if (seq_trace) @@ -1012,6 +1017,8 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, ref_sz = size_object(ref); hsz += ref_sz; + /* The reason was part of the control message, + just use copy it into the xsigd */ if (is_value(reason)) { reason_sz = size_object(reason); hsz += reason_sz; @@ -1032,6 +1039,11 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, ERTS_INTERNAL_ERROR("Invalid exit signal op"); break; } + } else if (dist_ext != NULL && dist_ext_hfrag == NULL) { + /* The message was not fragmented so we need to create space + for a single dist_ext element */ + dist_ext_sz = erts_dist_ext_size(dist_ext) / sizeof(Eterm); + hsz += dist_ext_sz; } /* @@ -1087,13 +1099,12 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, hfrag->used_size = hp - start_hp; - xsigd = (ErtsExitSignalData *) (char *) hp; + xsigd = (ErtsExitSignalData *) hp; xsigd->message = s_message; xsigd->from = s_from; xsigd->reason = s_reason; - xsigd->dist_ext = dist_ext; - xsigd->heap_frag = NULL; + hfrag->next = dist_ext_hfrag; if (is_nil(s_ref)) xsigd->u.normal_kills = normal_kills; @@ -1102,6 +1113,15 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, xsigd->u.ref = s_ref; } + hp += sizeof(ErtsExitSignalData)/sizeof(Eterm); + + if (dist_ext != NULL && dist_ext_hfrag == NULL && is_non_value(reason)) { + erts_make_dist_ext_copy(dist_ext, (ErtsDistExternal *) hp); + hp += dist_ext_sz; + } + + ASSERT(hp == mp->hfrag.mem + mp->hfrag.alloc_size); + if (seq_trace) do_seq_trace_output(to, s_token, s_message); @@ -1234,17 +1254,18 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to, from_tag = dep->sysname; } send_gen_exit_signal(c_p, from_tag, from, to, ERTS_SIG_Q_OP_EXIT, - reason, NULL, NIL, token, normal_kills); + reason, NULL, NULL, NIL, token, normal_kills); } void erts_proc_sig_send_dist_exit(DistEntry *dep, Eterm from, Eterm to, ErtsDistExternal *dist_ext, + ErlHeapFragment *hfrag, Eterm reason, Eterm token) { send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT, - reason, dist_ext, NIL, token, 0); + reason, dist_ext, hfrag, NIL, token, 0); } @@ -1259,7 +1280,7 @@ erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk, if (is_not_immed(reason) || is_not_nil(token)) { ASSERT(is_internal_pid(from) || is_internal_port(from)); send_gen_exit_signal(c_p, from, from, to, ERTS_SIG_Q_OP_EXIT_LINKED, - reason, NULL, NIL, token, 0); + reason, NULL, NULL, NIL, token, 0); } else { /* Pass signal using old link structure... */ @@ -1315,10 +1336,11 @@ void erts_proc_sig_send_dist_link_exit(DistEntry *dep, Eterm from, Eterm to, ErtsDistExternal *dist_ext, + ErlHeapFragment *hfrag, Eterm reason, Eterm token) { send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT_LINKED, - reason, dist_ext, NIL, token, 0); + reason, dist_ext, hfrag, NIL, token, 0); } @@ -1342,6 +1364,7 @@ void erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref, Eterm from, Eterm to, ErtsDistExternal *dist_ext, + ErlHeapFragment *hfrag, Eterm reason) { Eterm monitored, heap[3]; @@ -1351,7 +1374,7 @@ erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref, monitored = from; send_gen_exit_signal(NULL, dep->sysname, monitored, to, ERTS_SIG_Q_OP_MONITOR_DOWN, - reason, dist_ext, ref, NIL, 0); + reason, dist_ext, hfrag, ref, NIL, 0); } void @@ -1422,7 +1445,7 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason) } send_gen_exit_signal(NULL, from_tag, monitored, to, ERTS_SIG_Q_OP_MONITOR_DOWN, - reason, NULL, mdp->ref, NIL, 0); + reason, NULL, NULL, mdp->ref, NIL, 0); } erts_monitor_release(mon); } @@ -2105,9 +2128,9 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing, } /* This GEN_EXIT was received from another node, decode the exit reason */ - if (ERTS_SIG_IS_EXTERNAL(sig)) + if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig)) erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1); - + reason = xsigd->reason; if (is_non_value(reason)) { @@ -2115,17 +2138,14 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing, ignore = !0; destroy = !0; } - + if (!ignore) { if ((op != ERTS_SIG_Q_OP_EXIT || reason != am_kill) && (c_p->flags & F_TRAP_EXIT)) { convert_prepared_sig_to_msg(c_p, sig, xsigd->message, next_nm_sig); - ASSERT(sig->hfrag.next == NULL); - sig->hfrag.next = xsigd->heap_frag; conv_msg = sig; - } else if (reason == am_normal && !xsigd->u.normal_kills) { /* Ignore it... */ @@ -2991,50 +3011,29 @@ erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks, ErtsMessage *msgp, int force_off_heap) { ErtsHeapFactory factory; + ErlHeapFragment *hfrag; Eterm msg; - ErlHeapFragment *bp; Sint need; - int decode_in_heap_frag; - ErtsDistExternal *dist_ext; + ErtsDistExternal *edep; ErtsExitSignalData *xsigd = NULL; - decode_in_heap_frag = (force_off_heap - || !(proc_locks & ERTS_PROC_LOCK_MAIN) - || (proc->flags & F_OFF_HEAP_MSGQ)); - - if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) - dist_ext = msgp->data.dist_ext; - else { + edep = erts_proc_sig_get_external(msgp); + if (!ERTS_SIG_IS_EXTERNAL_MSG(msgp)) xsigd = get_exit_signal_data(msgp); - ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT); - ASSERT(is_non_value(xsigd->reason)); - dist_ext = xsigd->dist_ext; - } - if (dist_ext->heap_size >= 0) - need = dist_ext->heap_size; + if (edep->heap_size >= 0) + need = edep->heap_size; else { - need = erts_decode_dist_ext_size(dist_ext); + need = erts_decode_dist_ext_size(edep, 1); if (need < 0) { - /* bad msg; remove it... */ - if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { - bp = erts_dist_ext_trailer(dist_ext); - erts_cleanup_offheap(&bp->off_heap); - } - erts_free_dist_ext_copy(dist_ext); - dist_ext = NULL; + /* bad signal; remove it... */ return 0; } - dist_ext->heap_size = need; + edep->heap_size = need; } - if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { - bp = erts_dist_ext_trailer(dist_ext); - need += bp->used_size; - } - - if (xsigd) { + if (ERTS_SIG_IS_NON_MSG(msgp)) { switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) { case ERTS_SIG_Q_OP_EXIT: case ERTS_SIG_Q_OP_EXIT_LINKED: @@ -3051,26 +3050,22 @@ erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks, } } - if (decode_in_heap_frag) - erts_factory_heap_frag_init(&factory, new_message_buffer(need)); - else - erts_factory_proc_prealloc_init(&factory, proc, need); - - ASSERT(dist_ext->heap_size >= 0); - if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(dist_ext); - ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp), - heap_frag->used_size, - &factory.hp, - factory.off_heap); - erts_cleanup_offheap(&heap_frag->off_heap); + hfrag = new_message_buffer(need); + erts_factory_heap_frag_init(&factory, hfrag); + + ASSERT(edep->heap_size >= 0); + + msg = erts_decode_dist_ext(&factory, edep, 1); + + if (is_non_value(msg)) { + erts_factory_undo(&factory); + return 0; } - msg = erts_decode_dist_ext(&factory, dist_ext); - if (!xsigd) { + if (ERTS_SIG_IS_MSG(msgp)) { ERL_MESSAGE_TERM(msgp) = msg; - msgp->data.attached = NULL; + if (msgp->data.heap_frag == &msgp->hfrag) + msgp->data.heap_frag = ERTS_MSG_COMBINED_HFRAG; } else { switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) { case ERTS_SIG_Q_OP_EXIT: @@ -3089,22 +3084,21 @@ erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks, } xsigd->reason = msg; } - erts_free_dist_ext_copy(dist_ext); - if (is_non_value(msg)) { - erts_factory_undo(&factory); - return 0; - } + erts_free_dist_ext_copy(edep); erts_factory_close(&factory); - ASSERT(!msgp->data.heap_frag || xsigd); + hfrag = factory.heap_frags; + while (hfrag->next) + hfrag = hfrag->next; - if (decode_in_heap_frag) { - if (!xsigd) - msgp->data.heap_frag = factory.heap_frags; - else - xsigd->heap_frag = factory.heap_frags; + if (ERTS_SIG_IS_MSG(msgp) && msgp->data.heap_frag != ERTS_MSG_COMBINED_HFRAG) { + hfrag->next = msgp->data.heap_frag; + msgp->data.heap_frag = factory.heap_frags; + } else { + hfrag->next = msgp->hfrag.next; + msgp->hfrag.next = factory.heap_frags; } return 1; @@ -3274,8 +3268,9 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, xsigd = get_exit_signal_data(sig); /* This GEN_EXIT was received from another node, decode the exit reason */ - if (ERTS_SIG_IS_EXTERNAL(sig)) - erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1); + if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig)) + if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1)) + break; /* Decode failed, just remove signal */ omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p), xsigd->u.ref); @@ -4375,11 +4370,13 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing, return -1; /* Yield... */ } if (ERTS_SIG_IS_EXTERNAL_MSG(sig)) { - cnt++; + cnt += 50; /* Decode is expensive... */ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 0)) { /* Bad dist message; remove it... */ remove_mq_m_sig(c_p, sig, next_sig, next_nm_sig); + sig->next = NULL; + erts_cleanup_messages(sig); sig = *next_sig; continue; } @@ -4451,18 +4448,12 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) { /* decode it... */ - if (mp->data.attached) - erts_proc_sig_decode_dist(rp, rp_locks, mp, !0); - - msg = ERL_MESSAGE_TERM(mp); - - if (is_non_value(msg)) { + if (!erts_proc_sig_decode_dist(rp, rp_locks, mp, !0)) { ErtsMessage *bad_mp = mp; /* * Bad distribution message; remove * it from the queue... */ - ASSERT(!mp->data.attached); ASSERT(*mpp == bad_mp); @@ -4474,6 +4465,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, erts_cleanup_messages(bad_mp); continue; } + + msg = ERL_MESSAGE_TERM(mp); } ASSERT(is_value(msg)); @@ -4632,12 +4625,21 @@ debug_foreach_sig_fake_oh(Eterm term, } +static void +debug_foreach_sig_external(ErtsMessage *msgp, + void (*ext_func)(ErtsDistExternal *, void *), + void *arg) +{ + ext_func(erts_proc_sig_get_external(msgp), arg); +} + void erts_proc_sig_debug_foreach_sig(Process *c_p, void (*msg_func)(ErtsMessage *, void *), void (*oh_func)(ErlOffHeap *, void *), ErtsMonitorFunc mon_func, ErtsLinkFunc lnk_func, + void (*ext_func)(ErtsDistExternal *, void *), void *arg) { ErtsMessage *queue[] = {c_p->sig_qs.first, c_p->sig_qs.cont, c_p->sig_inq.first}; @@ -4646,10 +4648,10 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, for (qix = 0; qix < sizeof(queue)/sizeof(queue[0]); qix++) { ErtsMessage *sig; for (sig = queue[qix]; sig; sig = sig->next) { - - if (ERTS_SIG_IS_MSG(sig)) + + if (ERTS_SIG_IS_MSG(sig)) { msg_func(sig, arg); - else { + } else { Eterm tag; Uint16 type; int op; @@ -4668,7 +4670,10 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, case ERTS_SIG_Q_OP_MONITOR_DOWN: switch (type) { case ERTS_SIG_Q_TYPE_GEN_EXIT: - debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg); + if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig)) + debug_foreach_sig_external(sig, ext_func, arg); + else + debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg); break; case ERTS_LNK_TYPE_PORT: case ERTS_LNK_TYPE_PROC: diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index 39cee6f230..2b055e73bc 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -228,6 +228,9 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to, * * @param[in] dist_ext The exit reason in external term format * + * @param[in] hfrag Heap frag with trace token and dist_ext + * iff available, otherwise NULL. + * * @param[in] reason Exit reason. * * @param[in] token Seq trace token. @@ -237,6 +240,7 @@ void erts_proc_sig_send_dist_exit(DistEntry *dep, Eterm from, Eterm to, ErtsDistExternal *dist_ext, + ErlHeapFragment *hfrag, Eterm reason, Eterm token); /** @@ -322,6 +326,9 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk); * * @param[in] dist_ext The exit reason in external term format * + * @param[in] hfrag Heap frag with trace token and dist_ext + * iff available, otherwise NULL. + * * @param[in] reason Exit reason. * * @param[in] token Seq trace token. @@ -331,6 +338,7 @@ void erts_proc_sig_send_dist_link_exit(struct dist_entry_ *dep, Eterm from, Eterm to, ErtsDistExternal *dist_ext, + ErlHeapFragment *hfrag, Eterm reason, Eterm token); /** @@ -426,6 +434,9 @@ erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to); * * @param[in] dist_ext The exit reason in external term format * + * @param[in] hfrag Heap frag with trace token and dist_ext + * iff available, otherwise NULL. + * * @param[in] reason Exit reason. * */ @@ -433,6 +444,7 @@ void erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref, Eterm from, Eterm to, ErtsDistExternal *dist_ext, + ErlHeapFragment *hfrag, Eterm reason); /** @@ -1035,6 +1047,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, void (*oh_func)(ErlOffHeap *, void *), ErtsMonitorFunc mon_func, ErtsLinkFunc lnk_func, + void (*ext_func)(ErtsDistExternal *, void *), void *arg); extern Process *erts_dirty_process_signal_handler; diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 66af9574b8..837b3f4ace 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -12111,6 +12111,7 @@ erts_proc_exit_handle_monitor(ErtsMonitor *mon, void *vctxt, Sint reds) case ERTS_DSIG_PREP_PENDING: if (dist->connection_id == dsd.connection_id) { code = erts_dsig_send_m_exit(&dsd, + c_p->common.id, watcher, watched, mdp->ref, diff --git a/erts/emulator/beam/erl_process_dump.c b/erts/emulator/beam/erl_process_dump.c index 46e460582c..a164ed543e 100644 --- a/erts/emulator/beam/erl_process_dump.c +++ b/erts/emulator/beam/erl_process_dump.c @@ -191,11 +191,11 @@ static ERTS_INLINE void dump_msg(fmtfn_t to, void *to_arg, ErtsMessage *mp) { if (ERTS_SIG_IS_MSG((ErtsSignal *) mp)) { - Eterm mesg = ERL_MESSAGE_TERM(mp); - if (is_value(mesg)) - dump_element(to, to_arg, mesg); + Eterm mesg; + if (ERTS_SIG_IS_INTERNAL_MSG(mp)) + dump_element(to, to_arg, ERL_MESSAGE_TERM(mp)); else - dump_dist_ext(to, to_arg, mp->data.dist_ext); + dump_dist_ext(to, to_arg, erts_get_dist_ext(mp->data.heap_frag)); mesg = ERL_MESSAGE_TOKEN(mp); erts_print(to, to_arg, ":"); dump_element(to, to_arg, mesg); @@ -267,6 +267,7 @@ dump_dist_ext(fmtfn_t to, void *to_arg, ErtsDistExternal *edep) else { byte *e; size_t sz; + int i; if (!(edep->flags & ERTS_DIST_EXT_ATOM_TRANS_TAB)) erts_print(to, to_arg, "D0:"); @@ -276,8 +277,8 @@ dump_dist_ext(fmtfn_t to, void *to_arg, ErtsDistExternal *edep) for (i = 0; i < edep->attab.size; i++) dump_element(to, to_arg, edep->attab.atom[i]); } - sz = edep->ext_endp - edep->extp; - e = edep->extp; + sz = edep->data->ext_endp - edep->data->extp; + e = edep->data->extp; if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) { ASSERT(*e != VERSION_MAGIC); sz++; @@ -288,15 +289,19 @@ dump_dist_ext(fmtfn_t to, void *to_arg, ErtsDistExternal *edep) erts_print(to, to_arg, "E%X:", sz); if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) { byte sbuf[3]; - int i = 0; + + i = 0; sbuf[i++] = VERSION_MAGIC; - while (i < sizeof(sbuf) && e < edep->ext_endp) { + while (i < sizeof(sbuf) && e < edep->data->ext_endp) { sbuf[i++] = *e++; } erts_print_base64(to, to_arg, sbuf, i); } - erts_print_base64(to, to_arg, e, edep->ext_endp - e); + erts_print_base64(to, to_arg, e, edep->data->ext_endp - e); + for (i = 1; i < edep->data->frag_id; i++) + erts_print_base64(to, to_arg, edep->data[i].extp, + edep->data[i].ext_endp - edep->data[i].extp); } } diff --git a/erts/emulator/beam/erl_ptab.h b/erts/emulator/beam/erl_ptab.h index 94f0247492..c30a684002 100644 --- a/erts/emulator/beam/erl_ptab.h +++ b/erts/emulator/beam/erl_ptab.h @@ -68,8 +68,11 @@ typedef struct { Uint64 started_interval; struct reg_proc *reg; ErtsLink *links; - ErtsMonitor *monitors; + /* Local target monitors, double linked list + contains the remote part of local monitors */ ErtsMonitor *lt_monitors; + /* other monitors, rb tree */ + ErtsMonitor *monitors; } alive; /* --- While being released --- */ diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 640ca338d9..84e6e0d6fd 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -262,19 +262,12 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) if (acmp) { int long_atoms = 0; /* !0 if one or more atoms are longer than 255. */ int i; - int sz; - int fix_sz - = 1 /* VERSION_MAGIC */ - + 1 /* DIST_HEADER */ - + 1 /* dist header flags */ - + 1 /* number of internal cache entries */ - ; + int sz = 0; int min_sz; ASSERT(dflags & DFLAG_UTF8_ATOMS); ASSERT(acmp->hdr_sz < 0); /* Make sure cache update instructions fit */ - min_sz = fix_sz+(2+4)*acmp->sz; - sz = fix_sz; + min_sz = (2+4)*acmp->sz; for (i = 0; i < acmp->sz; i++) { Atom *a; Eterm atom; @@ -302,17 +295,28 @@ erts_finalize_atom_cache_map(ErtsAtomCacheMap *acmp, Uint32 dflags) } Uint -erts_encode_ext_dist_header_size(ErtsAtomCacheMap *acmp) +erts_encode_ext_dist_header_size(ErtsAtomCacheMap *acmp, Uint fragments) { if (!acmp) return 0; else { + int fix_sz + = 1 /* VERSION_MAGIC */ + + 1 /* DIST_HEADER */ + + 1 /* dist header flags */ + + 1 /* number of internal cache entries */ + ; ASSERT(acmp->hdr_sz >= 0); - return acmp->hdr_sz; + if (fragments > 1) + fix_sz += 8 /* sequence id */ + + 8 /* number of fragments */ + ; + return fix_sz + acmp->hdr_sz; } } -byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) +byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp, + Uint fragments, Eterm from) { /* Maximum number of atom must be less than the maximum of a 32 bits unsigned integer. Check is done in erl_init.c, erl_start function. */ @@ -346,12 +350,37 @@ byte *erts_encode_ext_dist_header_setup(byte *ctl_ext, ErtsAtomCacheMap *acmp) put_int8(acmp->sz, ep); --ep; put_int8(dist_hdr_flags, ep); - *--ep = DIST_HEADER; - *--ep = VERSION_MAGIC; + if (fragments > 1) { + ASSERT(is_pid(from)); + ep -= 8; + put_int64(fragments, ep); + ep -= 8; + put_int64(from, ep); + *--ep = DIST_FRAG_HEADER; + } else { + *--ep = DIST_HEADER; + } + *--ep = VERSION_MAGIC; return ep; } } +byte *erts_encode_ext_dist_header_fragment(byte **hdrpp, + Uint fragment, + Eterm from) +{ + byte *ep = *hdrpp, *start = ep; + ASSERT(is_pid(from)); + *ep++ = VERSION_MAGIC; + *ep++ = DIST_FRAG_CONT; + put_int64(from, ep); + ep += 8; + put_int64(fragment, ep); + ep += 8; + *hdrpp = ep; + return start; +} + #define PASS_THROUGH 'p' @@ -365,7 +394,8 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, int ci, sz; byte dist_hdr_flags; int long_atoms; - register byte *ep = ob->extp; + Uint64 seq_id = 0, frag_id = 0; + register byte *ep = ob->hdrp ? ob->hdrp : ob->extp; ASSERT(dflags & DFLAG_UTF8_ATOMS); /* @@ -416,7 +446,7 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, } goto done; } - else if (ep[1] != DIST_HEADER) { + else if (ep[1] != DIST_HEADER && ep[1] != DIST_FRAG_HEADER && ep[1] != DIST_FRAG_CONT) { ASSERT(ep[1] == SMALL_TUPLE_EXT || ep[1] == LARGE_TUPLE_EXT); ASSERT(!(dflags & DFLAG_DIST_HDR_ATOM_CACHE)); /* Node without atom cache, 'pass through' needed */ @@ -424,6 +454,17 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, goto done; } + if (ep[1] == DIST_FRAG_CONT) { + ep = ob->extp; + goto done; + } else if (ep[1] == DIST_FRAG_HEADER) { + /* skip the seq id and frag id */ + seq_id = get_int64(&ep[2]); + ep += 8; + frag_id = get_int64(&ep[2]); + ep += 8; + } + dist_hdr_flags = ep[2]; long_atoms = ERTS_DIST_HDR_LONG_ATOMS_FLG & ((int) dist_hdr_flags); @@ -546,11 +587,19 @@ Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf* ob, } --ep; put_int8(ci, ep); - *--ep = DIST_HEADER; + if (seq_id) { + ep -= 8; + put_int64(frag_id, ep); + ep -= 8; + put_int64(seq_id, ep); + *--ep = DIST_FRAG_HEADER; + } else { + *--ep = DIST_HEADER; + } *--ep = VERSION_MAGIC; done: ob->extp = ep; - ASSERT(&ob->data[0] <= ob->extp && ob->extp < ob->ext_endp); + ASSERT((byte*)ob->bin->orig_bytes <= ob->extp && ob->extp < ob->ext_endp); return reds < 0 ? 0 : reds; } @@ -635,72 +684,90 @@ byte* erts_encode_ext_ets(Eterm term, byte *ep, struct erl_off_heap_header** off off_heap); } -ErtsDistExternal * -erts_make_dist_ext_copy(ErtsDistExternal *edep, Eterm *token) + +static Uint +dist_ext_size(ErtsDistExternal *edep) { - size_t align_sz; - size_t dist_ext_sz; - size_t ext_sz = 0; - size_t token_sz = 0; - Eterm token_size; - byte *ep; - ErtsDistExternal *new_edep; + Uint sz = sizeof(ErtsDistExternal); - dist_ext_sz = ERTS_DIST_EXT_SIZE(edep); - ASSERT(edep->ext_endp && edep->extp); - ASSERT(edep->ext_endp >= edep->extp); - if (edep->binp == NULL) - ext_sz = edep->ext_endp - edep->extp; + ASSERT(edep->data->ext_endp && edep->data->extp); + ASSERT(edep->data->ext_endp >= edep->data->extp); - align_sz = ERTS_EXTRA_DATA_ALIGN_SZ(dist_ext_sz + ext_sz); + if (edep->flags & ERTS_DIST_EXT_ATOM_TRANS_TAB) { + ASSERT(0 <= edep->attab.size \ + && edep->attab.size <= ERTS_ATOM_CACHE_SIZE); + sz -= sizeof(Eterm)*(ERTS_ATOM_CACHE_SIZE - edep->attab.size); + } else { + sz -= sizeof(ErtsAtomTranslationTable); + } + return sz; +} - token_size = size_object(*token); - if (token_size) - token_sz = ERTS_HEAP_FRAG_SIZE(token_size); +Uint +erts_dist_ext_size(ErtsDistExternal *edep) +{ + Uint sz = dist_ext_size(edep); + sz += edep->data[0].frag_id * sizeof(ErtsDistExternalData); + return sz + ERTS_EXTRA_DATA_ALIGN_SZ(sz); +} - new_edep = erts_alloc(ERTS_ALC_T_EXT_TERM_DATA, - dist_ext_sz + ext_sz + align_sz + token_sz); +Uint +erts_dist_ext_data_size(ErtsDistExternal *edep) +{ + Uint sz = 0, i; + for (i = 0; i < edep->data->frag_id; i++) + sz += edep->data[i].ext_endp - edep->data[i].extp; + return sz; +} - ep = (byte *) new_edep; - sys_memcpy((void *) ep, (void *) edep, dist_ext_sz); - if (new_edep->dep) - erts_ref_dist_entry(new_edep->dep); - new_edep->heap_size = -1; - - if (ext_sz) { - ep += dist_ext_sz; - new_edep->extp = ep; - new_edep->ext_endp = ep + ext_sz; - sys_memcpy((void *) ep, (void *) edep->extp, ext_sz); +void +erts_dist_ext_frag(ErtsDistExternalData *ede_datap, ErtsDistExternal *edep) +{ + ErtsDistExternalData *new_ede_datap = &edep->data[edep->data->frag_id - ede_datap->frag_id]; + sys_memcpy(new_ede_datap, ede_datap, sizeof(ErtsDistExternalData)); + + /* If the data is not backed by a binary, we create one here to keep + things simple. Only custom distribution drivers should use lists. */ + if (new_ede_datap->binp == NULL) { + size_t ext_sz = ede_datap->ext_endp - ede_datap->extp; + new_ede_datap->binp = erts_bin_nrml_alloc(ext_sz); + sys_memcpy(new_ede_datap->binp->orig_bytes, (void *) ede_datap->extp, ext_sz); + new_ede_datap->extp = (byte*)new_ede_datap->binp->orig_bytes; + new_ede_datap->ext_endp = (byte*)new_ede_datap->binp->orig_bytes + ext_sz; } else { - erts_refc_inc(&new_edep->binp->intern.refc, 2); + erts_refc_inc(&new_ede_datap->binp->intern.refc, 2); } +} - /* Copy the seq_trace token */ - if (is_not_nil(*token)) { - ErlHeapFragment *heap_frag; - ErlOffHeap *ohp; - Eterm *hp; - heap_frag = erts_dist_ext_trailer(new_edep); - ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); - hp = heap_frag->mem; - ohp = &heap_frag->off_heap; - *token = copy_struct(*token, token_size, &hp, ohp); - } - return new_edep; +void +erts_make_dist_ext_copy(ErtsDistExternal *edep, ErtsDistExternal *new_edep) +{ + size_t dist_ext_sz = dist_ext_size(edep); + byte *ep; + + ep = (byte *) new_edep; + sys_memcpy((void *) ep, (void *) edep, dist_ext_sz); + erts_ref_dist_entry(new_edep->dep); + + ep += dist_ext_sz; + + new_edep->data = (ErtsDistExternalData*)ep; + sys_memzero(new_edep->data, sizeof(ErtsDistExternalData) * edep->data->frag_id); + new_edep->data->frag_id = edep->data->frag_id; + erts_dist_ext_frag(edep->data, new_edep); } void erts_free_dist_ext_copy(ErtsDistExternal *edep) { - if (edep->dep) - erts_deref_dist_entry(edep->dep); - if (edep->binp) - erts_bin_release(edep->binp); - erts_free(ERTS_ALC_T_EXT_TERM_DATA, edep); + int i; + erts_deref_dist_entry(edep->dep); + for (i = 0; i < edep->data->frag_id; i++) + if (edep->data[i].binp) + erts_bin_release(edep->data[i].binp); } -int +ErtsPrepDistExtRes erts_prepare_dist_ext(ErtsDistExternal *edep, byte *ext, Uint size, @@ -711,10 +778,6 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, { register byte *ep; - edep->heap_size = -1; - edep->flags = 0; - edep->dep = dep; - ASSERT(dep); erts_de_rlock(dep); @@ -734,10 +797,6 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, size--; } - edep->heap_size = -1; - edep->ext_endp = ext + size; - edep->binp = binp; - ep = ext; if (size < 2) @@ -753,16 +812,33 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, goto fail; } + edep->heap_size = -1; + edep->flags = 0; + edep->dep = dep; + edep->connection_id = conn_id; + edep->data->ext_endp = ext+size; + edep->data->binp = binp; + edep->data->seq_id = 0; + edep->data->frag_id = 1; + if (dep->flags & DFLAG_DIST_HDR_ATOM_CACHE) edep->flags |= ERTS_DIST_EXT_DFLAG_HDR; - edep->connection_id = dep->connection_id; - - if (ep[1] != DIST_HEADER) { + if (ep[1] != DIST_HEADER && ep[1] != DIST_FRAG_HEADER && ep[1] != DIST_FRAG_CONT) { if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) goto bad_hdr; edep->attab.size = 0; - edep->extp = ext; + edep->data->extp = ext; + } + else if (ep[1] == DIST_FRAG_CONT) { + if (!(dep->flags & DFLAG_FRAGMENTS)) + goto bad_hdr; + edep->attab.size = 0; + edep->data->extp = ext + 1 + 1 + 8 + 8; + edep->data->seq_id = get_int64(&ep[2]); + edep->data->frag_id = get_int64(&ep[2+8]); + erts_de_runlock(dep); + return ERTS_PREP_DIST_EXT_FRAG_CONT; } else { int tix; @@ -771,9 +847,17 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, if (!(edep->flags & ERTS_DIST_EXT_DFLAG_HDR)) goto bad_hdr; + if (ep[1] == DIST_FRAG_HEADER) { + if (!(dep->flags & DFLAG_FRAGMENTS)) + goto bad_hdr; + edep->data->seq_id = get_int64(&ep[2]); + edep->data->frag_id = get_int64(&ep[2+8]); + ep += 16; + } + #undef CHKSIZE #define CHKSIZE(SZ) \ - do { if ((SZ) > edep->ext_endp - ep) goto bad_hdr; } while(0) + do { if ((SZ) > edep->data->ext_endp - ep) goto bad_hdr; } while(0) CHKSIZE(1+1+1); ep += 2; @@ -903,7 +987,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, #endif } } - edep->extp = ep; + edep->data->extp = ep; #ifdef ERTS_DEBUG_USE_DIST_SEP if (*ep != VERSION_MAGIC) goto bad_hdr; @@ -928,7 +1012,7 @@ erts_prepare_dist_ext(ErtsDistExternal *edep, erts_this_node->sysname, edep->dep->sysname, dist_entry_channel_no(edep->dep)); - for (ep = ext; ep < edep->ext_endp; ep++) + for (ep = ext; ep < edep->data->ext_endp; ep++) erts_dsprintf(dsbufp, ep != ext ? ",%b8u" : "<<%b8u", *ep); erts_dsprintf(dsbufp, ">>"); erts_send_warning_to_logger_nogl(dsbufp); @@ -953,9 +1037,9 @@ bad_dist_ext(ErtsDistExternal *edep) erts_this_node->sysname, dep->sysname, dist_entry_channel_no(dep)); - for (ep = edep->extp; ep < edep->ext_endp; ep++) + for (ep = edep->data->extp; ep < edep->data->ext_endp; ep++) erts_dsprintf(dsbufp, - ep != edep->extp ? ",%b8u" : "<<...,%b8u", + ep != edep->data->extp ? ",%b8u" : "<<...,%b8u", *ep); erts_dsprintf(dsbufp, ">>\n"); erts_dsprintf(dsbufp, "ATOM_CACHE_REF translations: "); @@ -973,30 +1057,32 @@ bad_dist_ext(ErtsDistExternal *edep) } Sint -erts_decode_dist_ext_size(ErtsDistExternal *edep) +erts_decode_dist_ext_size(ErtsDistExternal *edep, int kill_connection) { Sint res; byte *ep; - if (edep->extp >= edep->ext_endp) + + if (edep->data->extp >= edep->data->ext_endp) goto fail; #ifndef ERTS_DEBUG_USE_DIST_SEP if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) { - if (*edep->extp == VERSION_MAGIC) + if (*edep->data->extp == VERSION_MAGIC) goto fail; - ep = edep->extp; + ep = edep->data->extp; } else #endif { - if (*edep->extp != VERSION_MAGIC) + if (*edep->data->extp != VERSION_MAGIC) goto fail; - ep = edep->extp+1; + ep = edep->data->extp+1; } - res = decoded_size(ep, edep->ext_endp, 0, NULL); + res = decoded_size(ep, edep->data->ext_endp, 0, NULL); if (res >= 0) return res; fail: - bad_dist_ext(edep); + if (kill_connection) + bad_dist_ext(edep); return -1; } @@ -1022,12 +1108,15 @@ Sint erts_decode_ext_size_ets(byte *ext, Uint size) */ Eterm erts_decode_dist_ext(ErtsHeapFactory* factory, - ErtsDistExternal *edep) + ErtsDistExternal *edep, + int kill_connection) { Eterm obj; - byte* ep = edep->extp; + byte* ep; + + ep = edep->data->extp; - if (ep >= edep->ext_endp) + if (ep >= edep->data->ext_endp) goto error; #ifndef ERTS_DEBUG_USE_DIST_SEP if (edep->flags & ERTS_DIST_EXT_DFLAG_HDR) { @@ -1045,14 +1134,15 @@ erts_decode_dist_ext(ErtsHeapFactory* factory, if (!ep) goto error; - edep->extp = ep; + edep->data->extp = ep; return obj; error: erts_factory_undo(factory); - bad_dist_ext(edep); + if (kill_connection) + bad_dist_ext(edep); return THE_NON_VALUE; } @@ -1097,6 +1187,7 @@ BIF_RETTYPE erts_debug_dist_ext_to_term_2(BIF_ALIST_2) Eterm res; Sint hsz; ErtsDistExternal ede; + ErtsDistExternalData ede_data; Eterm *tp; Eterm real_bin; Uint offset; @@ -1109,7 +1200,8 @@ BIF_RETTYPE erts_debug_dist_ext_to_term_2(BIF_ALIST_2) ede.flags = ERTS_DIST_EXT_ATOM_TRANS_TAB; ede.dep = NULL; ede.heap_size = -1; - + ede.data = &ede_data; + if (is_not_tuple(BIF_ARG_1)) goto badarg; tp = tuple_val(BIF_ARG_1); @@ -1134,15 +1226,15 @@ BIF_RETTYPE erts_debug_dist_ext_to_term_2(BIF_ALIST_2) if (bitsize != 0) goto badarg; - ede.extp = binary_bytes(real_bin)+offset; - ede.ext_endp = ede.extp + size; + ede.data->extp = binary_bytes(real_bin)+offset; + ede.data->ext_endp = ede.data->extp + size; - hsz = erts_decode_dist_ext_size(&ede); + hsz = erts_decode_dist_ext_size(&ede, 1); if (hsz < 0) goto badarg; erts_factory_proc_prealloc_init(&factory, BIF_P, hsz); - res = erts_decode_dist_ext(&factory, &ede); + res = erts_decode_dist_ext(&factory, &ede, 1); erts_factory_close(&factory); if (is_value(res)) diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index 91c028c9bd..396cd9f802 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -58,6 +58,8 @@ #define SMALL_ATOM_UTF8_EXT 'w' #define DIST_HEADER 'D' +#define DIST_FRAG_HEADER 'E' +#define DIST_FRAG_CONT 'F' #define ATOM_CACHE_REF 'R' #define ATOM_INTERNAL_REF2 'I' #define ATOM_INTERNAL_REF3 'K' @@ -123,15 +125,22 @@ typedef struct { #define ERTS_DIST_CON_ID_MASK ((Uint32) 0x00ffffff) /* also in net_kernel.erl */ struct binary; +typedef struct erl_dist_external_data ErtsDistExternalData; -typedef struct erl_dist_external { - DistEntry *dep; +struct erl_dist_external_data { + Uint64 seq_id; + Uint64 frag_id; byte *extp; byte *ext_endp; struct binary *binp; +}; + +typedef struct erl_dist_external { Sint heap_size; - Uint32 connection_id; + DistEntry *dep; Uint32 flags; + Uint32 connection_id; + ErtsDistExternalData *data; ErtsAtomTranslationTable attab; } ErtsDistExternal; @@ -158,8 +167,9 @@ void erts_reset_atom_cache_map(ErtsAtomCacheMap *); void erts_destroy_atom_cache_map(ErtsAtomCacheMap *); void erts_finalize_atom_cache_map(ErtsAtomCacheMap *, Uint32); -Uint erts_encode_ext_dist_header_size(ErtsAtomCacheMap *); -byte *erts_encode_ext_dist_header_setup(byte *, ErtsAtomCacheMap *); +Uint erts_encode_ext_dist_header_size(ErtsAtomCacheMap *, Uint); +byte *erts_encode_ext_dist_header_setup(byte *, ErtsAtomCacheMap *, Uint, Eterm); +byte *erts_encode_ext_dist_header_fragment(byte **, Uint, Eterm); Sint erts_encode_ext_dist_header_finalize(ErtsDistOutputBuf*, DistEntry *, Uint32 dflags, Sint reds); struct erts_dsig_send_context; int erts_encode_dist_ext_size(Eterm, Uint32, ErtsAtomCacheMap*, Uint* szp); @@ -174,20 +184,24 @@ Uint erts_encode_ext_size_ets(Eterm); void erts_encode_ext(Eterm, byte **); byte* erts_encode_ext_ets(Eterm, byte *, struct erl_off_heap_header** ext_off_heap); +Uint erts_dist_ext_size(ErtsDistExternal *); +Uint erts_dist_ext_data_size(ErtsDistExternal *); void erts_free_dist_ext_copy(ErtsDistExternal *); -ERTS_GLB_INLINE void *erts_dist_ext_trailer(ErtsDistExternal *); -ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Eterm *); -void *erts_dist_ext_trailer(ErtsDistExternal *); -void erts_destroy_dist_ext_copy(ErtsDistExternal *); - -#define ERTS_PREP_DIST_EXT_FAILED (-1) -#define ERTS_PREP_DIST_EXT_SUCCESS (0) -#define ERTS_PREP_DIST_EXT_CLOSED (1) - -int erts_prepare_dist_ext(ErtsDistExternal *, byte *, Uint, struct binary *, - DistEntry *, Uint32, ErtsAtomCache *); -Sint erts_decode_dist_ext_size(ErtsDistExternal *); -Eterm erts_decode_dist_ext(ErtsHeapFactory* factory, ErtsDistExternal *); +void erts_make_dist_ext_copy(ErtsDistExternal *, ErtsDistExternal *); +void erts_dist_ext_frag(ErtsDistExternalData *, ErtsDistExternal *); +#define erts_get_dist_ext(HFRAG) ((ErtsDistExternal*)((HFRAG)->mem + (HFRAG)->used_size)) + +typedef enum { + ERTS_PREP_DIST_EXT_FAILED, + ERTS_PREP_DIST_EXT_SUCCESS, + ERTS_PREP_DIST_EXT_FRAG_CONT, + ERTS_PREP_DIST_EXT_CLOSED +} ErtsPrepDistExtRes; + +ErtsPrepDistExtRes erts_prepare_dist_ext(ErtsDistExternal *, byte *, Uint, struct binary *, + DistEntry *, Uint32, ErtsAtomCache *); +Sint erts_decode_dist_ext_size(ErtsDistExternal *, int); +Eterm erts_decode_dist_ext(ErtsHeapFactory*, ErtsDistExternal *, int); Sint erts_decode_ext_size(byte*, Uint); Sint erts_decode_ext_size_ets(byte*, Uint); @@ -203,17 +217,4 @@ int erts_debug_max_atom_out_cache_index(void); int erts_debug_atom_to_out_cache_index(Eterm); void transcode_free_ctx(DistEntry* dep); -#if ERTS_GLB_INLINE_INCL_FUNC_DEF - -ERTS_GLB_INLINE void * -erts_dist_ext_trailer(ErtsDistExternal *edep) -{ - void *res = (void *) (edep->ext_endp - + ERTS_EXTRA_DATA_ALIGN_SZ(edep->ext_endp)); - ASSERT((((UWord) res) % sizeof(Uint)) == 0); - return res; -} - -#endif - #endif /* ERL_EXTERNAL_H__ */ |