diff options
Diffstat (limited to 'erts/emulator')
-rw-r--r-- | erts/emulator/beam/dist.c | 284 | ||||
-rw-r--r-- | erts/emulator/beam/dist.h | 52 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.c | 124 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.h | 2 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 312 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.h | 69 | ||||
-rw-r--r-- | erts/emulator/beam/external.c | 22 | ||||
-rw-r--r-- | erts/emulator/beam/external.h | 2 | ||||
-rw-r--r-- | erts/emulator/beam/msg_instrs.tab | 4 | ||||
-rw-r--r-- | erts/emulator/hipe/hipe_native_bif.c | 2 |
10 files changed, 593 insertions, 280 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 3b6c0e5db9..91a9481d84 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -50,7 +50,8 @@ #define DIST_CTL_DEFAULT_SIZE 64 /* Turn this on to get printouts of all distribution messages - * which go on the line + * which go on the line. Enabling this may make some testcases + * fail. Especially the broken dist testcases in distribution_SUITE. */ #if 0 #define ERTS_DIST_MSG_DBG @@ -71,8 +72,6 @@ static void dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) { ErtsHeapFactory factory; - DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE); - Eterm* ctl = ctl_default; byte *extp = edep->extp; Eterm msg; Sint ctl_len; @@ -116,6 +115,7 @@ static Export *dist_ctrl_put_data_trap; /* forward declarations */ +static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, int force_busy); static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy); static void send_nodes_mon_msgs(Process *, Eterm, Eterm, Eterm, Eterm); static void init_nodes_monitors(void); @@ -848,7 +848,7 @@ int erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, Eterm ref, Eterm reason) { - Eterm ctl; + Eterm ctl, msg; DeclareTmpHeapNoproc(ctl_heap,6); int res; @@ -861,10 +861,17 @@ erts_dsig_send_m_exit(ErtsDSigData *dsdp, Eterm watcher, Eterm watched, UseTmpHeapNoproc(6); - ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT), - watched, watcher, ref, reason); + if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) { + ctl = TUPLE4(&ctl_heap[0], make_small(DOP_PAYLOAD_MONITOR_P_EXIT), + watched, watcher, ref); + msg = reason; + } else { + ctl = TUPLE5(&ctl_heap[0], make_small(DOP_MONITOR_P_EXIT), + watched, watcher, ref, reason); + msg = THE_NON_VALUE; + } - res = dsig_send_ctl(dsdp, ctl, 1); + res = dsig_send_exit(dsdp, ctl, msg, 1); UnUseTmpHeapNoproc(6); return res; } @@ -1018,6 +1025,7 @@ erts_dsig_send_msg(Eterm remote, Eterm message, ErtsSendContext* ctx) ctx->dss.ctl = ctl; ctx->dss.msg = message; ctx->dss.force_busy = 0; + ctx->dss.force_encode = 0; res = erts_dsig_send(&ctx->dsd, &ctx->dss); return res; } @@ -1077,6 +1085,7 @@ erts_dsig_send_reg_msg(Eterm remote_name, Eterm message, ctx->dss.ctl = ctl; ctx->dss.msg = message; ctx->dss.force_busy = 0; + ctx->dss.force_encode = 0; res = erts_dsig_send(&ctx->dsd, &ctx->dss); return res; } @@ -1086,7 +1095,7 @@ int erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason, Eterm token) { - Eterm ctl; + Eterm ctl, msg = THE_NON_VALUE; DeclareTmpHeapNoproc(ctl_heap,6); int res; #ifdef USE_VM_PROBES @@ -1101,13 +1110,24 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, #endif UseTmpHeapNoproc(6); + + if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) + msg = reason; + if (have_seqtrace(token)) { seq_trace_update_send(dsdp->proc); seq_trace_output_exit(token, reason, SEQ_TRACE_SEND, remote, local); - ctl = TUPLE5(&ctl_heap[0], - make_small(DOP_EXIT_TT), local, remote, token, reason); + if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) { + ctl = TUPLE4(&ctl_heap[0], + make_small(DOP_PAYLOAD_EXIT_TT), local, remote, token); + } else + ctl = TUPLE5(&ctl_heap[0], + make_small(DOP_EXIT_TT), local, remote, token, reason); } else { - ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); + if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) + ctl = TUPLE3(&ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote); + else + ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); } #ifdef USE_VM_PROBES *node_name = *sender_name = *remote_name = '\0'; @@ -1129,8 +1149,7 @@ erts_dsig_send_exit_tt(ErtsDSigData *dsdp, Eterm local, Eterm remote, #endif DTRACE7(process_exit_signal_remote, sender_name, node_name, remote_name, reason_str, tok_label, tok_lastcnt, tok_serial); - /* forced, i.e ignore busy */ - res = dsig_send_ctl(dsdp, ctl, 1); + res = dsig_send_exit(dsdp, ctl, msg, 1); UnUseTmpHeapNoproc(6); return res; } @@ -1140,13 +1159,18 @@ erts_dsig_send_exit(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason) { DeclareTmpHeapNoproc(ctl_heap,5); int res; - Eterm ctl; + Eterm ctl, msg = dsdp->dep->flags & DFLAG_EXIT_PAYLOAD ? reason : THE_NON_VALUE; UseTmpHeapNoproc(5); - ctl = TUPLE4(&ctl_heap[0], - make_small(DOP_EXIT), local, remote, reason); - /* forced, i.e ignore busy */ - res = dsig_send_ctl(dsdp, ctl, 1); + + if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) { + ctl = TUPLE3(&ctl_heap[0], make_small(DOP_PAYLOAD_EXIT), local, remote); + msg = reason; + } else { + ctl = TUPLE4(&ctl_heap[0], make_small(DOP_EXIT), local, remote, reason); + msg = THE_NON_VALUE; + } + res = dsig_send_exit(dsdp, ctl, msg, 1); UnUseTmpHeapNoproc(5); return res; } @@ -1156,13 +1180,20 @@ erts_dsig_send_exit2(ErtsDSigData *dsdp, Eterm local, Eterm remote, Eterm reason { DeclareTmpHeapNoproc(ctl_heap,5); int res; - Eterm ctl; + Eterm ctl, msg; UseTmpHeapNoproc(5); - ctl = TUPLE4(&ctl_heap[0], - make_small(DOP_EXIT2), local, remote, reason); + if (dsdp->dep->flags & DFLAG_EXIT_PAYLOAD) { + ctl = TUPLE3(&ctl_heap[0], + make_small(DOP_PAYLOAD_EXIT2), local, remote); + msg = reason; + } else { + ctl = TUPLE4(&ctl_heap[0], + make_small(DOP_EXIT2), local, remote, reason); + msg = THE_NON_VALUE; + } - res = dsig_send_ctl(dsdp, ctl, 0); + res = dsig_send_exit(dsdp, ctl, msg, 0); UnUseTmpHeapNoproc(5); return res; } @@ -1227,10 +1258,8 @@ int erts_net_message(Port *prt, DeclareTmpHeapNoproc(ctl_default,DIST_CTL_DEFAULT_SIZE); Eterm* ctl = ctl_default; ErtsHeapFactory factory; - Eterm* hp; Sint type; Eterm token; - Eterm token_size; Uint tuple_arity; int res; #ifdef ERTS_DIST_MSG_DBG @@ -1290,7 +1319,6 @@ int erts_net_message(Port *prt, if (ctl_len > DIST_CTL_DEFAULT_SIZE) { ctl = erts_alloc(ERTS_ALC_T_DCTRL_BUF, ctl_len * sizeof(Eterm)); } - hp = ctl; erts_factory_tmp_init(&factory, ctl, ctl_len, ERTS_ALC_T_DCTRL_BUF); arg = erts_decode_dist_ext(&factory, &ede); @@ -1313,7 +1341,6 @@ int erts_net_message(Port *prt, goto invalid_message; } - token_size = 0; token = NIL; switch (type = unsigned_val(tuple[1])) { @@ -1508,7 +1535,6 @@ int erts_net_message(Port *prt, goto invalid_message; } - token_size = size_object(tuple[5]); /* Fall through ... */ case DOP_REG_SEND: /* {DOP_REG_SEND, From, Cookie, ToName} -- Message */ @@ -1533,35 +1559,25 @@ int erts_net_message(Port *prt, } rp = erts_whereis_process(NULL, 0, to, 0, 0); if (rp) { - Uint xsize = (type == DOP_REG_SEND - ? 0 - : ERTS_HEAP_FRAG_SIZE(token_size)); ErtsProcLocks locks = 0; ErtsDistExternal *ede_copy; - ede_copy = erts_make_dist_ext_copy(&ede, xsize); if (type == DOP_REG_SEND) { token = NIL; } else { - ErlHeapFragment *heap_frag; - ErlOffHeap *ohp; - ASSERT(xsize); - heap_frag = erts_dist_ext_trailer(ede_copy); - ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); - hp = heap_frag->mem; - ohp = &heap_frag->off_heap; token = tuple[5]; - token = copy_struct(token, token_size, &hp, ohp); } - erts_queue_dist_message(rp, locks, ede_copy, token, from); + ede_copy = erts_make_dist_ext_copy(&ede, &token); + + erts_queue_dist_message(rp, locks, ede_copy, token, from); + if (locks) erts_proc_unlock(rp, locks); } break; case DOP_SEND_SENDER_TT: { - Uint xsize; case DOP_SEND_TT: if (tuple_arity != 4) { @@ -1569,15 +1585,12 @@ int erts_net_message(Port *prt, } token = tuple[4]; - token_size = size_object(token); - xsize = ERTS_HEAP_FRAG_SIZE(token_size); goto send_common; case DOP_SEND_SENDER: case DOP_SEND: token = NIL; - xsize = 0; if (tuple_arity != 3) goto invalid_message; @@ -1604,17 +1617,7 @@ int erts_net_message(Port *prt, ErtsProcLocks locks = 0; ErtsDistExternal *ede_copy; - ede_copy = erts_make_dist_ext_copy(&ede, xsize); - if (is_not_nil(token)) { - ErlHeapFragment *heap_frag; - ErlOffHeap *ohp; - ASSERT(xsize); - heap_frag = erts_dist_ext_trailer(ede_copy); - ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); - hp = heap_frag->mem; - ohp = &heap_frag->off_heap; - token = copy_struct(token, token_size, &hp, ohp); - } + ede_copy = erts_make_dist_ext_copy(&ede, &token); erts_queue_dist_message(rp, locks, ede_copy, token, am_Empty); if (locks) @@ -1623,18 +1626,28 @@ int erts_net_message(Port *prt, break; } + case DOP_PAYLOAD_MONITOR_P_EXIT: case DOP_MONITOR_P_EXIT: { + ErtsDistExternal *ede_copy = NULL; + /* We are monitoring a process on the remote node which dies, we get {DOP_MONITOR_P_EXIT, Remote pid or name, Local pid, ref, reason} */ - - if (tuple_arity != 5) { - goto invalid_message; - } - watched = tuple[2]; /* remote proc or name which died */ - watcher = tuple[3]; + watched = tuple[2]; /* remote proc or name which died */ + watcher = tuple[3]; ref = tuple[4]; - reason = tuple[5]; + + if (type == DOP_PAYLOAD_MONITOR_P_EXIT) { + if (tuple_arity != 4) { + goto invalid_message; + } + reason = THE_NON_VALUE; + } else { + if (tuple_arity != 5) { + goto invalid_message; + } + reason = tuple[5]; + } if (is_not_ref(ref)) goto invalid_message; @@ -1650,70 +1663,134 @@ int erts_net_message(Port *prt, goto invalid_message; } + if (reason == THE_NON_VALUE) { + +#ifdef ERTS_DIST_MSG_DBG + dist_msg_dbg(&ede, "MSG", buf, orig_len); +#endif + + if (!erts_proc_lookup(watcher)) break; /* Process not alive */ + + ede_copy = erts_make_dist_ext_copy(&ede, &token); + } + erts_proc_sig_send_dist_monitor_down(dep, ref, watched, - watcher, reason); + watcher, ede_copy, reason); break; } + case DOP_PAYLOAD_EXIT: + case DOP_PAYLOAD_EXIT_TT: case DOP_EXIT_TT: case DOP_EXIT: { + ErtsDistExternal *ede_copy = NULL; + /* 'from', which 'to' is linked to, died */ + from = tuple[2]; + to = tuple[3]; + if (type == DOP_EXIT) { if (tuple_arity != 4) { goto invalid_message; } - - from = tuple[2]; - to = tuple[3]; - reason = tuple[4]; token = NIL; - } else { + reason = tuple[4]; + } else if (type == DOP_EXIT_TT){ if (tuple_arity != 5) { goto invalid_message; } - from = tuple[2]; - to = tuple[3]; token = tuple[4]; reason = tuple[5]; - } + } else if (type == DOP_PAYLOAD_EXIT) { + if (tuple_arity != 3) { + goto invalid_message; + } + token = NIL; + reason = THE_NON_VALUE; + } else { + if (tuple_arity != 4) { + goto invalid_message; + } + token = tuple[4]; + reason = THE_NON_VALUE; + } if (is_not_external_pid(from) || dep != external_pid_dist_entry(from) || is_not_internal_pid(to)) { goto invalid_message; } + if (reason == THE_NON_VALUE) { + +#ifdef ERTS_DIST_MSG_DBG + dist_msg_dbg(&ede, "MSG", buf, orig_len); +#endif + + if (!erts_proc_lookup(to)) break; /* Process not alive */ + + ede_copy = erts_make_dist_ext_copy(&ede, &token); + } + erts_proc_sig_send_dist_link_exit(dep, - from, to, + from, to, ede_copy, reason, token); break; } + case DOP_PAYLOAD_EXIT2_TT: + case DOP_PAYLOAD_EXIT2: case DOP_EXIT2_TT: - case DOP_EXIT2: + case DOP_EXIT2: { + ErtsDistExternal *ede_copy = NULL; + /* 'from' is send an exit signal to 'to' */ + from = tuple[2]; + to = tuple[3]; + if (type == DOP_EXIT2) { if (tuple_arity != 4) { goto invalid_message; } - from = tuple[2]; - to = tuple[3]; reason = tuple[4]; token = NIL; - } else { + } else if (type == DOP_EXIT2_TT) { if (tuple_arity != 5) { goto invalid_message; } - from = tuple[2]; - to = tuple[3]; token = tuple[4]; reason = tuple[5]; - } - if (is_not_pid(from) || is_not_internal_pid(to)) { + } else if (type == DOP_PAYLOAD_EXIT2) { + if (tuple_arity != 3) { + goto invalid_message; + } + reason = THE_NON_VALUE; + token = NIL; + } else { + if (tuple_arity != 4) { + goto invalid_message; + } + reason = THE_NON_VALUE; + token = tuple[4]; + } + if (is_not_pid(from) + || dep != external_pid_dist_entry(from) + || is_not_internal_pid(to)) { goto invalid_message; } - erts_proc_sig_send_exit(NULL, from, to, reason, token, 0); - break; + if (reason == THE_NON_VALUE) { + +#ifdef ERTS_DIST_MSG_DBG + dist_msg_dbg(&ede, "MSG", buf, orig_len); +#endif + + if (!erts_proc_lookup(to)) break; /* Process not alive */ + ede_copy = erts_make_dist_ext_copy(&ede, &token); + } + + erts_proc_sig_send_dist_exit(dep, from, to, ede_copy, reason, token); + break; + } case DOP_GROUP_LEADER: if (tuple_arity != 3) { goto invalid_message; @@ -1757,6 +1834,23 @@ data_error: return -1; } +static int dsig_send_exit(ErtsDSigData* dsdp, Eterm ctl, Eterm msg, int force_busy) +{ + struct erts_dsig_send_context ctx; + int ret; + ctx.ctl = ctl; + ctx.msg = msg; + ctx.force_busy = force_busy; + ctx.force_encode = 1; + ctx.phase = ERTS_DSIG_SEND_PHASE_INIT; +#ifdef DEBUG + ctx.reds = 1; /* provoke assert below (no reduction count with force_encode) */ +#endif + ret = erts_dsig_send(dsdp, &ctx); + ASSERT(ret != ERTS_DSIG_SEND_CONTINUE); + return ret; +} + static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy) { struct erts_dsig_send_context ctx; @@ -1764,6 +1858,7 @@ static int dsig_send_ctl(ErtsDSigData* dsdp, Eterm ctl, int force_busy) ctx.ctl = ctl; ctx.msg = THE_NON_VALUE; ctx.force_busy = force_busy; + ctx.force_encode = 1; ctx.phase = ERTS_DSIG_SEND_PHASE_INIT; #ifdef DEBUG ctx.reds = 1; /* provoke assert below (no reduction count without msg) */ @@ -1850,10 +1945,14 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_SIZE; case ERTS_DSIG_SEND_PHASE_MSG_SIZE: - if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) { - retval = ERTS_DSIG_SEND_CONTINUE; - goto done; - } + if (!ctx->force_encode) { + if (erts_encode_dist_ext_size_int(ctx->msg, ctx, &ctx->data_size)) { + retval = ERTS_DSIG_SEND_CONTINUE; + goto done; + } + } else { + erts_encode_dist_ext_size(ctx->msg, ctx->flags, ctx->acmp, &ctx->data_size); + } ctx->phase = ERTS_DSIG_SEND_PHASE_ALLOC; case ERTS_DSIG_SEND_PHASE_ALLOC: @@ -1882,10 +1981,15 @@ erts_dsig_send(ErtsDSigData *dsdp, struct erts_dsig_send_context* ctx) ctx->phase = ERTS_DSIG_SEND_PHASE_MSG_ENCODE; case ERTS_DSIG_SEND_PHASE_MSG_ENCODE: - if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, &ctx->u.ec, &ctx->reds)) { - retval = ERTS_DSIG_SEND_CONTINUE; - goto done; - } + if (!ctx->force_encode) { + if (erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, + ctx->acmp, &ctx->u.ec, &ctx->reds)) { + retval = ERTS_DSIG_SEND_CONTINUE; + goto done; + } + } else { + erts_encode_dist_ext(ctx->msg, &ctx->obuf->ext_endp, ctx->flags, ctx->acmp, NULL, NULL); + } ctx->phase = ERTS_DSIG_SEND_PHASE_FIN; case ERTS_DSIG_SEND_PHASE_FIN: { diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index a8ccd1eaec..81a09bc69c 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -47,6 +47,7 @@ #define DFLAG_SEND_SENDER 0x80000 #define DFLAG_BIG_SEQTRACE_LABELS 0x100000 #define DFLAG_NO_MAGIC 0x200000 /* internal for pending connection */ +#define DFLAG_EXIT_PAYLOAD 0x400000 /* Mandatory flags for distribution */ #define DFLAG_DIST_MANDATORY (DFLAG_EXTENDED_REFERENCES \ @@ -75,7 +76,8 @@ | DFLAG_MAP_TAG \ | DFLAG_BIG_CREATION \ | DFLAG_SEND_SENDER \ - | DFLAG_BIG_SEQTRACE_LABELS) + | DFLAG_BIG_SEQTRACE_LABELS \ + | DFLAG_EXIT_PAYLOAD) /* Flags addable by local distr implementations */ #define DFLAG_DIST_ADDABLE DFLAG_DIST_DEFAULT @@ -99,26 +101,35 @@ | DFLAG_BIG_CREATION) /* opcodes used in distribution messages */ -#define DOP_LINK 1 -#define DOP_SEND 2 -#define DOP_EXIT 3 -#define DOP_UNLINK 4 +enum { + DOP_LINK = 1, + DOP_SEND = 2, + DOP_EXIT = 3, + DOP_UNLINK = 4, /* Ancient DOP_NODE_LINK (5) was here, can be reused */ -#define DOP_REG_SEND 6 -#define DOP_GROUP_LEADER 7 -#define DOP_EXIT2 8 - -#define DOP_SEND_TT 12 -#define DOP_EXIT_TT 13 -#define DOP_REG_SEND_TT 16 -#define DOP_EXIT2_TT 18 - -#define DOP_MONITOR_P 19 -#define DOP_DEMONITOR_P 20 -#define DOP_MONITOR_P_EXIT 21 - -#define DOP_SEND_SENDER 22 -#define DOP_SEND_SENDER_TT 23 + DOP_REG_SEND = 6, + DOP_GROUP_LEADER = 7, + DOP_EXIT2 = 8, + + DOP_SEND_TT = 12, + DOP_EXIT_TT = 13, + DOP_REG_SEND_TT = 16, + DOP_EXIT2_TT = 18, + + DOP_MONITOR_P = 19, + DOP_DEMONITOR_P = 20, + DOP_MONITOR_P_EXIT = 21, + + DOP_SEND_SENDER = 22, + DOP_SEND_SENDER_TT = 23, + + /* These are used when DFLAG_EXIT_PAYLOAD is detected */ + DOP_PAYLOAD_EXIT = 24, + DOP_PAYLOAD_EXIT_TT = 25, + DOP_PAYLOAD_EXIT2 = 26, + DOP_PAYLOAD_EXIT2_TT = 27, + DOP_PAYLOAD_MONITOR_P_EXIT = 28 +}; /* distribution trap functions */ extern Export* dmonitor_node_trap; @@ -346,6 +357,7 @@ struct erts_dsig_send_context { Eterm ctl; Eterm msg; int force_busy; + int force_encode; Uint32 max_finalize_prepend; Uint data_size, dhdr_ext_size; ErtsAtomCacheMap *acmp; diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 942bec84cf..cd7bce8bf4 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -201,34 +201,40 @@ free_message_buffer(ErlHeapFragment* bp) }while (bp != NULL); } +static void +erts_cleanup_message(ErtsMessage *mp) +{ + ErlHeapFragment *bp; + if (ERTS_SIG_IS_EXTERNAL_MSG(mp) || ERTS_SIG_IS_NON_MSG(mp)) { + ErtsDistExternal *edep = erts_proc_sig_get_external(mp); + if (edep) { + erts_free_dist_ext_copy(edep); + if (mp->data.heap_frag == &mp->hfrag) { + ASSERT(ERTS_SIG_IS_EXTERNAL_MSG(mp)); + mp->data.heap_frag = ERTS_MSG_COMBINED_HFRAG; + } + } + } + + if (ERTS_SIG_IS_MSG(mp) && mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { + bp = mp->data.heap_frag; + } else { + mp->data.attached = ERTS_MSG_COMBINED_HFRAG; + bp = mp->hfrag.next; + erts_cleanup_offheap(&mp->hfrag.off_heap); + } + + if (bp) + free_message_buffer(bp); +} + void erts_cleanup_messages(ErtsMessage *msgp) { ErtsMessage *mp = msgp; while (mp) { ErtsMessage *fmp; - ErlHeapFragment *bp; - if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) { - if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) { - bp = (ErlHeapFragment *) mp->data.dist_ext->ext_endp; - erts_cleanup_offheap(&bp->off_heap); - } - if (mp->data.dist_ext) - erts_free_dist_ext_copy(mp->data.dist_ext); - } - else { - if (ERTS_SIG_IS_INTERNAL_MSG(mp) - && mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { - bp = mp->data.heap_frag; - } - else { - mp->data.attached = ERTS_MSG_COMBINED_HFRAG; - bp = mp->hfrag.next; - erts_cleanup_offheap(&mp->hfrag.off_heap); - } - if (bp) - free_message_buffer(bp); - } + erts_cleanup_message(mp); fmp = mp; mp = mp->next; erts_free_message(fmp); @@ -1099,80 +1105,6 @@ change_to_off_heap: return res; } -int -erts_decode_dist_message(Process *proc, ErtsProcLocks proc_locks, - ErtsMessage *msgp, int force_off_heap) -{ - ErtsHeapFactory factory; - Eterm msg; - ErlHeapFragment *bp; - Sint need; - int decode_in_heap_frag; - - decode_in_heap_frag = (force_off_heap - || !(proc_locks & ERTS_PROC_LOCK_MAIN) - || (proc->flags & F_OFF_HEAP_MSGQ)); - - if (msgp->data.dist_ext->heap_size >= 0) - need = msgp->data.dist_ext->heap_size; - else { - need = erts_decode_dist_ext_size(msgp->data.dist_ext); - if (need < 0) { - /* bad msg; remove it... */ - if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { - bp = erts_dist_ext_trailer(msgp->data.dist_ext); - erts_cleanup_offheap(&bp->off_heap); - } - erts_free_dist_ext_copy(msgp->data.dist_ext); - msgp->data.dist_ext = NULL; - return 0; - } - - msgp->data.dist_ext->heap_size = need; - } - - if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { - bp = erts_dist_ext_trailer(msgp->data.dist_ext); - need += bp->used_size; - } - - if (decode_in_heap_frag) - erts_factory_heap_frag_init(&factory, new_message_buffer(need)); - else - erts_factory_proc_prealloc_init(&factory, proc, need); - - ASSERT(msgp->data.dist_ext->heap_size >= 0); - if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(msgp->data.dist_ext); - ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp), - heap_frag->used_size, - &factory.hp, - factory.off_heap); - erts_cleanup_offheap(&heap_frag->off_heap); - } - - msg = erts_decode_dist_ext(&factory, msgp->data.dist_ext); - ERL_MESSAGE_TERM(msgp) = msg; - erts_free_dist_ext_copy(msgp->data.dist_ext); - msgp->data.attached = NULL; - - if (is_non_value(msg)) { - erts_factory_undo(&factory); - return 0; - } - - erts_factory_trim_and_close(&factory, msgp->m, - ERL_MESSAGE_REF_ARRAY_SZ); - - ASSERT(!msgp->data.heap_frag); - - if (decode_in_heap_frag) - msgp->data.heap_frag = factory.heap_frags; - - return 1; -} - void erts_factory_proc_init(ErtsHeapFactory* factory, Process* p) { diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index b2550814fd..58294648b4 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -455,8 +455,6 @@ Sint erts_move_messages_off_heap(Process *c_p); Sint erts_complete_off_heap_message_queue_change(Process *c_p); Eterm erts_change_message_queue_management(Process *c_p, Eterm new_state); -int erts_decode_dist_message(Process *, ErtsProcLocks, ErtsMessage *, int); - void erts_cleanup_messages(ErtsMessage *mp); void *erts_alloc_message_ref(void); diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 668ee4ce96..1bd219b6d7 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -80,6 +80,8 @@ #define ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO \ ERTS_SIG_Q_TYPE_MAX +#define ERTS_SIG_IS_EXTERNAL(sig) is_non_value(get_exit_signal_data(sig)->reason) + Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler); Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_high); Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_max); @@ -112,6 +114,8 @@ typedef struct { Eterm message; Eterm from; Eterm reason; + struct erl_dist_external *dist_ext; + ErlHeapFragment *heap_frag; union { Eterm ref; int normal_kills; @@ -936,13 +940,32 @@ erts_proc_sig_privqs_len(Process *c_p) return proc_sig_privqs_len(c_p, 0); } +ErtsDistExternal * +erts_proc_sig_get_external(ErtsMessage *msgp) +{ + if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) { + return erts_get_dist_ext(msgp->data.heap_frag); + } else if (ERTS_SIG_IS_NON_MSG(msgp) && ERTS_SIG_IS_EXTERNAL(msgp)) { + ErtsDistExternal *edep; + ErtsExitSignalData *xsigd = get_exit_signal_data(msgp); + ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT); + ASSERT(is_non_value(xsigd->reason)); + if (msgp->hfrag.next == NULL) + edep = (ErtsDistExternal*)((void*)xsigd) + sizeof(ErtsExitSignalData); + else + edep = erts_get_dist_ext(msgp->hfrag.next); + return edep; + } + return NULL; +} + static void do_seq_trace_output(Eterm to, Eterm token, Eterm msg); static void send_gen_exit_signal(Process *c_p, Eterm from_tag, Eterm from, Eterm to, - Sint16 op, Eterm reason, Eterm ref, - Eterm token, int normal_kills) + Sint16 op, Eterm reason, ErtsDistExternal *dist_ext, + Eterm ref, Eterm token, int normal_kills) { ErtsExitSignalData *xsigd; Eterm *hp, *start_hp, s_reason, s_ref, s_message, s_token, s_from; @@ -956,6 +979,9 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, Uint utag_sz; #endif + ASSERT((is_value(reason) && dist_ext == NULL) || + (is_non_value(reason) && dist_ext != NULL)); + ASSERT(is_immed(from_tag)); hsz = sizeof(ErtsExitSignalData)/sizeof(Uint); @@ -977,33 +1003,35 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, hsz += utag_sz; #endif - token_sz = is_immed(token) ? 0 : size_object(token); + token_sz = size_object(token); hsz += token_sz; - from_sz = is_immed(from) ? 0 : size_object(from); + from_sz = size_object(from); hsz += from_sz; - reason_sz = is_immed(reason) ? 0 : size_object(reason); - hsz += reason_sz; + ref_sz = size_object(ref); + hsz += ref_sz; - switch (op) { - case ERTS_SIG_Q_OP_EXIT: - case ERTS_SIG_Q_OP_EXIT_LINKED: { - /* {'EXIT', From, Reason} */ - hsz += 4; /* 3-tuple */ - ref_sz = 0; - break; - } - case ERTS_SIG_Q_OP_MONITOR_DOWN: { - /* {'DOWN', Ref, process, From, Reason} */ - hsz += 6; /* 5-tuple */ - ref_sz = NC_HEAP_SIZE(ref); - hsz += ref_sz; - break; - } - default: - ERTS_INTERNAL_ERROR("Invalid exit signal op"); - break; + if (is_value(reason)) { + reason_sz = size_object(reason); + hsz += reason_sz; + + switch (op) { + case ERTS_SIG_Q_OP_EXIT: + case ERTS_SIG_Q_OP_EXIT_LINKED: { + /* {'EXIT', From, Reason} */ + hsz += 4; /* 3-tuple */ + break; + } + case ERTS_SIG_Q_OP_MONITOR_DOWN: { + /* {'DOWN', Ref, process, From, Reason} */ + hsz += 6; /* 5-tuple */ + break; + } + default: + ERTS_INTERNAL_ERROR("Invalid exit signal op"); + break; + } } /* @@ -1015,35 +1043,33 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, ohp = &hfrag->off_heap; start_hp = hp; - s_token = (is_immed(token) - ? token - : copy_struct(token, token_sz, &hp, ohp)); - - s_reason = (is_immed(reason) - ? reason - : copy_struct(reason, reason_sz, &hp, ohp)); + s_token = copy_struct(token, token_sz, &hp, ohp); + s_from = copy_struct(from, from_sz, &hp, ohp); + s_ref = copy_struct(ref, ref_sz, &hp, ohp); - s_from = (is_immed(from) - ? from - : copy_struct(from, from_sz, &hp, ohp)); + if (is_value(reason)) { + s_reason = copy_struct(reason, reason_sz, &hp, ohp); - if (!ref_sz) - s_ref = NIL; - else - s_ref = STORE_NC(&hp, ohp, ref); - - switch (op) { - case ERTS_SIG_Q_OP_EXIT: - case ERTS_SIG_Q_OP_EXIT_LINKED: - /* {'EXIT', From, Reason} */ - s_message = TUPLE3(hp, am_EXIT, s_from, s_reason); - hp += 4; - break; - case ERTS_SIG_Q_OP_MONITOR_DOWN: - /* {'DOWN', Ref, process, From, Reason} */ - s_message = TUPLE5(hp, am_DOWN, s_ref, am_process, s_from, s_reason); - hp += 6; - break; + switch (op) { + case ERTS_SIG_Q_OP_EXIT: + case ERTS_SIG_Q_OP_EXIT_LINKED: + /* {'EXIT', From, Reason} */ + s_message = TUPLE3(hp, am_EXIT, s_from, s_reason); + hp += 4; + break; + case ERTS_SIG_Q_OP_MONITOR_DOWN: + /* {'DOWN', Ref, process, From, Reason} */ + s_message = TUPLE5(hp, am_DOWN, s_ref, am_process, s_from, s_reason); + hp += 6; + break; + default: + /* This cannot happen, used to silence gcc warning */ + s_message = THE_NON_VALUE; + break; + } + } else { + s_message = THE_NON_VALUE; + s_reason = THE_NON_VALUE; } #ifdef USE_VM_PROBES @@ -1066,6 +1092,9 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, xsigd->message = s_message; xsigd->from = s_from; xsigd->reason = s_reason; + xsigd->dist_ext = dist_ext; + xsigd->heap_frag = NULL; + if (is_nil(s_ref)) xsigd->u.normal_kills = normal_kills; else { @@ -1205,7 +1234,18 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to, from_tag = dep->sysname; } send_gen_exit_signal(c_p, from_tag, from, to, ERTS_SIG_Q_OP_EXIT, - reason, NIL, token, normal_kills); + reason, NULL, NIL, token, normal_kills); +} + +void +erts_proc_sig_send_dist_exit(DistEntry *dep, + Eterm from, Eterm to, + ErtsDistExternal *dist_ext, + Eterm reason, Eterm token) +{ + send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT, + reason, dist_ext, NIL, token, 0); + } void @@ -1219,7 +1259,7 @@ erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk, if (is_not_immed(reason) || is_not_nil(token)) { ASSERT(is_internal_pid(from) || is_internal_port(from)); send_gen_exit_signal(c_p, from, from, to, ERTS_SIG_Q_OP_EXIT_LINKED, - reason, NIL, token, 0); + reason, NULL, NIL, token, 0); } else { /* Pass signal using old link structure... */ @@ -1274,10 +1314,12 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk) void erts_proc_sig_send_dist_link_exit(DistEntry *dep, Eterm from, Eterm to, + ErtsDistExternal *dist_ext, Eterm reason, Eterm token) { send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT_LINKED, - reason, NIL, token, 0); + reason, dist_ext, NIL, token, 0); + } void @@ -1299,16 +1341,17 @@ erts_proc_sig_send_dist_unlink(DistEntry *dep, Eterm from, Eterm to) void erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref, Eterm from, Eterm to, + ErtsDistExternal *dist_ext, Eterm reason) { Eterm monitored, heap[3]; - if (is_atom(from)) + if (is_atom(from)) monitored = TUPLE2(&heap[0], from, dep->sysname); else monitored = from; send_gen_exit_signal(NULL, dep->sysname, monitored, to, ERTS_SIG_Q_OP_MONITOR_DOWN, - reason, ref, NIL, 0); + reason, dist_ext, ref, NIL, 0); } void @@ -1376,10 +1419,10 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason) || is_internal_pid(from_tag) || is_atom(from_tag)); monitored = TUPLE2(&heap[0], name, node); - } + } send_gen_exit_signal(NULL, from_tag, monitored, to, ERTS_SIG_Q_OP_MONITOR_DOWN, - reason, mdp->ref, NIL, 0); + reason, NULL, mdp->ref, NIL, 0); } erts_monitor_release(mon); } @@ -2037,7 +2080,6 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing, if (type == ERTS_SIG_Q_TYPE_GEN_EXIT) { xsigd = get_exit_signal_data(sig); from = xsigd->from; - reason = xsigd->reason; if (op != ERTS_SIG_Q_OP_EXIT_LINKED) ignore = 0; else { @@ -2062,13 +2104,28 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing, } } + /* This GEN_EXIT was received from another node, decode the exit reason */ + if (ERTS_SIG_IS_EXTERNAL(sig)) + erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1); + + reason = xsigd->reason; + + if (is_non_value(reason)) { + /* Bad distribution message; remove it from queue... */ + ignore = !0; + destroy = !0; + } + if (!ignore) { if ((op != ERTS_SIG_Q_OP_EXIT || reason != am_kill) && (c_p->flags & F_TRAP_EXIT)) { convert_prepared_sig_to_msg(c_p, sig, xsigd->message, next_nm_sig); + ASSERT(sig->hfrag.next == NULL); + sig->hfrag.next = xsigd->heap_frag; conv_msg = sig; + } else if (reason == am_normal && !xsigd->u.normal_kills) { /* Ignore it... */ @@ -2929,6 +2986,130 @@ handle_sync_suspend(Process *c_p, ErtsMessage *mp) } } +int +erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks, + ErtsMessage *msgp, int force_off_heap) +{ + ErtsHeapFactory factory; + Eterm msg; + ErlHeapFragment *bp; + Sint need; + int decode_in_heap_frag; + ErtsDistExternal *dist_ext; + ErtsExitSignalData *xsigd = NULL; + + decode_in_heap_frag = (force_off_heap + || !(proc_locks & ERTS_PROC_LOCK_MAIN) + || (proc->flags & F_OFF_HEAP_MSGQ)); + + if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) + dist_ext = msgp->data.dist_ext; + else { + xsigd = get_exit_signal_data(msgp); + ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT); + ASSERT(is_non_value(xsigd->reason)); + dist_ext = xsigd->dist_ext; + } + + if (dist_ext->heap_size >= 0) + need = dist_ext->heap_size; + else { + need = erts_decode_dist_ext_size(dist_ext); + if (need < 0) { + /* bad msg; remove it... */ + if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { + bp = erts_dist_ext_trailer(dist_ext); + erts_cleanup_offheap(&bp->off_heap); + } + erts_free_dist_ext_copy(dist_ext); + dist_ext = NULL; + return 0; + } + + dist_ext->heap_size = need; + } + + if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { + bp = erts_dist_ext_trailer(dist_ext); + need += bp->used_size; + } + + if (xsigd) { + switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) { + case ERTS_SIG_Q_OP_EXIT: + case ERTS_SIG_Q_OP_EXIT_LINKED: + /* {'EXIT', From, Reason} */ + need += 4; + break; + case ERTS_SIG_Q_OP_MONITOR_DOWN: + /* {'DOWN', Ref, process, From, Reason} */ + need += 6; /* 5-tuple */ + break; + default: + ERTS_INTERNAL_ERROR("Invalid exit signal op"); + break; + } + } + + if (decode_in_heap_frag) + erts_factory_heap_frag_init(&factory, new_message_buffer(need)); + else + erts_factory_proc_prealloc_init(&factory, proc, need); + + ASSERT(dist_ext->heap_size >= 0); + if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { + ErlHeapFragment *heap_frag; + heap_frag = erts_dist_ext_trailer(dist_ext); + ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp), + heap_frag->used_size, + &factory.hp, + factory.off_heap); + erts_cleanup_offheap(&heap_frag->off_heap); + } + + msg = erts_decode_dist_ext(&factory, dist_ext); + if (!xsigd) { + ERL_MESSAGE_TERM(msgp) = msg; + msgp->data.attached = NULL; + } else { + switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) { + case ERTS_SIG_Q_OP_EXIT: + case ERTS_SIG_Q_OP_EXIT_LINKED: + /* {'EXIT', From, Reason} */ + erts_reserve_heap(&factory, 4); + xsigd->message = TUPLE3(factory.hp, am_EXIT, xsigd->from, msg); + factory.hp += 4; + break; + case ERTS_SIG_Q_OP_MONITOR_DOWN: + /* {'DOWN', Ref, process, From, Reason} */ + erts_reserve_heap(&factory, 6); + xsigd->message = TUPLE5(factory.hp, am_DOWN, xsigd->u.ref, am_process, xsigd->from, msg); + factory.hp += 6; + break; + } + xsigd->reason = msg; + } + erts_free_dist_ext_copy(dist_ext); + + if (is_non_value(msg)) { + erts_factory_undo(&factory); + return 0; + } + + erts_factory_close(&factory); + + ASSERT(!msgp->data.heap_frag || xsigd); + + if (decode_in_heap_frag) { + if (!xsigd) + msgp->data.heap_frag = factory.heap_frags; + else + xsigd->heap_frag = factory.heap_frags; + } + + return 1; +} + void erts_proc_sig_handle_pending_suspend(Process *c_p) { @@ -3045,7 +3226,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, ASSERT(ERTS_SIG_IS_NON_MSG(sig)); tag = ((ErtsSignal *) sig)->common.tag; - + switch (ERTS_PROC_SIG_OP(tag)) { case ERTS_SIG_Q_OP_EXIT: @@ -3091,6 +3272,11 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, break; case ERTS_SIG_Q_TYPE_GEN_EXIT: xsigd = get_exit_signal_data(sig); + + /* This GEN_EXIT was received from another node, decode the exit reason */ + if (ERTS_SIG_IS_EXTERNAL(sig)) + erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1); + omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p), xsigd->u.ref); if (omon) { @@ -4190,8 +4376,8 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing, } if (ERTS_SIG_IS_EXTERNAL_MSG(sig)) { cnt++; - if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN, - sig, 0)) { + if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, + sig, 0)) { /* Bad dist message; remove it... */ remove_mq_m_sig(c_p, sig, next_sig, next_nm_sig); sig = *next_sig; @@ -4266,7 +4452,7 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) { /* decode it... */ if (mp->data.attached) - erts_decode_dist_message(rp, rp_locks, mp, !0); + erts_proc_sig_decode_dist(rp, rp_locks, mp, !0); msg = ERL_MESSAGE_TERM(mp); diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index 5a02708bb7..39cee6f230 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -89,6 +89,7 @@ #endif struct erl_mesg; +struct erl_dist_external; typedef struct { struct erl_mesg *next; @@ -212,6 +213,34 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to, /** * + * @brief Send an exit signal to a process. + * + * This function is used instead of erts_proc_sig_send_link_exit() + * when the signal arrives via the distribution and + * therefore no link structure is available. + * + * @param[in] dep Distribution entry of channel + * that the signal arrived on. + * + * @param[in] from Identifier of sender. + * + * @param[in] to Identifier of receiver. + * + * @param[in] dist_ext The exit reason in external term format + * + * @param[in] reason Exit reason. + * + * @param[in] token Seq trace token. + * + */ +void +erts_proc_sig_send_dist_exit(DistEntry *dep, + Eterm from, Eterm to, + ErtsDistExternal *dist_ext, + Eterm reason, Eterm token); + +/** + * * @brief Send an exit signal due to broken link to a process. * * @@ -282,7 +311,7 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk); * * This function is used instead of erts_proc_sig_send_link_exit() * when the signal arrives via the distribution and - * no link structure is available. + * therefore no link structure is available. * * @param[in] dep Distribution entry of channel * that the signal arrived on. @@ -291,6 +320,8 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk); * * @param[in] to Identifier of receiver. * + * @param[in] dist_ext The exit reason in external term format + * * @param[in] reason Exit reason. * * @param[in] token Seq trace token. @@ -299,6 +330,7 @@ erts_proc_sig_send_unlink(Process *c_p, ErtsLink *lnk); void erts_proc_sig_send_dist_link_exit(struct dist_entry_ *dep, Eterm from, Eterm to, + ErtsDistExternal *dist_ext, Eterm reason, Eterm token); /** @@ -307,7 +339,7 @@ erts_proc_sig_send_dist_link_exit(struct dist_entry_ *dep, * * This function is used instead of erts_proc_sig_send_unlink() * when the signal arrives via the distribution and - * no link structure is available. + * therefore no link structure is available. * * @param[in] dep Distribution entry of channel * that the signal arrived on. @@ -380,7 +412,7 @@ erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to); * * This function is used instead of erts_proc_sig_send_monitor_down() * when the signal arrives via the distribution and - * no link structure is available. + * therefore no monitor structure is available. * * @param[in] dep Pointer to distribution entry * of channel that the signal @@ -392,12 +424,15 @@ erts_proc_sig_send_monitor(ErtsMonitor *mon, Eterm to); * * @param[in] to Identifier of receiver. * + * @param[in] dist_ext The exit reason in external term format + * * @param[in] reason Exit reason. * */ void erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref, Eterm from, Eterm to, + ErtsDistExternal *dist_ext, Eterm reason); /** @@ -962,6 +997,34 @@ void erts_proc_sig_handle_pending_suspend(Process *c_p); /** + * + * @brief Decode the reason term in an external signal + * + * Any distributed signal with a payload only has the control + * message decoded by the dist entry. The final decode of the + * payload is done by the process when it inspects the signal + * by calling this function. + * + * This functions handles both messages and link/monitor exits. + * + * Return true if the decode was successful, false otherwise. + * + * @param[in] c_p Pointer to executing process + * + * @param[in] proc_lock Locks held by process. Should always be MAIN. + * + * @param[in] msgp The signal to decode + * + * @param[in] force_off_heap If the term should be forced to be off-heap + */ +int +erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks, + ErtsMessage *msgp, int force_off_heap); + +ErtsDistExternal * +erts_proc_sig_get_external(ErtsMessage *msgp); + +/** * @brief Initialize this functionality */ void erts_proc_sig_queue_init(void); diff --git a/erts/emulator/beam/external.c b/erts/emulator/beam/external.c index 4b8040b356..640ca338d9 100644 --- a/erts/emulator/beam/external.c +++ b/erts/emulator/beam/external.c @@ -636,11 +636,13 @@ byte* erts_encode_ext_ets(Eterm term, byte *ep, struct erl_off_heap_header** off } ErtsDistExternal * -erts_make_dist_ext_copy(ErtsDistExternal *edep, Uint xsize) +erts_make_dist_ext_copy(ErtsDistExternal *edep, Eterm *token) { size_t align_sz; size_t dist_ext_sz; size_t ext_sz = 0; + size_t token_sz = 0; + Eterm token_size; byte *ep; ErtsDistExternal *new_edep; @@ -652,8 +654,12 @@ erts_make_dist_ext_copy(ErtsDistExternal *edep, Uint xsize) align_sz = ERTS_EXTRA_DATA_ALIGN_SZ(dist_ext_sz + ext_sz); + token_size = size_object(*token); + if (token_size) + token_sz = ERTS_HEAP_FRAG_SIZE(token_size); + new_edep = erts_alloc(ERTS_ALC_T_EXT_TERM_DATA, - dist_ext_sz + ext_sz + align_sz + xsize); + dist_ext_sz + ext_sz + align_sz + token_sz); ep = (byte *) new_edep; sys_memcpy((void *) ep, (void *) edep, dist_ext_sz); @@ -669,6 +675,18 @@ erts_make_dist_ext_copy(ErtsDistExternal *edep, Uint xsize) } else { erts_refc_inc(&new_edep->binp->intern.refc, 2); } + + /* Copy the seq_trace token */ + if (is_not_nil(*token)) { + ErlHeapFragment *heap_frag; + ErlOffHeap *ohp; + Eterm *hp; + heap_frag = erts_dist_ext_trailer(new_edep); + ERTS_INIT_HEAP_FRAG(heap_frag, token_size, token_size); + hp = heap_frag->mem; + ohp = &heap_frag->off_heap; + *token = copy_struct(*token, token_size, &hp, ohp); + } return new_edep; } diff --git a/erts/emulator/beam/external.h b/erts/emulator/beam/external.h index 1223f90fcc..91c028c9bd 100644 --- a/erts/emulator/beam/external.h +++ b/erts/emulator/beam/external.h @@ -176,7 +176,7 @@ byte* erts_encode_ext_ets(Eterm, byte *, struct erl_off_heap_header** ext_off_he void erts_free_dist_ext_copy(ErtsDistExternal *); ERTS_GLB_INLINE void *erts_dist_ext_trailer(ErtsDistExternal *); -ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Uint); +ErtsDistExternal *erts_make_dist_ext_copy(ErtsDistExternal *, Eterm *); void *erts_dist_ext_trailer(ErtsDistExternal *); void erts_destroy_dist_ext_copy(ErtsDistExternal *); diff --git a/erts/emulator/beam/msg_instrs.tab b/erts/emulator/beam/msg_instrs.tab index 9bf3aefaca..6f8d1469ef 100644 --- a/erts/emulator/beam/msg_instrs.tab +++ b/erts/emulator/beam/msg_instrs.tab @@ -137,8 +137,8 @@ i_loop_rec(Dest) { if (ERTS_UNLIKELY(ERTS_SIG_IS_EXTERNAL_MSG(msgp))) { FCALLS -= 10; /* FIXME: bump appropriate amount... */ - SWAPOUT; /* erts_decode_dist_message() may write to heap... */ - if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) { + SWAPOUT; /* erts_proc_sig_decode_dist() may write to heap... */ + if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) { /* * A corrupt distribution message that we weren't able to decode; * remove it... diff --git a/erts/emulator/hipe/hipe_native_bif.c b/erts/emulator/hipe/hipe_native_bif.c index 211ce0492a..80e5d81023 100644 --- a/erts/emulator/hipe/hipe_native_bif.c +++ b/erts/emulator/hipe/hipe_native_bif.c @@ -579,7 +579,7 @@ Eterm hipe_check_get_msg(Process *c_p) if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) { /* FIXME: bump appropriate amount... */ - if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) { + if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) { /* * A corrupt distribution message that we weren't able to decode; * remove it... |