aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_message.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r--erts/emulator/beam/erl_message.c669
1 files changed, 231 insertions, 438 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index b30c4a49d7..a3274d7443 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -1,7 +1,7 @@
/*
* %CopyrightBegin%
*
- * Copyright Ericsson AB 1997-2017. All Rights Reserved.
+ * Copyright Ericsson AB 1997-2018. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,6 +33,7 @@
#include "erl_binary.h"
#include "dtrace-wrapper.h"
#include "beam_bp.h"
+#include "erl_proc_sig_queue.h"
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message_ref,
ErtsMessageRef,
@@ -170,7 +171,7 @@ erts_cleanup_offheap(ErlOffHeap *offheap)
erts_bin_release(u.pb->val);
break;
case FUN_SUBTAG:
- if (erts_smp_refc_dectest(&u.fun->fe->refc, 0) == 0) {
+ if (erts_refc_dectest(&u.fun->fe->refc, 0) == 0) {
erts_erase_fun_entry(u.fun->fe);
}
break;
@@ -207,7 +208,7 @@ erts_cleanup_messages(ErtsMessage *msgp)
while (mp) {
ErtsMessage *fmp;
ErlHeapFragment *bp;
- if (is_non_value(ERL_MESSAGE_TERM(mp))) {
+ if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) {
if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) {
bp = (ErlHeapFragment *) mp->data.dist_ext->ext_endp;
erts_cleanup_offheap(&bp->off_heap);
@@ -215,10 +216,13 @@ erts_cleanup_messages(ErtsMessage *msgp)
if (mp->data.dist_ext)
erts_free_dist_ext_copy(mp->data.dist_ext);
}
- else {
- if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG)
+ else {
+ if (ERTS_SIG_IS_INTERNAL_MSG(mp)
+ && mp->data.attached != ERTS_MSG_COMBINED_HFRAG) {
bp = mp->data.heap_frag;
+ }
else {
+ mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
bp = mp->hfrag.next;
erts_cleanup_offheap(&mp->hfrag.off_heap);
}
@@ -260,20 +264,14 @@ erts_queue_dist_message(Process *rcvr,
Eterm from)
{
ErtsMessage* mp;
-#ifdef USE_VM_PROBES
- Sint tok_label = 0;
- Sint tok_lastcnt = 0;
- Sint tok_serial = 0;
-#endif
-#ifdef ERTS_SMP
erts_aint_t state;
-#endif
- ERTS_SMP_LC_ASSERT(rcvr_locks == erts_proc_lc_my_proc_locks(rcvr));
+ ERTS_LC_ASSERT(rcvr_locks == erts_proc_lc_my_proc_locks(rcvr));
mp = erts_alloc_message(0, NULL);
mp->data.dist_ext = dist_ext;
+ ERL_MESSAGE_FROM(mp) = dist_ext->dep->sysname;
ERL_MESSAGE_TERM(mp) = THE_NON_VALUE;
#ifdef USE_VM_PROBES
ERL_MESSAGE_DT_UTAG(mp) = NIL;
@@ -283,249 +281,182 @@ erts_queue_dist_message(Process *rcvr,
#endif
ERL_MESSAGE_TOKEN(mp) = token;
-#ifdef ERTS_SMP
if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) {
- if (erts_smp_proc_trylock(rcvr, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
+ if (erts_proc_trylock(rcvr, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ;
ErtsProcLocks unlocks =
rcvr_locks & ERTS_PROC_LOCKS_HIGHER_THAN(ERTS_PROC_LOCK_MSGQ);
if (unlocks) {
- erts_smp_proc_unlock(rcvr, unlocks);
+ erts_proc_unlock(rcvr, unlocks);
need_locks |= unlocks;
}
- erts_smp_proc_lock(rcvr, need_locks);
+ erts_proc_lock(rcvr, need_locks);
}
}
- state = erts_smp_atomic32_read_acqb(&rcvr->state);
- if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) {
+
+ state = erts_atomic32_read_acqb(&rcvr->state);
+ if (state & ERTS_PSFLG_EXITING) {
if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ))
- erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
/* Drop message if receiver is exiting or has a pending exit ... */
erts_cleanup_messages(mp);
}
- else
-#endif
- if (IS_TRACED_FL(rcvr, F_TRACE_RECEIVE)) {
- if (from == am_Empty)
- from = dist_ext->dep->sysname;
-
- /* Ahh... need to decode it in order to trace it... */
- if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ))
- erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
- if (!erts_decode_dist_message(rcvr, rcvr_locks, mp, 0))
- erts_free_message(mp);
- else {
- Eterm msg = ERL_MESSAGE_TERM(mp);
- token = ERL_MESSAGE_TOKEN(mp);
-#ifdef USE_VM_PROBES
- if (DTRACE_ENABLED(message_queued)) {
- DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
-
- dtrace_proc_str(rcvr, receiver_name);
- if (have_seqtrace(token)) {
- tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
- tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
- tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
- }
- DTRACE6(message_queued,
- receiver_name, size_object(msg), rcvr->msg.len,
- tok_label, tok_lastcnt, tok_serial);
- }
-#endif
- erts_queue_message(rcvr, rcvr_locks, mp, msg, from);
- }
- }
else {
- /* Enqueue message on external format */
+ LINK_MESSAGE(rcvr, mp);
-#ifdef USE_VM_PROBES
- if (DTRACE_ENABLED(message_queued)) {
- DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
-
- dtrace_proc_str(rcvr, receiver_name);
- if (have_seqtrace(token)) {
- tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
- tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
- tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
- }
- /*
- * TODO: We don't know the real size of the external message here.
- * -1 will appear to a D script as 4294967295.
- */
- DTRACE6(message_queued, receiver_name, -1, rcvr->msg.len + 1,
- tok_label, tok_lastcnt, tok_serial);
- }
-#endif
-
- LINK_MESSAGE(rcvr, mp, &mp->next, 1);
+ if (rcvr_locks & ERTS_PROC_LOCK_MAIN)
+ erts_proc_sig_fetch(rcvr);
if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ))
- erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
- erts_proc_notify_new_message(rcvr,
-#ifdef ERTS_SMP
- rcvr_locks
-#else
- 0
-#endif
- );
+ erts_proc_notify_new_message(rcvr, rcvr_locks);
}
}
/* Add messages last in message queue */
-static Sint
+static void
queue_messages(Process* receiver,
- erts_aint32_t *receiver_state,
ErtsProcLocks receiver_locks,
ErtsMessage* first,
ErtsMessage** last,
- Uint len,
- Eterm from)
+ Uint len)
{
- ErtsTracingEvent* te;
- Sint res;
int locked_msgq = 0;
erts_aint32_t state;
- ASSERT(is_value(ERL_MESSAGE_TERM(first)));
- ASSERT(ERL_MESSAGE_TOKEN(first) == am_undefined ||
- ERL_MESSAGE_TOKEN(first) == NIL ||
- is_tuple(ERL_MESSAGE_TOKEN(first)));
-
-#ifdef ERTS_SMP
-#ifdef ERTS_ENABLE_LOCK_CHECK
- ERTS_SMP_LC_ASSERT(erts_proc_lc_my_proc_locks(receiver) < ERTS_PROC_LOCK_MSGQ ||
- receiver_locks == erts_proc_lc_my_proc_locks(receiver));
+#ifdef DEBUG
+ {
+ ErtsMessage* fmsg = ERTS_SIG_IS_MSG(first) ? first : first->next;
+ ASSERT(fmsg);
+ ASSERT(is_value(ERL_MESSAGE_TERM(fmsg)));
+ ASSERT(is_value(ERL_MESSAGE_FROM(fmsg)));
+ ASSERT(ERL_MESSAGE_TOKEN(fmsg) == am_undefined ||
+ ERL_MESSAGE_TOKEN(fmsg) == NIL ||
+ is_tuple(ERL_MESSAGE_TOKEN(fmsg)));
+ }
#endif
- if (!(receiver_locks & ERTS_PROC_LOCK_MSGQ)) {
- if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
- ErtsProcLocks need_locks;
-
- if (receiver_state)
- state = *receiver_state;
- else
- state = erts_smp_atomic32_read_nob(&receiver->state);
- if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))
- goto exiting;
+ ERTS_LC_ASSERT((erts_proc_lc_my_proc_locks(receiver) & ERTS_PROC_LOCK_MSGQ)
+ == (receiver_locks & ERTS_PROC_LOCK_MSGQ));
- need_locks = receiver_locks & ERTS_PROC_LOCKS_HIGHER_THAN(ERTS_PROC_LOCK_MSGQ);
- if (need_locks) {
- erts_smp_proc_unlock(receiver, need_locks);
- }
- need_locks |= ERTS_PROC_LOCK_MSGQ;
- erts_smp_proc_lock(receiver, need_locks);
- }
+ if (!(receiver_locks & ERTS_PROC_LOCK_MSGQ)) {
+ erts_proc_lock(receiver, ERTS_PROC_LOCK_MSGQ);
locked_msgq = 1;
}
-#endif
-
- state = erts_smp_atomic32_read_nob(&receiver->state);
+ state = erts_atomic32_read_nob(&receiver->state);
- if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) {
-#ifdef ERTS_SMP
- exiting:
-#endif
+ if (state & ERTS_PSFLG_EXITING) {
/* Drop message if receiver is exiting or has a pending exit... */
if (locked_msgq)
- erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
+ if (ERTS_SIG_IS_NON_MSG(first)) {
+ ErtsSchedulerData* esdp = erts_get_scheduler_data();
+ ASSERT(esdp);
+ ASSERT(!esdp->pending_signal.sig);
+ esdp->pending_signal.sig = (ErtsSignal*) first;
+ esdp->pending_signal.to = receiver->common.id;
+ first = first->next;
+ }
erts_cleanup_messages(first);
- return 0;
+ return;
}
- res = receiver->msg.len;
-#ifdef ERTS_SMP
- if (receiver_locks & ERTS_PROC_LOCK_MAIN) {
- /*
- * We move 'in queue' to 'private queue' and place
- * message at the end of 'private queue' in order
- * to ensure that the 'in queue' doesn't contain
- * references into the heap. By ensuring this,
- * we don't need to include the 'in queue' in
- * the root set when garbage collecting.
- */
- res += receiver->msg_inq.len;
- ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
- LINK_MESSAGE_PRIVQ(receiver, first, last, len);
+ if (last == &first->next) {
+ ASSERT(len == 1);
+ LINK_MESSAGE(receiver, first);
}
- else
-#endif
- {
- LINK_MESSAGE(receiver, first, last, len);
+ else {
+ erts_enqueue_signals(receiver, first, last, NULL, len, state);
}
- if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)
- && (te = &erts_receive_tracing[erts_active_bp_ix()],
- te->on)) {
-
- ErtsMessage *msg = first;
-
-#ifdef USE_VM_PROBES
- if (DTRACE_ENABLED(message_queued)) {
- DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
- Sint tok_label = 0;
- Sint tok_lastcnt = 0;
- Sint tok_serial = 0;
- Eterm seq_trace_token = ERL_MESSAGE_TOKEN(msg);
-
- dtrace_proc_str(receiver, receiver_name);
- if (seq_trace_token != NIL && is_tuple(seq_trace_token)) {
- tok_label = signed_val(SEQ_TRACE_T_LABEL(seq_trace_token));
- tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(seq_trace_token));
- tok_serial = signed_val(SEQ_TRACE_T_SERIAL(seq_trace_token));
- }
- DTRACE6(message_queued,
- receiver_name, size_object(ERL_MESSAGE_TERM(msg)),
- receiver->msg.len,
- tok_label, tok_lastcnt, tok_serial);
- }
-#endif
- while (msg) {
- trace_receive(receiver, from, ERL_MESSAGE_TERM(msg), te);
- msg = msg->next;
- }
+ if (receiver_locks & ERTS_PROC_LOCK_MAIN)
+ erts_proc_sig_fetch(receiver);
- }
if (locked_msgq) {
- erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
}
-#ifdef ERTS_SMP
- erts_proc_notify_new_message(receiver, receiver_locks);
-#else
- erts_proc_notify_new_message(receiver, 0);
-#endif
- return res;
+ if (last == &first->next)
+ erts_proc_notify_new_message(receiver, receiver_locks);
+ else
+ erts_proc_notify_new_sig(receiver, state, ERTS_PSFLG_ACTIVE);
}
-static Sint
-queue_message(Process* receiver,
- erts_aint32_t *receiver_state,
- ErtsProcLocks receiver_locks,
- ErtsMessage* mp, Eterm msg, Eterm from)
+static ERTS_INLINE
+ErtsMessage* prepend_pending_sig_maybe(Process* sender, Process* receiver,
+ ErtsMessage* mp)
{
- ERL_MESSAGE_TERM(mp) = msg;
- return queue_messages(receiver, receiver_state, receiver_locks,
- mp, &mp->next, 1, from);
+ ErtsSchedulerData* esdp = sender->scheduler_data;
+ ErtsSignal* pend_sig;
+
+ if (!esdp || esdp->pending_signal.to != receiver->common.id)
+ return mp;
+
+ pend_sig = esdp->pending_signal.sig;
+
+ ASSERT(esdp->pending_signal.dbg_from == sender);
+ esdp->pending_signal.sig = NULL;
+ esdp->pending_signal.to = THE_NON_VALUE;
+ pend_sig->common.next = mp;
+ pend_sig->common.specific.next = NULL;
+ return (ErtsMessage*) pend_sig;
}
-Sint
+/**
+ *
+ * @brief Send one message from *NOT* a local process.
+ *
+ * seq_trace does not work with this type of messages
+ * to it is set to am_undefined which means that the
+ * receiving process will not remove the seq_trace token
+ * when it gets this message.
+ *
+ */
+void
erts_queue_message(Process* receiver, ErtsProcLocks receiver_locks,
ErtsMessage* mp, Eterm msg, Eterm from)
{
- return queue_message(receiver, NULL, receiver_locks, mp, msg, from);
+ ASSERT(is_not_internal_pid(from));
+ ERL_MESSAGE_TERM(mp) = msg;
+ ERL_MESSAGE_FROM(mp) = from;
+ ERL_MESSAGE_TOKEN(mp) = am_undefined;
+ queue_messages(receiver, receiver_locks, mp, &mp->next, 1);
}
+/**
+ * @brief Send one message from a local process.
+ *
+ * It is up to the caller of this function to set the
+ * correct seq_trace. The general rule of thumb is that
+ * it should be set to am_undefined if the message
+ * cannot be traced using seq_trace, if it can be
+ * traced it should be set to the trace token. It should
+ * very rarely be explicitly set to NIL!
+ */
+void
+erts_queue_proc_message(Process* sender,
+ Process* receiver, ErtsProcLocks receiver_locks,
+ ErtsMessage* mp, Eterm msg)
+{
+ ERL_MESSAGE_TERM(mp) = msg;
+ ERL_MESSAGE_FROM(mp) = sender->common.id;
+ queue_messages(receiver, receiver_locks,
+ prepend_pending_sig_maybe(sender, receiver, mp),
+ &mp->next, 1);
+}
-Sint
-erts_queue_messages(Process* receiver, ErtsProcLocks receiver_locks,
- ErtsMessage* first, ErtsMessage** last, Uint len,
- Eterm from)
+
+void
+erts_queue_proc_messages(Process* sender,
+ Process* receiver, ErtsProcLocks receiver_locks,
+ ErtsMessage* first, ErtsMessage** last, Uint len)
{
- return queue_messages(receiver, NULL, receiver_locks,
- first, last, len, from);
+ queue_messages(receiver, receiver_locks,
+ prepend_pending_sig_maybe(sender, receiver, first),
+ last, len);
}
void
@@ -594,9 +525,7 @@ erts_try_alloc_message_on_heap(Process *pp,
ErlOffHeap **ohpp,
int *on_heap_p)
{
-#ifdef ERTS_SMP
int locked_main = 0;
-#endif
ErtsMessage *mp;
ASSERT(!(*psp & ERTS_PSFLG_OFF_HEAP_MSGQ));
@@ -604,15 +533,9 @@ erts_try_alloc_message_on_heap(Process *pp,
if ((*psp) & ERTS_PSFLGS_VOLATILE_HEAP)
goto in_message_fragment;
else if (
-#if defined(ERTS_SMP)
*plp & ERTS_PROC_LOCK_MAIN
-#else
- pp
-#endif
) {
-#ifdef ERTS_SMP
try_on_heap:
-#endif
if (((*psp) & ERTS_PSFLGS_VOLATILE_HEAP)
|| (pp->flags & F_DISABLE_GC)
|| HEAP_LIMIT(pp) - HEAP_TOP(pp) <= sz) {
@@ -620,12 +543,10 @@ erts_try_alloc_message_on_heap(Process *pp,
* The heap is either potentially in an inconsistent
* state, or not large enough.
*/
-#ifdef ERTS_SMP
if (locked_main) {
*plp &= ~ERTS_PROC_LOCK_MAIN;
- erts_smp_proc_unlock(pp, ERTS_PROC_LOCK_MAIN);
+ erts_proc_unlock(pp, ERTS_PROC_LOCK_MAIN);
}
-#endif
goto in_message_fragment;
}
@@ -636,17 +557,15 @@ erts_try_alloc_message_on_heap(Process *pp,
mp->data.attached = NULL;
*on_heap_p = !0;
}
-#ifdef ERTS_SMP
- else if (pp && erts_smp_proc_trylock(pp, ERTS_PROC_LOCK_MAIN) == 0) {
+ else if (pp && erts_proc_trylock(pp, ERTS_PROC_LOCK_MAIN) == 0) {
locked_main = 1;
- *psp = erts_smp_atomic32_read_nob(&pp->state);
+ *psp = erts_atomic32_read_nob(&pp->state);
*plp |= ERTS_PROC_LOCK_MAIN;
goto try_on_heap;
}
-#endif
else {
in_message_fragment:
- if (!((*psp) & ERTS_PSFLG_ON_HEAP_MSGQ)) {
+ if ((*psp) & ERTS_PSFLG_OFF_HEAP_MSGQ) {
mp = erts_alloc_message(sz, hpp);
*ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap;
}
@@ -674,18 +593,16 @@ erts_try_alloc_message_on_heap(Process *pp,
* Send a local message when sender & receiver processes are known.
*/
-Sint
+void
erts_send_message(Process* sender,
Process* receiver,
ErtsProcLocks *receiver_locks,
- Eterm message,
- unsigned flags)
+ Eterm message)
{
Uint msize;
ErtsMessage* mp;
ErlOffHeap *ohp;
Eterm token = NIL;
- Sint res = 0;
#ifdef USE_VM_PROBES
DTRACE_CHARBUF(sender_name, 64);
DTRACE_CHARBUF(receiver_name, 64);
@@ -712,9 +629,9 @@ erts_send_message(Process* sender,
}
#endif
- receiver_state = erts_smp_atomic32_read_nob(&receiver->state);
+ receiver_state = erts_atomic32_read_nob(&receiver->state);
- if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) {
+ if (SEQ_TRACE_TOKEN(sender) != NIL) {
Eterm* hp;
Eterm stoken = SEQ_TRACE_TOKEN(sender);
Uint seq_trace_size = 0;
@@ -731,7 +648,8 @@ erts_send_message(Process* sender,
seq_trace_update_send(sender);
seq_trace_output(stoken, message, SEQ_TRACE_SEND,
receiver->common.id, sender);
- seq_trace_size = 6; /* TUPLE5 */
+
+ seq_trace_size = size_object(stoken);
}
#ifdef USE_VM_PROBES
if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
@@ -780,7 +698,7 @@ erts_send_message(Process* sender,
}
if (DTRACE_ENABLED(message_send)) {
if (have_seqtrace(stoken)) {
- tok_label = signed_val(SEQ_TRACE_T_LABEL(stoken));
+ tok_label = SEQ_TRACE_T_DTRACE_LABEL(stoken);
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(stoken));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(stoken));
}
@@ -827,13 +745,8 @@ erts_send_message(Process* sender,
#ifdef USE_VM_PROBES
ERL_MESSAGE_DT_UTAG(mp) = utag;
#endif
- res = queue_message(receiver,
- &receiver_state,
- *receiver_locks,
- mp, message,
- sender->common.id);
- return res;
+ erts_queue_proc_message(sender, receiver, *receiver_locks, mp, message);
}
@@ -952,70 +865,77 @@ Sint
erts_move_messages_off_heap(Process *c_p)
{
int reds = 1;
+ int i;
+ ErtsMessage *msgq[] = {c_p->sig_qs.first, c_p->sig_qs.cont};
/*
* Move all messages off heap. This *only* occurs when the
* process had off heap message disabled and just enabled
* it...
*/
- ErtsMessage *mp;
- reds += c_p->msg.len / 10;
+ reds += erts_proc_sig_privqs_len(c_p) / 10;
- ASSERT(erts_smp_atomic32_read_nob(&c_p->state)
+ ASSERT(erts_atomic32_read_nob(&c_p->state)
& ERTS_PSFLG_OFF_HEAP_MSGQ);
ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG);
- for (mp = c_p->msg.first; mp; mp = mp->next) {
- Uint msg_sz, token_sz;
+ for (i = 0; i < sizeof(msgq)/sizeof(msgq[0]); i++) {
+ ErtsMessage *mp;
+ for (mp = msgq[i]; mp; mp = mp->next) {
+ Uint msg_sz, token_sz;
#ifdef USE_VM_PROBES
- Uint utag_sz;
+ Uint utag_sz;
#endif
- Eterm *hp;
- ErlHeapFragment *hfrag;
+ Eterm *hp;
+ ErlHeapFragment *hfrag;
- if (mp->data.attached)
- continue;
+ if (!ERTS_SIG_IS_INTERNAL_MSG(mp))
+ continue;
- if (is_immed(ERL_MESSAGE_TERM(mp))
+ if (mp->data.attached)
+ continue;
+
+ if (is_immed(ERL_MESSAGE_TERM(mp))
#ifdef USE_VM_PROBES
- && is_immed(ERL_MESSAGE_DT_UTAG(mp))
+ && is_immed(ERL_MESSAGE_DT_UTAG(mp))
#endif
- && is_not_immed(ERL_MESSAGE_TOKEN(mp)))
- continue;
+ && is_not_immed(ERL_MESSAGE_TOKEN(mp)))
+ continue;
- /*
- * The message refers into the heap. Copy the message
- * from the heap into a heap fragment and attach
- * it to the message...
- */
- msg_sz = size_object(ERL_MESSAGE_TERM(mp));
+ /*
+ * The message refers into the heap. Copy the message
+ * from the heap into a heap fragment and attach
+ * it to the message...
+ */
+ msg_sz = size_object(ERL_MESSAGE_TERM(mp));
#ifdef USE_VM_PROBES
- utag_sz = size_object(ERL_MESSAGE_DT_UTAG(mp));
+ utag_sz = size_object(ERL_MESSAGE_DT_UTAG(mp));
#endif
- token_sz = size_object(ERL_MESSAGE_TOKEN(mp));
+ token_sz = size_object(ERL_MESSAGE_TOKEN(mp));
- hfrag = new_message_buffer(msg_sz
+ hfrag = new_message_buffer(msg_sz
#ifdef USE_VM_PROBES
- + utag_sz
+ + utag_sz
#endif
- + token_sz);
- hp = hfrag->mem;
- if (is_not_immed(ERL_MESSAGE_TERM(mp)))
- ERL_MESSAGE_TERM(mp) = copy_struct(ERL_MESSAGE_TERM(mp),
- msg_sz, &hp,
- &hfrag->off_heap);
- if (is_not_immed(ERL_MESSAGE_TOKEN(mp)))
- ERL_MESSAGE_TOKEN(mp) = copy_struct(ERL_MESSAGE_TOKEN(mp),
- token_sz, &hp,
- &hfrag->off_heap);
+ + token_sz);
+ hp = hfrag->mem;
+ if (is_not_immed(ERL_MESSAGE_TERM(mp)))
+ ERL_MESSAGE_TERM(mp) = copy_struct(ERL_MESSAGE_TERM(mp),
+ msg_sz, &hp,
+ &hfrag->off_heap);
+ if (is_not_immed(ERL_MESSAGE_TOKEN(mp)))
+ ERL_MESSAGE_TOKEN(mp) = copy_struct(ERL_MESSAGE_TOKEN(mp),
+ token_sz, &hp,
+ &hfrag->off_heap);
#ifdef USE_VM_PROBES
- if (is_not_immed(ERL_MESSAGE_DT_UTAG(mp)))
- ERL_MESSAGE_DT_UTAG(mp) = copy_struct(ERL_MESSAGE_DT_UTAG(mp),
- utag_sz, &hp,
- &hfrag->off_heap);
+ if (is_not_immed(ERL_MESSAGE_DT_UTAG(mp)))
+ ERL_MESSAGE_DT_UTAG(mp) = copy_struct(ERL_MESSAGE_DT_UTAG(mp),
+ utag_sz, &hp,
+ &hfrag->off_heap);
#endif
- mp->data.heap_frag = hfrag;
- reds += 1;
+ mp->data.heap_frag = hfrag;
+ reds += 1;
+ }
}
return reds;
@@ -1026,9 +946,9 @@ erts_complete_off_heap_message_queue_change(Process *c_p)
{
int reds = 1;
- ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
+ ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
ASSERT(c_p->flags & F_OFF_HEAP_MSGQ_CHNG);
- ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ);
+ ASSERT(erts_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ);
/*
* This job was first initiated when the process changed to off heap
@@ -1040,13 +960,13 @@ erts_complete_off_heap_message_queue_change(Process *c_p)
*/
if (!(c_p->flags & F_OFF_HEAP_MSGQ))
- erts_smp_atomic32_read_band_nob(&c_p->state,
+ erts_atomic32_read_band_nob(&c_p->state,
~ERTS_PSFLG_OFF_HEAP_MSGQ);
else {
reds += 2;
- erts_smp_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
- ERTS_SMP_MSGQ_MV_INQ2PRIVQ(c_p);
- erts_smp_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(c_p);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
reds += erts_move_messages_off_heap(c_p);
}
c_p->flags &= ~F_OFF_HEAP_MSGQ_CHNG;
@@ -1083,16 +1003,16 @@ erts_change_message_queue_management(Process *c_p, Eterm new_state)
#ifdef DEBUG
if (c_p->flags & F_OFF_HEAP_MSGQ) {
- ASSERT(erts_smp_atomic32_read_nob(&c_p->state)
+ ASSERT(erts_atomic32_read_nob(&c_p->state)
& ERTS_PSFLG_OFF_HEAP_MSGQ);
}
else {
if (c_p->flags & F_OFF_HEAP_MSGQ_CHNG) {
- ASSERT(erts_smp_atomic32_read_nob(&c_p->state)
+ ASSERT(erts_atomic32_read_nob(&c_p->state)
& ERTS_PSFLG_OFF_HEAP_MSGQ);
}
else {
- ASSERT(!(erts_smp_atomic32_read_nob(&c_p->state)
+ ASSERT(!(erts_atomic32_read_nob(&c_p->state)
& ERTS_PSFLG_OFF_HEAP_MSGQ));
}
}
@@ -1109,8 +1029,6 @@ erts_change_message_queue_management(Process *c_p, Eterm new_state)
case am_on_heap:
c_p->flags |= F_ON_HEAP_MSGQ;
c_p->flags &= ~F_OFF_HEAP_MSGQ;
- erts_smp_atomic32_read_bor_nob(&c_p->state,
- ERTS_PSFLG_ON_HEAP_MSGQ);
/*
* We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ
* if a off heap change is ongoing. It will be adjusted
@@ -1118,7 +1036,7 @@ erts_change_message_queue_management(Process *c_p, Eterm new_state)
*/
if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) {
/* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... */
- erts_smp_atomic32_read_band_nob(&c_p->state,
+ erts_atomic32_read_band_nob(&c_p->state,
~ERTS_PSFLG_OFF_HEAP_MSGQ);
}
break;
@@ -1136,8 +1054,6 @@ erts_change_message_queue_management(Process *c_p, Eterm new_state)
break;
case am_off_heap:
c_p->flags &= ~F_ON_HEAP_MSGQ;
- erts_smp_atomic32_read_band_nob(&c_p->state,
- ~ERTS_PSFLG_ON_HEAP_MSGQ);
goto change_to_off_heap;
default:
res = THE_NON_VALUE; /* badarg */
@@ -1171,7 +1087,7 @@ change_to_off_heap:
* change has completed, GC does not need to inspect
* the message queue at all.
*/
- erts_smp_atomic32_read_bor_nob(&c_p->state,
+ erts_atomic32_read_bor_nob(&c_p->state,
ERTS_PSFLG_OFF_HEAP_MSGQ);
c_p->flags |= F_OFF_HEAP_MSGQ_CHNG;
cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG,
@@ -1259,139 +1175,6 @@ erts_decode_dist_message(Process *proc, ErtsProcLocks proc_locks,
return 1;
}
-/*
- * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0 will move off heap messages
- * into the heap of the inspected process if off_heap_message_queue
- * is false when process_info(_, messages) is called. That is, the
- * following GC will have more data in the rootset compared to the
- * scenario when process_info(_, messages) had not been called.
- *
- * ERTS_INSPECT_MSGQ_KEEP_OH_MSGS != 0 will keep off heap messages
- * off heap when process_info(_, messages) is called regardless of
- * the off_heap_message_queue setting of the process. That is, it
- * will change the following execution of the process as little as
- * possible.
- */
-#define ERTS_INSPECT_MSGQ_KEEP_OH_MSGS 1
-
-Uint
-erts_prep_msgq_for_inspection(Process *c_p, Process *rp,
- ErtsProcLocks rp_locks, ErtsMessageInfo *mip)
-{
- Uint tot_heap_size;
- ErtsMessage* mp;
- Sint i;
- int self_on_heap;
-
- /*
- * Prepare the message queue for inspection
- * by process_info().
- *
- *
- * - Decode all messages on external format
- * - Remove all corrupt dist messages from queue
- * - Save pointer to, and heap size need of each
- * message in the mip array.
- * - Return total heap size need for all messages
- * that needs to be copied.
- *
- * If ERTS_INSPECT_MSGQ_KEEP_OH_MSGS == 0:
- * - In case off heap messages is disabled and
- * we are inspecting our own queue, move all
- * off heap data into the heap.
- */
-
- self_on_heap = c_p == rp && !(c_p->flags & F_OFF_HEAP_MSGQ);
-
- tot_heap_size = 0;
- i = 0;
- mp = rp->msg.first;
- while (mp) {
- Eterm msg = ERL_MESSAGE_TERM(mp);
-
- mip[i].size = 0;
-
- if (is_non_value(msg)) {
- /* Dist message on external format; decode it... */
- if (mp->data.attached)
- erts_decode_dist_message(rp, rp_locks, mp,
- ERTS_INSPECT_MSGQ_KEEP_OH_MSGS);
-
- msg = ERL_MESSAGE_TERM(mp);
-
- if (is_non_value(msg)) {
- ErtsMessage **mpp;
- ErtsMessage *bad_mp = mp;
- /*
- * Bad distribution message; remove
- * it from the queue...
- */
- ASSERT(!mp->data.attached);
-
- mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next;
-
- ASSERT(*mpp == bad_mp);
-
- erts_msgq_update_internal_pointers(&rp->msg, mpp, &bad_mp->next);
-
- mp = mp->next;
- *mpp = mp;
- rp->msg.len--;
- bad_mp->next = NULL;
- erts_cleanup_messages(bad_mp);
- continue;
- }
- }
-
- ASSERT(is_value(msg));
-
-#if ERTS_INSPECT_MSGQ_KEEP_OH_MSGS
- if (is_not_immed(msg) && (!self_on_heap || mp->data.attached)) {
- Uint sz = size_object(msg);
- mip[i].size = sz;
- tot_heap_size += sz;
- }
-#else
- if (self_on_heap) {
- if (mp->data.attached) {
- ErtsMessage *tmp = NULL;
- if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) {
- erts_link_mbuf_to_proc(rp, mp->data.heap_frag);
- mp->data.attached = NULL;
- }
- else {
- /*
- * Need to replace the message reference since
- * we will get references to the message data
- * from the heap...
- */
- ErtsMessage **mpp;
- tmp = erts_alloc_message(0, NULL);
- sys_memcpy((void *) tmp->m, (void *) mp->m,
- sizeof(Eterm)*ERL_MESSAGE_REF_ARRAY_SZ);
- mpp = i == 0 ? &rp->msg.first : &mip[i-1].msgp->next;
- erts_msgq_replace_msg_ref(&rp->msg, tmp, mpp);
- erts_save_message_in_proc(rp, mp);
- mp = tmp;
- }
- }
- }
- else if (is_not_immed(msg)) {
- Uint sz = size_object(msg);
- mip[i].size = sz;
- tot_heap_size += sz;
- }
-
-#endif
-
- mip[i].msgp = mp;
- i++;
- mp = mp->next;
- }
-
- return tot_heap_size;
-}
-
void erts_factory_proc_init(ErtsHeapFactory* factory,
Process* p)
{
@@ -1452,7 +1235,7 @@ erts_factory_message_create(ErtsHeapFactory* factory,
int on_heap;
erts_aint32_t state;
- state = proc ? erts_smp_atomic32_read_nob(&proc->state) : 0;
+ state = proc ? erts_atomic32_read_nob(&proc->state) : 0;
if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) {
msgp = erts_alloc_message(sz, &hp);
@@ -1467,7 +1250,7 @@ erts_factory_message_create(ErtsHeapFactory* factory,
}
if (on_heap) {
- ERTS_SMP_ASSERT(*proc_locksp & ERTS_PROC_LOCK_MAIN);
+ ERTS_ASSERT(*proc_locksp & ERTS_PROC_LOCK_MAIN);
ASSERT(ohp == &proc->off_heap);
factory->mode = FACTORY_HALLOC;
factory->p = proc;
@@ -1581,32 +1364,18 @@ void erts_factory_dummy_init(ErtsHeapFactory* factory)
factory->mode = FACTORY_CLOSED;
}
-static void reserve_heap(ErtsHeapFactory*, Uint need, Uint xtra);
-
-Eterm* erts_produce_heap(ErtsHeapFactory* factory, Uint need, Uint xtra)
-{
- Eterm* res;
-
- ASSERT((unsigned int)factory->mode > (unsigned int)FACTORY_CLOSED);
- if (factory->hp + need > factory->hp_end) {
- reserve_heap(factory, need, xtra);
- }
- res = factory->hp;
- factory->hp += need;
- return res;
-}
-
Eterm* erts_reserve_heap(ErtsHeapFactory* factory, Uint need)
{
ASSERT((unsigned int)factory->mode > (unsigned int)FACTORY_CLOSED);
if (factory->hp + need > factory->hp_end) {
- reserve_heap(factory, need, 200);
+ erts_reserve_heap__(factory, need, 200);
}
return factory->hp;
}
-static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra)
+void erts_reserve_heap__(ErtsHeapFactory* factory, Uint need, Uint xtra)
{
+ /* internal... */
ErlHeapFragment* bp;
switch (factory->mode) {
@@ -1616,7 +1385,9 @@ static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra)
factory->hp_end = factory->hp + need;
return;
- case FACTORY_MESSAGE:
+ case FACTORY_MESSAGE: {
+ int replace_oh;
+ int replace_msg_hfrag;
if (!factory->heap_frags) {
ASSERT(factory->message->data.attached == ERTS_MSG_COMBINED_HFRAG);
bp = &factory->message->hfrag;
@@ -1628,25 +1399,45 @@ static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra)
bp = factory->heap_frags;
}
+ replace_oh = 0;
+ replace_msg_hfrag = 0;
+
if (bp) {
- ASSERT(factory->hp > bp->mem);
+ ASSERT(factory->hp >= bp->mem);
ASSERT(factory->hp <= factory->hp_end);
ASSERT(factory->hp_end == bp->mem + bp->alloc_size);
bp->used_size = factory->hp - bp->mem;
+ if (!bp->used_size && factory->heap_frags) {
+ factory->heap_frags = bp->next;
+ bp->next = NULL;
+ ASSERT(!bp->off_heap.first);
+ if (factory->off_heap == &bp->off_heap)
+ replace_oh = !0;
+ if (factory->message && factory->message->data.heap_frag == bp)
+ replace_msg_hfrag = !0;
+ free_message_buffer(bp);
+ }
}
bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(factory->alloc_type,
ERTS_HEAP_FRAG_SIZE(need+xtra));
bp->next = factory->heap_frags;
factory->heap_frags = bp;
bp->alloc_size = need + xtra;
- bp->used_size = need;
+ bp->used_size = need + xtra;
bp->off_heap.first = NULL;
bp->off_heap.overhead = 0;
-
+ if (replace_oh) {
+ factory->off_heap = &bp->off_heap;
+ factory->off_heap_saved.first = factory->off_heap->first;
+ factory->off_heap_saved.overhead = factory->off_heap->overhead;
+ }
+ if (replace_msg_hfrag)
+ factory->message->data.heap_frag = bp;
factory->hp = bp->mem;
factory->hp_end = bp->mem + bp->alloc_size;
return;
+ }
case FACTORY_STATIC:
case FACTORY_CLOSED:
@@ -1729,9 +1520,11 @@ void erts_factory_trim_and_close(ErtsHeapFactory* factory,
if (bp->next == NULL) {
Uint used_sz = factory->hp - bp->mem;
ASSERT(used_sz <= bp->alloc_size);
- if (used_sz > 0)
- bp = erts_resize_message_buffer(bp, used_sz,
- brefs, brefs_size);
+ if (used_sz > 0) {
+ if (used_sz != bp->alloc_size)
+ bp = erts_resize_message_buffer(bp, used_sz,
+ brefs, brefs_size);
+ }
else {
free_message_buffer(bp);
bp = NULL;