diff options
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r-- | erts/emulator/beam/erl_message.c | 327 |
1 files changed, 70 insertions, 257 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 6f7c71ef98..ed7a9d37c2 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1997-2017. All Rights Reserved. + * Copyright Ericsson AB 1997-2018. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -33,6 +33,7 @@ #include "erl_binary.h" #include "dtrace-wrapper.h" #include "beam_bp.h" +#include "erl_proc_sig_queue.h" ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message_ref, ErtsMessageRef, @@ -207,7 +208,7 @@ erts_cleanup_messages(ErtsMessage *msgp) while (mp) { ErtsMessage *fmp; ErlHeapFragment *bp; - if (is_non_value(ERL_MESSAGE_TERM(mp))) { + if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) { if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) { bp = (ErlHeapFragment *) mp->data.dist_ext->ext_endp; erts_cleanup_offheap(&bp->off_heap); @@ -215,10 +216,13 @@ erts_cleanup_messages(ErtsMessage *msgp) if (mp->data.dist_ext) erts_free_dist_ext_copy(mp->data.dist_ext); } - else { - if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) + else { + if (ERTS_SIG_IS_INTERNAL_MSG(mp) + && mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { bp = mp->data.heap_frag; + } else { + mp->data.attached = ERTS_MSG_COMBINED_HFRAG; bp = mp->hfrag.next; erts_cleanup_offheap(&mp->hfrag.off_heap); } @@ -272,6 +276,7 @@ erts_queue_dist_message(Process *rcvr, mp = erts_alloc_message(0, NULL); mp->data.dist_ext = dist_ext; + ERL_MESSAGE_FROM(mp) = dist_ext->dep->sysname; ERL_MESSAGE_TERM(mp) = THE_NON_VALUE; #ifdef USE_VM_PROBES ERL_MESSAGE_DT_UTAG(mp) = NIL; @@ -294,46 +299,15 @@ erts_queue_dist_message(Process *rcvr, } } + state = erts_atomic32_read_acqb(&rcvr->state); - if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) { + if (state & ERTS_PSFLG_EXITING) { if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) erts_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); /* Drop message if receiver is exiting or has a pending exit ... */ erts_cleanup_messages(mp); } - else - if (IS_TRACED_FL(rcvr, F_TRACE_RECEIVE)) { - if (from == am_Empty) - from = dist_ext->dep->sysname; - - /* Ahh... need to decode it in order to trace it... */ - if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) - erts_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); - if (!erts_decode_dist_message(rcvr, rcvr_locks, mp, 0)) - erts_free_message(mp); - else { - Eterm msg = ERL_MESSAGE_TERM(mp); - token = ERL_MESSAGE_TOKEN(mp); -#ifdef USE_VM_PROBES - if (DTRACE_ENABLED(message_queued)) { - DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); - - dtrace_proc_str(rcvr, receiver_name); - if (have_seqtrace(token)) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(token)); - tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token)); - tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token)); - } - DTRACE6(message_queued, - receiver_name, size_object(msg), rcvr->msg.len, - tok_label, tok_lastcnt, tok_serial); - } -#endif - erts_queue_message(rcvr, rcvr_locks, mp, msg, from); - } - } else { - /* Enqueue message on external format */ #ifdef USE_VM_PROBES if (DTRACE_ENABLED(message_queued)) { @@ -359,9 +333,7 @@ erts_queue_dist_message(Process *rcvr, if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) erts_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); - erts_proc_notify_new_message(rcvr, - rcvr_locks - ); + erts_proc_notify_new_message(rcvr, rcvr_locks); } } @@ -372,15 +344,14 @@ queue_messages(Process* receiver, ErtsProcLocks receiver_locks, ErtsMessage* first, ErtsMessage** last, - Uint len, - Eterm from) + Uint len) { - ErtsTracingEvent* te; Sint res; int locked_msgq = 0; erts_aint32_t state; ASSERT(is_value(ERL_MESSAGE_TERM(first))); + ASSERT(is_value(ERL_MESSAGE_FROM(first))); ASSERT(ERL_MESSAGE_TOKEN(first) == am_undefined || ERL_MESSAGE_TOKEN(first) == NIL || is_tuple(ERL_MESSAGE_TOKEN(first))); @@ -398,7 +369,7 @@ queue_messages(Process* receiver, state = *receiver_state; else state = erts_atomic32_read_nob(&receiver->state); - if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + if (state & ERTS_PSFLG_EXITING) goto exiting; need_locks = receiver_locks & ERTS_PROC_LOCKS_HIGHER_THAN(ERTS_PROC_LOCK_MSGQ); @@ -414,7 +385,7 @@ queue_messages(Process* receiver, state = erts_atomic32_read_nob(&receiver->state); - if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) { + if (state & ERTS_PSFLG_EXITING) { exiting: /* Drop message if receiver is exiting or has a pending exit... */ if (locked_msgq) @@ -423,7 +394,7 @@ queue_messages(Process* receiver, return 0; } - res = receiver->msg.len; + res = receiver->sig_qs.len; if (receiver_locks & ERTS_PROC_LOCK_MAIN) { /* * We move 'in queue' to 'private queue' and place @@ -433,8 +404,8 @@ queue_messages(Process* receiver, * we don't need to include the 'in queue' in * the root set when garbage collecting. */ - res += receiver->msg_inq.len; - ERTS_MSGQ_MV_INQ2PRIVQ(receiver); + res += receiver->sig_inq.len; + erts_proc_sig_fetch(receiver); LINK_MESSAGE_PRIVQ(receiver, first, last, len); } else @@ -442,38 +413,6 @@ queue_messages(Process* receiver, LINK_MESSAGE(receiver, first, last, len); } - if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE) - && (te = &erts_receive_tracing[erts_active_bp_ix()], - te->on)) { - - ErtsMessage *msg = first; - -#ifdef USE_VM_PROBES - if (DTRACE_ENABLED(message_queued)) { - DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); - Sint tok_label = 0; - Sint tok_lastcnt = 0; - Sint tok_serial = 0; - Eterm seq_trace_token = ERL_MESSAGE_TOKEN(msg); - - dtrace_proc_str(receiver, receiver_name); - if (seq_trace_token != NIL && is_tuple(seq_trace_token)) { - tok_label = signed_val(SEQ_TRACE_T_LABEL(seq_trace_token)); - tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(seq_trace_token)); - tok_serial = signed_val(SEQ_TRACE_T_SERIAL(seq_trace_token)); - } - DTRACE6(message_queued, - receiver_name, size_object(ERL_MESSAGE_TERM(msg)), - receiver->msg.len, - tok_label, tok_lastcnt, tok_serial); - } -#endif - while (msg) { - trace_receive(receiver, from, ERL_MESSAGE_TERM(msg), te); - msg = msg->next; - } - - } if (locked_msgq) { erts_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); } @@ -489,8 +428,9 @@ queue_message(Process* receiver, ErtsMessage* mp, Eterm msg, Eterm from) { ERL_MESSAGE_TERM(mp) = msg; + ERL_MESSAGE_FROM(mp) = from; return queue_messages(receiver, receiver_state, receiver_locks, - mp, &mp->next, 1, from); + mp, &mp->next, 1); } Sint @@ -503,11 +443,10 @@ erts_queue_message(Process* receiver, ErtsProcLocks receiver_locks, Sint erts_queue_messages(Process* receiver, ErtsProcLocks receiver_locks, - ErtsMessage* first, ErtsMessage** last, Uint len, - Eterm from) + ErtsMessage* first, ErtsMessage** last, Uint len) { return queue_messages(receiver, NULL, receiver_locks, - first, last, len, from); + first, last, len); } void @@ -922,70 +861,77 @@ Sint erts_move_messages_off_heap(Process *c_p) { int reds = 1; + int i; + ErtsMessage *msgq[] = {c_p->sig_qs.first, c_p->sig_qs.cont}; /* * Move all messages off heap. This *only* occurs when the * process had off heap message disabled and just enabled * it... */ - ErtsMessage *mp; - reds += c_p->msg.len / 10; + reds += c_p->sig_qs.len / 10; ASSERT(erts_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ); ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG); - for (mp = c_p->msg.first; mp; mp = mp->next) { - Uint msg_sz, token_sz; + for (i = 0; i < sizeof(msgq)/sizeof(msgq[0]); i++) { + ErtsMessage *mp; + for (mp = msgq[i]; mp; mp = mp->next) { + Uint msg_sz, token_sz; #ifdef USE_VM_PROBES - Uint utag_sz; + Uint utag_sz; #endif - Eterm *hp; - ErlHeapFragment *hfrag; + Eterm *hp; + ErlHeapFragment *hfrag; + + if (!ERTS_SIG_IS_INTERNAL_MSG(mp)) + continue; - if (mp->data.attached) - continue; + if (mp->data.attached) + continue; - if (is_immed(ERL_MESSAGE_TERM(mp)) + if (is_immed(ERL_MESSAGE_TERM(mp)) #ifdef USE_VM_PROBES - && is_immed(ERL_MESSAGE_DT_UTAG(mp)) + && is_immed(ERL_MESSAGE_DT_UTAG(mp)) #endif - && is_not_immed(ERL_MESSAGE_TOKEN(mp))) - continue; + && is_not_immed(ERL_MESSAGE_TOKEN(mp))) + continue; - /* - * The message refers into the heap. Copy the message - * from the heap into a heap fragment and attach - * it to the message... - */ - msg_sz = size_object(ERL_MESSAGE_TERM(mp)); + /* + * The message refers into the heap. Copy the message + * from the heap into a heap fragment and attach + * it to the message... + */ + msg_sz = size_object(ERL_MESSAGE_TERM(mp)); #ifdef USE_VM_PROBES - utag_sz = size_object(ERL_MESSAGE_DT_UTAG(mp)); + utag_sz = size_object(ERL_MESSAGE_DT_UTAG(mp)); #endif - token_sz = size_object(ERL_MESSAGE_TOKEN(mp)); + token_sz = size_object(ERL_MESSAGE_TOKEN(mp)); - hfrag = new_message_buffer(msg_sz + hfrag = new_message_buffer(msg_sz #ifdef USE_VM_PROBES - + utag_sz + + utag_sz #endif - + token_sz); - hp = hfrag->mem; - if (is_not_immed(ERL_MESSAGE_TERM(mp))) - ERL_MESSAGE_TERM(mp) = copy_struct(ERL_MESSAGE_TERM(mp), - msg_sz, &hp, - &hfrag->off_heap); - if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) - ERL_MESSAGE_TOKEN(mp) = copy_struct(ERL_MESSAGE_TOKEN(mp), - token_sz, &hp, - &hfrag->off_heap); + + token_sz); + hp = hfrag->mem; + if (is_not_immed(ERL_MESSAGE_TERM(mp))) + ERL_MESSAGE_TERM(mp) = copy_struct(ERL_MESSAGE_TERM(mp), + msg_sz, &hp, + &hfrag->off_heap); + if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) + ERL_MESSAGE_TOKEN(mp) = copy_struct(ERL_MESSAGE_TOKEN(mp), + token_sz, &hp, + &hfrag->off_heap); #ifdef USE_VM_PROBES - if (is_not_immed(ERL_MESSAGE_DT_UTAG(mp))) - ERL_MESSAGE_DT_UTAG(mp) = copy_struct(ERL_MESSAGE_DT_UTAG(mp), - utag_sz, &hp, - &hfrag->off_heap); + if (is_not_immed(ERL_MESSAGE_DT_UTAG(mp))) + ERL_MESSAGE_DT_UTAG(mp) = copy_struct(ERL_MESSAGE_DT_UTAG(mp), + utag_sz, &hp, + &hfrag->off_heap); #endif - mp->data.heap_frag = hfrag; - reds += 1; + mp->data.heap_frag = hfrag; + reds += 1; + } } return reds; @@ -1015,7 +961,7 @@ erts_complete_off_heap_message_queue_change(Process *c_p) else { reds += 2; erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); - ERTS_MSGQ_MV_INQ2PRIVQ(c_p); + erts_proc_sig_fetch(c_p); erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); reds += erts_move_messages_off_heap(c_p); } @@ -1225,139 +1171,6 @@ erts_decode_dist_message(Process *proc, ErtsProcLocks proc_locks, return 1; } -/* - * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0 will move off heap messages - * into the heap of the inspected process if off_heap_message_queue - * is false when process_info(_, messages) is called. That is, the - * following GC will have more data in the rootset compared to the - * scenario when process_info(_, messages) had not been called. - * - * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS != 0 will keep off heap messages - * off heap when process_info(_, messages) is called regardless of - * the off_heap_message_queue setting of the process. That is, it - * will change the following execution of the process as little as - * possible. - */ -#define ERTS_INSPECT_MSGQ_KEEP_OH_MSGS 1 - -Uint -erts_prep_msgq_for_inspection(Process *c_p, Process *rp, - ErtsProcLocks rp_locks, ErtsMessageInfo *mip) -{ - Uint tot_heap_size; - ErtsMessage* mp; - Sint i; - int self_on_heap; - - /* - * Prepare the message queue for inspection - * by process_info(). - * - * - * - Decode all messages on external format - * - Remove all corrupt dist messages from queue - * - Save pointer to, and heap size need of each - * message in the mip array. - * - Return total heap size need for all messages - * that needs to be copied. - * - * If ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0: - * - In case off heap messages is disabled and - * we are inspecting our own queue, move all - * off heap data into the heap. - */ - - self_on_heap = c_p == rp && !(c_p->flags & F_OFF_HEAP_MSGQ); - - tot_heap_size = 0; - i = 0; - mp = rp->msg.first; - while (mp) { - Eterm msg = ERL_MESSAGE_TERM(mp); - - mip[i].size = 0; - - if (is_non_value(msg)) { - /* Dist message on external format; decode it... */ - if (mp->data.attached) - erts_decode_dist_message(rp, rp_locks, mp, - ERTS_INSPECT_MSGQ_KEEP_OH_MSGS); - - msg = ERL_MESSAGE_TERM(mp); - - if (is_non_value(msg)) { - ErtsMessage **mpp; - ErtsMessage *bad_mp = mp; - /* - * Bad distribution message; remove - * it from the queue... - */ - ASSERT(!mp->data.attached); - - mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next; - - ASSERT(*mpp == bad_mp); - - erts_msgq_update_internal_pointers(&rp->msg, mpp, &bad_mp->next); - - mp = mp->next; - *mpp = mp; - rp->msg.len--; - bad_mp->next = NULL; - erts_cleanup_messages(bad_mp); - continue; - } - } - - ASSERT(is_value(msg)); - -#if ERTS_INSPECT_MSGQ_KEEP_OH_MSGS - if (is_not_immed(msg) && (!self_on_heap || mp->data.attached)) { - Uint sz = size_object(msg); - mip[i].size = sz; - tot_heap_size += sz; - } -#else - if (self_on_heap) { - if (mp->data.attached) { - ErtsMessage *tmp = NULL; - if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { - erts_link_mbuf_to_proc(rp, mp->data.heap_frag); - mp->data.attached = NULL; - } - else { - /* - * Need to replace the message reference since - * we will get references to the message data - * from the heap... - */ - ErtsMessage **mpp; - tmp = erts_alloc_message(0, NULL); - sys_memcpy((void *) tmp->m, (void *) mp->m, - sizeof(Eterm)*ERL_MESSAGE_REF_ARRAY_SZ); - mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next; - erts_msgq_replace_msg_ref(&rp->msg, tmp, mpp); - erts_save_message_in_proc(rp, mp); - mp = tmp; - } - } - } - else if (is_not_immed(msg)) { - Uint sz = size_object(msg); - mip[i].size = sz; - tot_heap_size += sz; - } - -#endif - - mip[i].msgp = mp; - i++; - mp = mp->next; - } - - return tot_heap_size; -} - void erts_factory_proc_init(ErtsHeapFactory* factory, Process* p) { |