diff options
author | Lukas Larsson <[email protected]> | 2018-09-26 11:55:01 +0200 |
---|---|---|
committer | Lukas Larsson <[email protected]> | 2019-02-22 11:12:53 +0100 |
commit | f2c4f6f83deecba0c2527e520f0f18fba7d84815 (patch) | |
tree | 5009cc4300cdca28dc0f76daaa541051f8d44a01 /erts/emulator/beam/erl_proc_sig_queue.c | |
parent | 6686877360432144bacbf4e95c23b1232eab1b08 (diff) | |
download | otp-f2c4f6f83deecba0c2527e520f0f18fba7d84815.tar.gz otp-f2c4f6f83deecba0c2527e520f0f18fba7d84815.tar.bz2 otp-f2c4f6f83deecba0c2527e520f0f18fba7d84815.zip |
erts: Implement fragmentation of distrubution messages
Diffstat (limited to 'erts/emulator/beam/erl_proc_sig_queue.c')
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 195 |
1 files changed, 100 insertions, 95 deletions
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 1bd219b6d7..8113abebde 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -36,6 +36,7 @@ #include "erl_port_task.h" #include "erl_trace.h" #include "beam_bp.h" +#include "erl_binary.h" #include "big.h" #include "erl_gc.h" #include "bif.h" @@ -80,7 +81,10 @@ #define ERTS_SIG_Q_TYPE_ADJUST_TRACE_INFO \ ERTS_SIG_Q_TYPE_MAX -#define ERTS_SIG_IS_EXTERNAL(sig) is_non_value(get_exit_signal_data(sig)->reason) +#define ERTS_SIG_IS_GEN_EXIT(sig) \ + (ERTS_PROC_SIG_TYPE(((ErtsSignal *) sig)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT) +#define ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig) \ + (ASSERT(ERTS_SIG_IS_GEN_EXIT(sig)),is_non_value(get_exit_signal_data(sig)->reason)) Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler); Process *ERTS_WRITE_UNLIKELY(erts_dirty_process_signal_handler_high); @@ -114,8 +118,6 @@ typedef struct { Eterm message; Eterm from; Eterm reason; - struct erl_dist_external *dist_ext; - ErlHeapFragment *heap_frag; union { Eterm ref; int normal_kills; @@ -945,13 +947,15 @@ erts_proc_sig_get_external(ErtsMessage *msgp) { if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) { return erts_get_dist_ext(msgp->data.heap_frag); - } else if (ERTS_SIG_IS_NON_MSG(msgp) && ERTS_SIG_IS_EXTERNAL(msgp)) { + } else if (ERTS_SIG_IS_NON_MSG(msgp) && + ERTS_SIG_IS_GEN_EXIT(msgp) && + ERTS_SIG_IS_GEN_EXIT_EXTERNAL(msgp)) { ErtsDistExternal *edep; ErtsExitSignalData *xsigd = get_exit_signal_data(msgp); ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT); ASSERT(is_non_value(xsigd->reason)); if (msgp->hfrag.next == NULL) - edep = (ErtsDistExternal*)((void*)xsigd) + sizeof(ErtsExitSignalData); + edep = (ErtsDistExternal*)(xsigd + 1); else edep = erts_get_dist_ext(msgp->hfrag.next); return edep; @@ -965,6 +969,7 @@ static void send_gen_exit_signal(Process *c_p, Eterm from_tag, Eterm from, Eterm to, Sint16 op, Eterm reason, ErtsDistExternal *dist_ext, + ErlHeapFragment *dist_ext_hfrag, Eterm ref, Eterm token, int normal_kills) { ErtsExitSignalData *xsigd; @@ -972,7 +977,7 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, ErtsMessage *mp; ErlHeapFragment *hfrag; ErlOffHeap *ohp; - Uint hsz, from_sz, reason_sz, ref_sz, token_sz; + Uint hsz, from_sz, reason_sz, ref_sz, token_sz, dist_ext_sz; int seq_trace; #ifdef USE_VM_PROBES Eterm s_utag, utag; @@ -984,7 +989,7 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, ASSERT(is_immed(from_tag)); - hsz = sizeof(ErtsExitSignalData)/sizeof(Uint); + hsz = sizeof(ErtsExitSignalData)/sizeof(Eterm); seq_trace = c_p && have_seqtrace(token); if (seq_trace) @@ -1012,6 +1017,8 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, ref_sz = size_object(ref); hsz += ref_sz; + /* The reason was part of the control message, + just use copy it into the xsigd */ if (is_value(reason)) { reason_sz = size_object(reason); hsz += reason_sz; @@ -1032,6 +1039,11 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, ERTS_INTERNAL_ERROR("Invalid exit signal op"); break; } + } else if (dist_ext != NULL && dist_ext_hfrag == NULL) { + /* The message was not fragmented so we need to create space + for a single dist_ext element */ + dist_ext_sz = erts_dist_ext_size(dist_ext) / sizeof(Eterm); + hsz += dist_ext_sz; } /* @@ -1087,13 +1099,12 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, hfrag->used_size = hp - start_hp; - xsigd = (ErtsExitSignalData *) (char *) hp; + xsigd = (ErtsExitSignalData *) hp; xsigd->message = s_message; xsigd->from = s_from; xsigd->reason = s_reason; - xsigd->dist_ext = dist_ext; - xsigd->heap_frag = NULL; + hfrag->next = dist_ext_hfrag; if (is_nil(s_ref)) xsigd->u.normal_kills = normal_kills; @@ -1102,6 +1113,15 @@ send_gen_exit_signal(Process *c_p, Eterm from_tag, xsigd->u.ref = s_ref; } + hp += sizeof(ErtsExitSignalData)/sizeof(Eterm); + + if (dist_ext != NULL && dist_ext_hfrag == NULL && is_non_value(reason)) { + erts_make_dist_ext_copy(dist_ext, (ErtsDistExternal *) hp); + hp += dist_ext_sz; + } + + ASSERT(hp == mp->hfrag.mem + mp->hfrag.alloc_size); + if (seq_trace) do_seq_trace_output(to, s_token, s_message); @@ -1234,17 +1254,18 @@ erts_proc_sig_send_exit(Process *c_p, Eterm from, Eterm to, from_tag = dep->sysname; } send_gen_exit_signal(c_p, from_tag, from, to, ERTS_SIG_Q_OP_EXIT, - reason, NULL, NIL, token, normal_kills); + reason, NULL, NULL, NIL, token, normal_kills); } void erts_proc_sig_send_dist_exit(DistEntry *dep, Eterm from, Eterm to, ErtsDistExternal *dist_ext, + ErlHeapFragment *hfrag, Eterm reason, Eterm token) { send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT, - reason, dist_ext, NIL, token, 0); + reason, dist_ext, hfrag, NIL, token, 0); } @@ -1259,7 +1280,7 @@ erts_proc_sig_send_link_exit(Process *c_p, Eterm from, ErtsLink *lnk, if (is_not_immed(reason) || is_not_nil(token)) { ASSERT(is_internal_pid(from) || is_internal_port(from)); send_gen_exit_signal(c_p, from, from, to, ERTS_SIG_Q_OP_EXIT_LINKED, - reason, NULL, NIL, token, 0); + reason, NULL, NULL, NIL, token, 0); } else { /* Pass signal using old link structure... */ @@ -1315,10 +1336,11 @@ void erts_proc_sig_send_dist_link_exit(DistEntry *dep, Eterm from, Eterm to, ErtsDistExternal *dist_ext, + ErlHeapFragment *hfrag, Eterm reason, Eterm token) { send_gen_exit_signal(NULL, dep->sysname, from, to, ERTS_SIG_Q_OP_EXIT_LINKED, - reason, dist_ext, NIL, token, 0); + reason, dist_ext, hfrag, NIL, token, 0); } @@ -1342,6 +1364,7 @@ void erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref, Eterm from, Eterm to, ErtsDistExternal *dist_ext, + ErlHeapFragment *hfrag, Eterm reason) { Eterm monitored, heap[3]; @@ -1351,7 +1374,7 @@ erts_proc_sig_send_dist_monitor_down(DistEntry *dep, Eterm ref, monitored = from; send_gen_exit_signal(NULL, dep->sysname, monitored, to, ERTS_SIG_Q_OP_MONITOR_DOWN, - reason, dist_ext, ref, NIL, 0); + reason, dist_ext, hfrag, ref, NIL, 0); } void @@ -1422,7 +1445,7 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason) } send_gen_exit_signal(NULL, from_tag, monitored, to, ERTS_SIG_Q_OP_MONITOR_DOWN, - reason, NULL, mdp->ref, NIL, 0); + reason, NULL, NULL, mdp->ref, NIL, 0); } erts_monitor_release(mon); } @@ -2105,9 +2128,9 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing, } /* This GEN_EXIT was received from another node, decode the exit reason */ - if (ERTS_SIG_IS_EXTERNAL(sig)) + if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig)) erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1); - + reason = xsigd->reason; if (is_non_value(reason)) { @@ -2115,17 +2138,14 @@ handle_exit_signal(Process *c_p, ErtsSigRecvTracing *tracing, ignore = !0; destroy = !0; } - + if (!ignore) { if ((op != ERTS_SIG_Q_OP_EXIT || reason != am_kill) && (c_p->flags & F_TRAP_EXIT)) { convert_prepared_sig_to_msg(c_p, sig, xsigd->message, next_nm_sig); - ASSERT(sig->hfrag.next == NULL); - sig->hfrag.next = xsigd->heap_frag; conv_msg = sig; - } else if (reason == am_normal && !xsigd->u.normal_kills) { /* Ignore it... */ @@ -2991,50 +3011,29 @@ erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks, ErtsMessage *msgp, int force_off_heap) { ErtsHeapFactory factory; + ErlHeapFragment *hfrag; Eterm msg; - ErlHeapFragment *bp; Sint need; - int decode_in_heap_frag; - ErtsDistExternal *dist_ext; + ErtsDistExternal *edep; ErtsExitSignalData *xsigd = NULL; - decode_in_heap_frag = (force_off_heap - || !(proc_locks & ERTS_PROC_LOCK_MAIN) - || (proc->flags & F_OFF_HEAP_MSGQ)); - - if (ERTS_SIG_IS_EXTERNAL_MSG(msgp)) - dist_ext = msgp->data.dist_ext; - else { + edep = erts_proc_sig_get_external(msgp); + if (!ERTS_SIG_IS_EXTERNAL_MSG(msgp)) xsigd = get_exit_signal_data(msgp); - ASSERT(ERTS_PROC_SIG_TYPE(((ErtsSignal *) msgp)->common.tag) == ERTS_SIG_Q_TYPE_GEN_EXIT); - ASSERT(is_non_value(xsigd->reason)); - dist_ext = xsigd->dist_ext; - } - if (dist_ext->heap_size >= 0) - need = dist_ext->heap_size; + if (edep->heap_size >= 0) + need = edep->heap_size; else { - need = erts_decode_dist_ext_size(dist_ext); + need = erts_decode_dist_ext_size(edep, 1); if (need < 0) { - /* bad msg; remove it... */ - if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { - bp = erts_dist_ext_trailer(dist_ext); - erts_cleanup_offheap(&bp->off_heap); - } - erts_free_dist_ext_copy(dist_ext); - dist_ext = NULL; + /* bad signal; remove it... */ return 0; } - dist_ext->heap_size = need; + edep->heap_size = need; } - if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { - bp = erts_dist_ext_trailer(dist_ext); - need += bp->used_size; - } - - if (xsigd) { + if (ERTS_SIG_IS_NON_MSG(msgp)) { switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) { case ERTS_SIG_Q_OP_EXIT: case ERTS_SIG_Q_OP_EXIT_LINKED: @@ -3051,26 +3050,22 @@ erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks, } } - if (decode_in_heap_frag) - erts_factory_heap_frag_init(&factory, new_message_buffer(need)); - else - erts_factory_proc_prealloc_init(&factory, proc, need); - - ASSERT(dist_ext->heap_size >= 0); - if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(dist_ext); - ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp), - heap_frag->used_size, - &factory.hp, - factory.off_heap); - erts_cleanup_offheap(&heap_frag->off_heap); + hfrag = new_message_buffer(need); + erts_factory_heap_frag_init(&factory, hfrag); + + ASSERT(edep->heap_size >= 0); + + msg = erts_decode_dist_ext(&factory, edep, 1); + + if (is_non_value(msg)) { + erts_factory_undo(&factory); + return 0; } - msg = erts_decode_dist_ext(&factory, dist_ext); - if (!xsigd) { + if (ERTS_SIG_IS_MSG(msgp)) { ERL_MESSAGE_TERM(msgp) = msg; - msgp->data.attached = NULL; + if (msgp->data.heap_frag == &msgp->hfrag) + msgp->data.heap_frag = ERTS_MSG_COMBINED_HFRAG; } else { switch (ERTS_PROC_SIG_OP(ERL_MESSAGE_TERM(msgp))) { case ERTS_SIG_Q_OP_EXIT: @@ -3089,22 +3084,21 @@ erts_proc_sig_decode_dist(Process *proc, ErtsProcLocks proc_locks, } xsigd->reason = msg; } - erts_free_dist_ext_copy(dist_ext); - if (is_non_value(msg)) { - erts_factory_undo(&factory); - return 0; - } + erts_free_dist_ext_copy(edep); erts_factory_close(&factory); - ASSERT(!msgp->data.heap_frag || xsigd); + hfrag = factory.heap_frags; + while (hfrag->next) + hfrag = hfrag->next; - if (decode_in_heap_frag) { - if (!xsigd) - msgp->data.heap_frag = factory.heap_frags; - else - xsigd->heap_frag = factory.heap_frags; + if (ERTS_SIG_IS_MSG(msgp) && msgp->data.heap_frag != ERTS_MSG_COMBINED_HFRAG) { + hfrag->next = msgp->data.heap_frag; + msgp->data.heap_frag = factory.heap_frags; + } else { + hfrag->next = msgp->hfrag.next; + msgp->hfrag.next = factory.heap_frags; } return 1; @@ -3274,8 +3268,9 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, xsigd = get_exit_signal_data(sig); /* This GEN_EXIT was received from another node, decode the exit reason */ - if (ERTS_SIG_IS_EXTERNAL(sig)) - erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1); + if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig)) + if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 1)) + break; /* Decode failed, just remove signal */ omon = erts_monitor_tree_lookup(ERTS_P_MONITORS(c_p), xsigd->u.ref); @@ -4375,11 +4370,13 @@ handle_msg_tracing(Process *c_p, ErtsSigRecvTracing *tracing, return -1; /* Yield... */ } if (ERTS_SIG_IS_EXTERNAL_MSG(sig)) { - cnt++; + cnt += 50; /* Decode is expensive... */ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, sig, 0)) { /* Bad dist message; remove it... */ remove_mq_m_sig(c_p, sig, next_sig, next_nm_sig); + sig->next = NULL; + erts_cleanup_messages(sig); sig = *next_sig; continue; } @@ -4451,18 +4448,12 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) { /* decode it... */ - if (mp->data.attached) - erts_proc_sig_decode_dist(rp, rp_locks, mp, !0); - - msg = ERL_MESSAGE_TERM(mp); - - if (is_non_value(msg)) { + if (!erts_proc_sig_decode_dist(rp, rp_locks, mp, !0)) { ErtsMessage *bad_mp = mp; /* * Bad distribution message; remove * it from the queue... */ - ASSERT(!mp->data.attached); ASSERT(*mpp == bad_mp); @@ -4474,6 +4465,8 @@ erts_proc_sig_prep_msgq_for_inspection(Process *c_p, erts_cleanup_messages(bad_mp); continue; } + + msg = ERL_MESSAGE_TERM(mp); } ASSERT(is_value(msg)); @@ -4632,12 +4625,21 @@ debug_foreach_sig_fake_oh(Eterm term, } +static void +debug_foreach_sig_external(ErtsMessage *msgp, + void (*ext_func)(ErtsDistExternal *, void *), + void *arg) +{ + ext_func(erts_proc_sig_get_external(msgp), arg); +} + void erts_proc_sig_debug_foreach_sig(Process *c_p, void (*msg_func)(ErtsMessage *, void *), void (*oh_func)(ErlOffHeap *, void *), ErtsMonitorFunc mon_func, ErtsLinkFunc lnk_func, + void (*ext_func)(ErtsDistExternal *, void *), void *arg) { ErtsMessage *queue[] = {c_p->sig_qs.first, c_p->sig_qs.cont, c_p->sig_inq.first}; @@ -4646,10 +4648,10 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, for (qix = 0; qix < sizeof(queue)/sizeof(queue[0]); qix++) { ErtsMessage *sig; for (sig = queue[qix]; sig; sig = sig->next) { - - if (ERTS_SIG_IS_MSG(sig)) + + if (ERTS_SIG_IS_MSG(sig)) { msg_func(sig, arg); - else { + } else { Eterm tag; Uint16 type; int op; @@ -4668,7 +4670,10 @@ erts_proc_sig_debug_foreach_sig(Process *c_p, case ERTS_SIG_Q_OP_MONITOR_DOWN: switch (type) { case ERTS_SIG_Q_TYPE_GEN_EXIT: - debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg); + if (ERTS_SIG_IS_GEN_EXIT_EXTERNAL(sig)) + debug_foreach_sig_external(sig, ext_func, arg); + else + debug_foreach_sig_heap_frags(&sig->hfrag, oh_func, arg); break; case ERTS_LNK_TYPE_PORT: case ERTS_LNK_TYPE_PROC: |