aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_message.c
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r--erts/emulator/beam/erl_message.c198
1 files changed, 155 insertions, 43 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index d593108f8e..b3e74e3e6a 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -630,9 +630,24 @@ erts_try_alloc_message_on_heap(Process *pp,
#endif
else {
in_message_fragment:
-
- mp = erts_alloc_message(sz, hpp);
- *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap;
+ if (!((*psp) & ERTS_PSFLG_ON_HEAP_MSGQ)) {
+ mp = erts_alloc_message(sz, hpp);
+ *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap;
+ }
+ else {
+ mp = erts_alloc_message(0, NULL);
+ if (!sz) {
+ *hpp = NULL;
+ *ohpp = NULL;
+ }
+ else {
+ ErlHeapFragment *bp;
+ bp = new_message_buffer(sz);
+ *hpp = &bp->mem[0];
+ mp->data.heap_frag = bp;
+ *ohpp = &bp->off_heap;
+ }
+ }
*on_heap_p = 0;
}
@@ -1022,12 +1037,12 @@ erts_complete_off_heap_message_queue_change(Process *c_p)
ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ);
/*
- * This job was first initiated when the process changed
- * "off heap message queue" state from false to true. Since
- * then ERTS_PSFLG_OFF_HEAP_MSGQ has been set. However, the
- * state change might have been changed again (multiple times)
- * since then. Check users last requested state (the flag
- * F_OFF_HEAP_MSGQ), and make the state consistent with that.
+ * This job was first initiated when the process changed to off heap
+ * message queue management. Since then ERTS_PSFLG_OFF_HEAP_MSGQ
+ * has been set. However, the management state might have been changed
+ * again (multiple times) since then. Check users last requested state
+ * (the flags F_OFF_HEAP_MSGQ, and F_ON_HEAP_MSGQ), and make the state
+ * consistent with that.
*/
if (!(c_p->flags & F_OFF_HEAP_MSGQ))
@@ -1068,8 +1083,9 @@ change_off_heap_msgq(void *vcohmq)
}
Eterm
-erts_change_off_heap_message_queue_state(Process *c_p, int enable)
+erts_change_message_queue_management(Process *c_p, Eterm new_state)
{
+ Eterm res;
#ifdef DEBUG
if (c_p->flags & F_OFF_HEAP_MSGQ) {
@@ -1088,57 +1104,117 @@ erts_change_off_heap_message_queue_state(Process *c_p, int enable)
}
#endif
- if (c_p->flags & F_OFF_HEAP_MSGQ) {
- /* Off heap message queue is enabled */
+ switch (c_p->flags & (F_OFF_HEAP_MSGQ|F_ON_HEAP_MSGQ)) {
+
+ case F_OFF_HEAP_MSGQ:
+ res = am_off_heap;
- if (!enable) {
+ switch (new_state) {
+ case am_off_heap:
+ break;
+ case am_on_heap:
+ c_p->flags |= F_ON_HEAP_MSGQ;
+ erts_smp_atomic32_read_bor_nob(&c_p->state,
+ ERTS_PSFLG_ON_HEAP_MSGQ);
+ /* fall through */
+ case am_mixed:
c_p->flags &= ~F_OFF_HEAP_MSGQ;
/*
* We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ
- * if a change is ongoing. It will be adjusted when the
- * change completes...
+ * if a off heap change is ongoing. It will be adjusted
+ * when the change completes...
*/
if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) {
/* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... */
erts_smp_atomic32_read_band_nob(&c_p->state,
~ERTS_PSFLG_OFF_HEAP_MSGQ);
}
+ break;
+ default:
+ res = THE_NON_VALUE; /* badarg */
+ break;
+ }
+ break;
+
+ case F_ON_HEAP_MSGQ:
+ res = am_on_heap;
+
+ switch (new_state) {
+ case am_on_heap:
+ break;
+ case am_mixed:
+ c_p->flags &= ~F_ON_HEAP_MSGQ;
+ erts_smp_atomic32_read_band_nob(&c_p->state,
+ ~ERTS_PSFLG_ON_HEAP_MSGQ);
+ break;
+ case am_off_heap:
+ c_p->flags &= ~F_ON_HEAP_MSGQ;
+ erts_smp_atomic32_read_band_nob(&c_p->state,
+ ~ERTS_PSFLG_ON_HEAP_MSGQ);
+ goto change_to_off_heap;
+ default:
+ res = THE_NON_VALUE; /* badarg */
+ break;
+ }
+ break;
+
+ case 0:
+ res = am_mixed;
+
+ switch (new_state) {
+ case am_mixed:
+ break;
+ case am_on_heap:
+ c_p->flags |= F_ON_HEAP_MSGQ;
+ erts_smp_atomic32_read_bor_nob(&c_p->state,
+ ERTS_PSFLG_ON_HEAP_MSGQ);
+ break;
+ case am_off_heap:
+ goto change_to_off_heap;
+ default:
+ res = THE_NON_VALUE; /* badarg */
+ break;
}
+ break;
- return am_true; /* Old state */
+ default:
+ res = am_error;
+ ERTS_INTERNAL_ERROR("Inconsistent message queue management state");
+ break;
}
- /* Off heap message queue is disabled */
+ return res;
+
+change_to_off_heap:
- if (enable) {
- c_p->flags |= F_OFF_HEAP_MSGQ;
+ c_p->flags |= F_OFF_HEAP_MSGQ;
+
+ /*
+ * We do not have to schedule a change if
+ * we have an ongoing off heap change...
+ */
+ if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) {
+ ErtsChangeOffHeapMessageQueue *cohmq;
/*
- * We do not have to schedule a change if
- * we have an ongoing change...
+ * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait
+ * thread progress before completing the change in
+ * order to ensure that all senders observe that
+ * messages should be passed off heap. When the
+ * change has completed, GC does not need to inspect
+ * the message queue at all.
*/
- if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) {
- ErtsChangeOffHeapMessageQueue *cohmq;
- /*
- * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait
- * thread progress before completing the change in
- * order to ensure that all senders observe that
- * messages should be passed off heap. When the
- * change has completed, GC does not need to inspect
- * the message queue at all.
- */
- erts_smp_atomic32_read_bor_nob(&c_p->state,
- ERTS_PSFLG_OFF_HEAP_MSGQ);
- c_p->flags |= F_OFF_HEAP_MSGQ_CHNG;
- cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG,
- sizeof(ErtsChangeOffHeapMessageQueue));
- cohmq->pid = c_p->common.id;
- erts_schedule_thr_prgr_later_op(change_off_heap_msgq,
- (void *) cohmq,
- &cohmq->lop);
- }
+ erts_smp_atomic32_read_bor_nob(&c_p->state,
+ ERTS_PSFLG_OFF_HEAP_MSGQ);
+ c_p->flags |= F_OFF_HEAP_MSGQ_CHNG;
+ cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG,
+ sizeof(ErtsChangeOffHeapMessageQueue));
+ cohmq->pid = c_p->common.id;
+ erts_schedule_thr_prgr_later_op(change_off_heap_msgq,
+ (void *) cohmq,
+ &cohmq->lop);
}
- return am_false; /* Old state */
+ return res;
}
int
@@ -1501,6 +1577,9 @@ void erts_factory_selfcontained_message_init(ErtsHeapFactory* factory,
ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end);
}
+/* One static sized heap that must suffice.
+ No extra heap fragments will be allocated.
+*/
void erts_factory_static_init(ErtsHeapFactory* factory,
Eterm* hp,
Uint size,
@@ -1515,6 +1594,23 @@ void erts_factory_static_init(ErtsHeapFactory* factory,
factory->off_heap_saved.overhead = factory->off_heap->overhead;
}
+/* A temporary heap with default buffer allocated/freed by client.
+ * factory_close is same as factory_undo
+ */
+void erts_factory_tmp_init(ErtsHeapFactory* factory, Eterm* hp, Uint size,
+ Uint32 atype)
+{
+ factory->mode = FACTORY_TMP;
+ factory->hp_start = hp;
+ factory->hp = hp;
+ factory->hp_end = hp + size;
+ factory->heap_frags = NULL;
+ factory->off_heap_saved.first = NULL;
+ factory->off_heap_saved.overhead = 0;
+ factory->off_heap = &factory->off_heap_saved;
+ factory->alloc_type = atype;
+}
+
/* When we know the term is an immediate and need no heap.
*/
void erts_factory_dummy_init(ErtsHeapFactory* factory)
@@ -1565,6 +1661,7 @@ static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra)
else {
/* Fall through */
case FACTORY_HEAP_FRAGS:
+ case FACTORY_TMP:
bp = factory->heap_frags;
}
@@ -1630,6 +1727,9 @@ void erts_factory_close(ErtsHeapFactory* factory)
bp->used_size = factory->hp - bp->mem;
}
break;
+ case FACTORY_TMP:
+ erts_factory_undo(factory);
+ break;
case FACTORY_STATIC: break;
case FACTORY_CLOSED: break;
default:
@@ -1756,8 +1856,20 @@ void erts_factory_undo(ErtsHeapFactory* factory)
factory->message->data.heap_frag = factory->heap_frags;
erts_cleanup_messages(factory->message);
break;
+ case FACTORY_TMP:
case FACTORY_HEAP_FRAGS:
- free_message_buffer(factory->heap_frags);
+ erts_cleanup_offheap(factory->off_heap);
+ factory->off_heap->first = NULL;
+
+ bp = factory->heap_frags;
+ while (bp != NULL) {
+ ErlHeapFragment* next_bp = bp->next;
+
+ ASSERT(bp->off_heap.first == NULL);
+ ERTS_HEAP_FREE(factory->alloc_type, (void *) bp,
+ ERTS_HEAP_FRAG_SIZE(bp->alloc_size));
+ bp = next_bp;
+ }
break;
case FACTORY_CLOSED: break;