diff options
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r-- | erts/emulator/beam/erl_message.c | 198 |
1 files changed, 155 insertions, 43 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index d593108f8e..b3e74e3e6a 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -630,9 +630,24 @@ erts_try_alloc_message_on_heap(Process *pp, #endif else { in_message_fragment: - - mp = erts_alloc_message(sz, hpp); - *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; + if (!((*psp) & ERTS_PSFLG_ON_HEAP_MSGQ)) { + mp = erts_alloc_message(sz, hpp); + *ohpp = sz == 0 ? NULL : &mp->hfrag.off_heap; + } + else { + mp = erts_alloc_message(0, NULL); + if (!sz) { + *hpp = NULL; + *ohpp = NULL; + } + else { + ErlHeapFragment *bp; + bp = new_message_buffer(sz); + *hpp = &bp->mem[0]; + mp->data.heap_frag = bp; + *ohpp = &bp->off_heap; + } + } *on_heap_p = 0; } @@ -1022,12 +1037,12 @@ erts_complete_off_heap_message_queue_change(Process *c_p) ASSERT(erts_smp_atomic32_read_nob(&c_p->state) & ERTS_PSFLG_OFF_HEAP_MSGQ); /* - * This job was first initiated when the process changed - * "off heap message queue" state from false to true. Since - * then ERTS_PSFLG_OFF_HEAP_MSGQ has been set. However, the - * state change might have been changed again (multiple times) - * since then. Check users last requested state (the flag - * F_OFF_HEAP_MSGQ), and make the state consistent with that. + * This job was first initiated when the process changed to off heap + * message queue management. Since then ERTS_PSFLG_OFF_HEAP_MSGQ + * has been set. However, the management state might have been changed + * again (multiple times) since then. Check users last requested state + * (the flags F_OFF_HEAP_MSGQ, and F_ON_HEAP_MSGQ), and make the state + * consistent with that. */ if (!(c_p->flags & F_OFF_HEAP_MSGQ)) @@ -1068,8 +1083,9 @@ change_off_heap_msgq(void *vcohmq) } Eterm -erts_change_off_heap_message_queue_state(Process *c_p, int enable) +erts_change_message_queue_management(Process *c_p, Eterm new_state) { + Eterm res; #ifdef DEBUG if (c_p->flags & F_OFF_HEAP_MSGQ) { @@ -1088,57 +1104,117 @@ erts_change_off_heap_message_queue_state(Process *c_p, int enable) } #endif - if (c_p->flags & F_OFF_HEAP_MSGQ) { - /* Off heap message queue is enabled */ + switch (c_p->flags & (F_OFF_HEAP_MSGQ|F_ON_HEAP_MSGQ)) { + + case F_OFF_HEAP_MSGQ: + res = am_off_heap; - if (!enable) { + switch (new_state) { + case am_off_heap: + break; + case am_on_heap: + c_p->flags |= F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_ON_HEAP_MSGQ); + /* fall through */ + case am_mixed: c_p->flags &= ~F_OFF_HEAP_MSGQ; /* * We are not allowed to clear ERTS_PSFLG_OFF_HEAP_MSGQ - * if a change is ongoing. It will be adjusted when the - * change completes... + * if a off heap change is ongoing. It will be adjusted + * when the change completes... */ if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { /* Safe to clear ERTS_PSFLG_OFF_HEAP_MSGQ... */ erts_smp_atomic32_read_band_nob(&c_p->state, ~ERTS_PSFLG_OFF_HEAP_MSGQ); } + break; + default: + res = THE_NON_VALUE; /* badarg */ + break; + } + break; + + case F_ON_HEAP_MSGQ: + res = am_on_heap; + + switch (new_state) { + case am_on_heap: + break; + case am_mixed: + c_p->flags &= ~F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_ON_HEAP_MSGQ); + break; + case am_off_heap: + c_p->flags &= ~F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_band_nob(&c_p->state, + ~ERTS_PSFLG_ON_HEAP_MSGQ); + goto change_to_off_heap; + default: + res = THE_NON_VALUE; /* badarg */ + break; + } + break; + + case 0: + res = am_mixed; + + switch (new_state) { + case am_mixed: + break; + case am_on_heap: + c_p->flags |= F_ON_HEAP_MSGQ; + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_ON_HEAP_MSGQ); + break; + case am_off_heap: + goto change_to_off_heap; + default: + res = THE_NON_VALUE; /* badarg */ + break; } + break; - return am_true; /* Old state */ + default: + res = am_error; + ERTS_INTERNAL_ERROR("Inconsistent message queue management state"); + break; } - /* Off heap message queue is disabled */ + return res; + +change_to_off_heap: - if (enable) { - c_p->flags |= F_OFF_HEAP_MSGQ; + c_p->flags |= F_OFF_HEAP_MSGQ; + + /* + * We do not have to schedule a change if + * we have an ongoing off heap change... + */ + if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { + ErtsChangeOffHeapMessageQueue *cohmq; /* - * We do not have to schedule a change if - * we have an ongoing change... + * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait + * thread progress before completing the change in + * order to ensure that all senders observe that + * messages should be passed off heap. When the + * change has completed, GC does not need to inspect + * the message queue at all. */ - if (!(c_p->flags & F_OFF_HEAP_MSGQ_CHNG)) { - ErtsChangeOffHeapMessageQueue *cohmq; - /* - * Need to set ERTS_PSFLG_OFF_HEAP_MSGQ and wait - * thread progress before completing the change in - * order to ensure that all senders observe that - * messages should be passed off heap. When the - * change has completed, GC does not need to inspect - * the message queue at all. - */ - erts_smp_atomic32_read_bor_nob(&c_p->state, - ERTS_PSFLG_OFF_HEAP_MSGQ); - c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; - cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, - sizeof(ErtsChangeOffHeapMessageQueue)); - cohmq->pid = c_p->common.id; - erts_schedule_thr_prgr_later_op(change_off_heap_msgq, - (void *) cohmq, - &cohmq->lop); - } + erts_smp_atomic32_read_bor_nob(&c_p->state, + ERTS_PSFLG_OFF_HEAP_MSGQ); + c_p->flags |= F_OFF_HEAP_MSGQ_CHNG; + cohmq = erts_alloc(ERTS_ALC_T_MSGQ_CHNG, + sizeof(ErtsChangeOffHeapMessageQueue)); + cohmq->pid = c_p->common.id; + erts_schedule_thr_prgr_later_op(change_off_heap_msgq, + (void *) cohmq, + &cohmq->lop); } - return am_false; /* Old state */ + return res; } int @@ -1501,6 +1577,9 @@ void erts_factory_selfcontained_message_init(ErtsHeapFactory* factory, ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end); } +/* One static sized heap that must suffice. + No extra heap fragments will be allocated. +*/ void erts_factory_static_init(ErtsHeapFactory* factory, Eterm* hp, Uint size, @@ -1515,6 +1594,23 @@ void erts_factory_static_init(ErtsHeapFactory* factory, factory->off_heap_saved.overhead = factory->off_heap->overhead; } +/* A temporary heap with default buffer allocated/freed by client. + * factory_close is same as factory_undo + */ +void erts_factory_tmp_init(ErtsHeapFactory* factory, Eterm* hp, Uint size, + Uint32 atype) +{ + factory->mode = FACTORY_TMP; + factory->hp_start = hp; + factory->hp = hp; + factory->hp_end = hp + size; + factory->heap_frags = NULL; + factory->off_heap_saved.first = NULL; + factory->off_heap_saved.overhead = 0; + factory->off_heap = &factory->off_heap_saved; + factory->alloc_type = atype; +} + /* When we know the term is an immediate and need no heap. */ void erts_factory_dummy_init(ErtsHeapFactory* factory) @@ -1565,6 +1661,7 @@ static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra) else { /* Fall through */ case FACTORY_HEAP_FRAGS: + case FACTORY_TMP: bp = factory->heap_frags; } @@ -1630,6 +1727,9 @@ void erts_factory_close(ErtsHeapFactory* factory) bp->used_size = factory->hp - bp->mem; } break; + case FACTORY_TMP: + erts_factory_undo(factory); + break; case FACTORY_STATIC: break; case FACTORY_CLOSED: break; default: @@ -1756,8 +1856,20 @@ void erts_factory_undo(ErtsHeapFactory* factory) factory->message->data.heap_frag = factory->heap_frags; erts_cleanup_messages(factory->message); break; + case FACTORY_TMP: case FACTORY_HEAP_FRAGS: - free_message_buffer(factory->heap_frags); + erts_cleanup_offheap(factory->off_heap); + factory->off_heap->first = NULL; + + bp = factory->heap_frags; + while (bp != NULL) { + ErlHeapFragment* next_bp = bp->next; + + ASSERT(bp->off_heap.first == NULL); + ERTS_HEAP_FREE(factory->alloc_type, (void *) bp, + ERTS_HEAP_FRAG_SIZE(bp->alloc_size)); + bp = next_bp; + } break; case FACTORY_CLOSED: break; |