diff options
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r-- | erts/emulator/beam/erl_message.c | 385 |
1 files changed, 317 insertions, 68 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 8870fac7d9..ef52823287 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -3,16 +3,17 @@ * * Copyright Ericsson AB 1997-2012. All Rights Reserved. * - * The contents of this file are subject to the Erlang Public License, - * Version 1.1, (the "License"); you may not use this file except in - * compliance with the License. You should have received a copy of the - * Erlang Public License along with this software. If not, it can be - * retrieved online at http://www.erlang.org/. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * Software distributed under the License is distributed on an "AS IS" - * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See - * the License for the specific language governing rights and limitations - * under the License. + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * * %CopyrightEnd% */ @@ -93,9 +94,6 @@ erts_resize_message_buffer(ErlHeapFragment *bp, Uint size, #endif ErlHeapFragment* nbp; - /* ToDo: Make use of 'used_size' to avoid realloc - when shrinking just a few words */ - #ifdef DEBUG { Uint off_sz = size < bp->used_size ? size : bp->used_size; @@ -110,8 +108,10 @@ erts_resize_message_buffer(ErlHeapFragment *bp, Uint size, } #endif - if (size == bp->used_size) + if (size >= (bp->used_size - bp->used_size / 16)) { + bp->used_size = size; return bp; + } #ifdef HARD_DEBUG dbg_brefs = erts_alloc(ERTS_ALC_T_UNDEF, sizeof(Eterm *)*brefs_size); @@ -237,8 +237,7 @@ erts_msg_distext2heap(Process *pp, Eterm msg; Uint tok_sz = 0; Eterm *hp = NULL; - Eterm *hp_end = NULL; - ErlOffHeap *ohp; + ErtsHeapFactory factory; Sint sz; *bpp = NULL; @@ -250,36 +249,26 @@ erts_msg_distext2heap(Process *pp, tok_sz = heap_frag->used_size; sz += tok_sz; } - if (pp) + if (pp) { + ErlOffHeap *ohp; hp = erts_alloc_message_heap(sz, bpp, &ohp, pp, plcksp); + } else { *bpp = new_message_buffer(sz); hp = (*bpp)->mem; - ohp = &(*bpp)->off_heap; } - hp_end = hp + sz; - msg = erts_decode_dist_ext(&hp, ohp, dist_extp); + erts_factory_message_init(&factory, pp, hp, *bpp); + msg = erts_decode_dist_ext(&factory, dist_extp); if (is_non_value(msg)) goto decode_error; if (is_not_nil(*tokenp)) { ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp); - *tokenp = copy_struct(*tokenp, tok_sz, &hp, ohp); + hp = erts_produce_heap(&factory, tok_sz, 0); + *tokenp = copy_struct(*tokenp, tok_sz, &hp, factory.off_heap); erts_cleanup_offheap(&heap_frag->off_heap); } erts_free_dist_ext_copy(dist_extp); - if (hp_end != hp) { - if (!(*bpp)) { - HRelease(pp, hp_end, hp); - } - else { - Uint final_size = hp - &(*bpp)->mem[0]; - Eterm brefs[2] = {msg, *tokenp}; - ASSERT(sz - (hp_end - hp) == final_size); - *bpp = erts_resize_message_buffer(*bpp, final_size, &brefs[0], 2); - msg = brefs[0]; - *tokenp = brefs[1]; - } - } + erts_factory_close(&factory); return msg; decode_error: @@ -288,13 +277,7 @@ erts_msg_distext2heap(Process *pp, erts_cleanup_offheap(&heap_frag->off_heap); } erts_free_dist_ext_copy(dist_extp); - if (*bpp) { - free_message_buffer(*bpp); - *bpp = NULL; - } - else if (hp) { - HRelease(pp, hp_end, hp); - } + *bpp = NULL; return THE_NON_VALUE; } @@ -369,11 +352,7 @@ erts_queue_dist_message(Process *rcvr, tok_label, tok_lastcnt, tok_serial); } #endif - erts_queue_message(rcvr, rcvr_locks, mbuf, msg, token -#ifdef USE_VM_PROBES - , NIL -#endif - ); + erts_queue_message(rcvr, rcvr_locks, mbuf, msg, token); } else { /* Enqueue message on external format */ @@ -563,15 +542,15 @@ queue_message(Process *c_p, } void -erts_queue_message(Process* receiver, - ErtsProcLocks *receiver_locks, - ErlHeapFragment* bp, - Eterm message, - Eterm seq_trace_token #ifdef USE_VM_PROBES - , Eterm dt_utag +erts_queue_message_probe(Process* receiver, ErtsProcLocks *receiver_locks, + ErlHeapFragment* bp, + Eterm message, Eterm seq_trace_token, Eterm dt_utag) +#else +erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks, + ErlHeapFragment* bp, + Eterm message, Eterm seq_trace_token) #endif - ) { queue_message(NULL, receiver, @@ -855,10 +834,11 @@ erts_msg_attached_data_size_aux(ErlMessage *msg) } void -erts_move_msg_attached_data_to_heap(Eterm **hpp, ErlOffHeap *ohp, ErlMessage *msg) +erts_move_msg_attached_data_to_heap(ErtsHeapFactory* factory, + ErlMessage *msg) { if (is_value(ERL_MESSAGE_TERM(msg))) - erts_move_msg_mbuf_to_heap(hpp, ohp, msg); + erts_move_msg_mbuf_to_heap(&factory->hp, factory->off_heap, msg); else if (msg->data.dist_ext) { ASSERT(msg->data.dist_ext->heap_size >= 0); if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) { @@ -866,12 +846,11 @@ erts_move_msg_attached_data_to_heap(Eterm **hpp, ErlOffHeap *ohp, ErlMessage *ms heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); ERL_MESSAGE_TOKEN(msg) = copy_struct(ERL_MESSAGE_TOKEN(msg), heap_frag->used_size, - hpp, - ohp); + &factory->hp, + factory->off_heap); erts_cleanup_offheap(&heap_frag->off_heap); } - ERL_MESSAGE_TERM(msg) = erts_decode_dist_ext(hpp, - ohp, + ERL_MESSAGE_TERM(msg) = erts_decode_dist_ext(factory, msg->data.dist_ext); erts_free_dist_ext_copy(msg->data.dist_ext); msg->data.dist_ext = NULL; @@ -1117,11 +1096,7 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, /* the trace token must in this case be updated by the caller */ seq_trace_output(token, save, SEQ_TRACE_SEND, to->common.id, NULL); temptoken = copy_struct(token, sz_token, &hp, &bp->off_heap); - erts_queue_message(to, to_locksp, bp, save, temptoken -#ifdef USE_VM_PROBES - , NIL -#endif - ); + erts_queue_message(to, to_locksp, bp, save, temptoken); } else { ErlOffHeap *ohp; sz_reason = size_object(reason); @@ -1138,11 +1113,285 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp, ? from : copy_struct(from, sz_from, &hp, ohp)); save = TUPLE3(hp, am_EXIT, from_copy, mess); - erts_queue_message(to, to_locksp, bp, save, NIL -#ifdef USE_VM_PROBES - , NIL -#endif - ); + erts_queue_message(to, to_locksp, bp, save, NIL); + } +} + +void erts_factory_proc_init(ErtsHeapFactory* factory, + Process* p) +{ + erts_factory_proc_prealloc_init(factory, p, HEAP_LIMIT(p) - HEAP_TOP(p)); +} + +void erts_factory_proc_prealloc_init(ErtsHeapFactory* factory, + Process* p, + Sint size) +{ + factory->mode = FACTORY_HALLOC; + factory->p = p; + factory->hp_start = HAlloc(p, size); + factory->hp = factory->hp_start; + factory->hp_end = factory->hp_start + size; + factory->off_heap = &p->off_heap; + factory->off_heap_saved.first = p->off_heap.first; + factory->off_heap_saved.overhead = p->off_heap.overhead; + factory->heap_frags_saved = p->mbuf; + factory->heap_frags = NULL; /* not used */ + factory->alloc_type = 0; /* not used */ +} + +void erts_factory_message_init(ErtsHeapFactory* factory, + Process* rp, + Eterm* hp, + ErlHeapFragment* bp) +{ + if (bp) { + factory->mode = FACTORY_HEAP_FRAGS; + factory->p = NULL; + factory->hp_start = bp->mem; + factory->hp = hp ? hp : bp->mem; + factory->hp_end = bp->mem + bp->alloc_size; + factory->off_heap = &bp->off_heap; + factory->heap_frags = bp; + factory->heap_frags_saved = bp; + factory->alloc_type = ERTS_ALC_T_HEAP_FRAG; + ASSERT(!bp->next); + } + else { + factory->mode = FACTORY_HALLOC; + factory->p = rp; + factory->hp_start = hp; + factory->hp = hp; + factory->hp_end = HEAP_TOP(rp); + factory->off_heap = &rp->off_heap; + factory->heap_frags_saved = rp->mbuf; + factory->heap_frags = NULL; /* not used */ + factory->alloc_type = 0; /* not used */ + } + factory->off_heap_saved.first = factory->off_heap->first; + factory->off_heap_saved.overhead = factory->off_heap->overhead; + + ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end); +} + +void erts_factory_static_init(ErtsHeapFactory* factory, + Eterm* hp, + Uint size, + ErlOffHeap* off_heap) +{ + factory->mode = FACTORY_STATIC; + factory->hp_start = hp; + factory->hp = hp; + factory->hp_end = hp + size; + factory->off_heap = off_heap; + factory->off_heap_saved.first = factory->off_heap->first; + factory->off_heap_saved.overhead = factory->off_heap->overhead; +} + +/* When we know the term is an immediate and need no heap. +*/ +void erts_factory_dummy_init(ErtsHeapFactory* factory) +{ + factory->mode = FACTORY_CLOSED; +} + +static void reserve_heap(ErtsHeapFactory*, Uint need, Uint xtra); + +Eterm* erts_produce_heap(ErtsHeapFactory* factory, Uint need, Uint xtra) +{ + Eterm* res; + + ASSERT((unsigned int)factory->mode > (unsigned int)FACTORY_CLOSED); + if (factory->hp + need > factory->hp_end) { + reserve_heap(factory, need, xtra); + } + res = factory->hp; + factory->hp += need; + return res; +} + +Eterm* erts_reserve_heap(ErtsHeapFactory* factory, Uint need) +{ + ASSERT((unsigned int)factory->mode > (unsigned int)FACTORY_CLOSED); + if (factory->hp + need > factory->hp_end) { + reserve_heap(factory, need, 200); + } + return factory->hp; +} + +static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra) +{ + ErlHeapFragment* bp; + + switch (factory->mode) { + case FACTORY_HALLOC: + HRelease(factory->p, factory->hp_end, factory->hp); + factory->hp = HAllocX(factory->p, need, xtra); + factory->hp_end = factory->hp + need; + return; + + case FACTORY_HEAP_FRAGS: + bp = factory->heap_frags; + + if (bp) { + ASSERT(factory->hp > bp->mem); + ASSERT(factory->hp <= factory->hp_end); + ASSERT(factory->hp_end == bp->mem + bp->alloc_size); + + bp->used_size = factory->hp - bp->mem; + } + bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(factory->alloc_type, + ERTS_HEAP_FRAG_SIZE(need+xtra)); + bp->next = factory->heap_frags; + factory->heap_frags = bp; + bp->alloc_size = need + xtra; + bp->used_size = need; + bp->off_heap.first = NULL; + bp->off_heap.overhead = 0; + + factory->hp = bp->mem; + factory->hp_end = bp->mem + bp->alloc_size; + return; + + case FACTORY_STATIC: + case FACTORY_CLOSED: + default: + ASSERT(!"Invalid factory mode"); + } +} + +void erts_factory_close(ErtsHeapFactory* factory) +{ + ErlHeapFragment* bp; + + switch (factory->mode) { + case FACTORY_HALLOC: + HRelease(factory->p, factory->hp_end, factory->hp); + break; + + case FACTORY_HEAP_FRAGS: + bp = factory->heap_frags; + + if (bp) { + ASSERT(factory->hp >= bp->mem); + ASSERT(factory->hp <= factory->hp_end); + ASSERT(factory->hp_end == bp->mem + bp->alloc_size); + + bp->used_size = factory->hp - bp->mem; + } + break; + case FACTORY_STATIC: break; + case FACTORY_CLOSED: break; + default: + ASSERT(!"Invalid factory mode"); } + factory->mode = FACTORY_CLOSED; +} + +void erts_factory_trim_and_close(ErtsHeapFactory* factory, + Eterm *brefs, Uint brefs_size) +{ + if (factory->mode == FACTORY_HEAP_FRAGS) { + ErlHeapFragment* bp = factory->heap_frags; + if (bp->next == NULL) { + Uint used_sz = factory->hp - bp->mem; + ASSERT(used_sz <= bp->alloc_size); + factory->heap_frags = erts_resize_message_buffer(bp, used_sz, + brefs, brefs_size); + factory->mode = FACTORY_CLOSED; + return; + } + /*else we don't trim multi fragmented messages for now */ + } + erts_factory_close(factory); +} + +void erts_factory_undo(ErtsHeapFactory* factory) +{ + ErlHeapFragment* bp; + struct erl_off_heap_header *hdr, **hdr_nextp; + + switch (factory->mode) { + case FACTORY_HALLOC: + case FACTORY_STATIC: + /* Cleanup off-heap + */ + hdr_nextp = NULL; + for (hdr = factory->off_heap->first; + hdr != factory->off_heap_saved.first; + hdr = hdr->next) { + + hdr_nextp = &hdr->next; + } + + if (hdr_nextp != NULL) { + *hdr_nextp = NULL; + erts_cleanup_offheap(factory->off_heap); + factory->off_heap->first = factory->off_heap_saved.first; + factory->off_heap->overhead = factory->off_heap_saved.overhead; + } + + if (factory->mode == FACTORY_HALLOC) { + /* Free heap frags + */ + bp = factory->p->mbuf; + if (bp != factory->heap_frags_saved) { + do { + ErlHeapFragment *next_bp = bp->next; + ASSERT(bp->off_heap.first == NULL); + ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, (void *) bp, + ERTS_HEAP_FRAG_SIZE(bp->alloc_size)); + bp = next_bp; + } while (bp != factory->heap_frags_saved); + + factory->p->mbuf = bp; + } + + /* Rollback heap top + */ + if (factory->heap_frags_saved == NULL) { /* No heap frags when we started */ + ASSERT(factory->hp_start >= HEAP_START(factory->p)); + ASSERT(factory->hp_start <= HEAP_LIMIT(factory->p)); + + HEAP_TOP(factory->p) = factory->hp_start; + } + else { + ASSERT(factory->heap_frags_saved == factory->p->mbuf); + if (factory->hp_start == factory->heap_frags_saved->mem) { + factory->p->mbuf = factory->p->mbuf->next; + ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, factory->heap_frags_saved, + ERTS_HEAP_FRAG_SIZE(factory->heap_frags_saved->alloc_size)); + } + else if (factory->hp_start != factory->hp_end) { + unsigned remains = factory->hp_start - factory->heap_frags_saved->mem; + ASSERT(remains > 0 && remains < factory->heap_frags_saved->used_size); + factory->heap_frags_saved->used_size = remains; + } + } + } + break; + + case FACTORY_HEAP_FRAGS: + bp = factory->heap_frags; + do { + ErlHeapFragment* next_bp = bp->next; + + erts_cleanup_offheap(&bp->off_heap); + ERTS_HEAP_FREE(factory->alloc_type, (void *) bp, + ERTS_HEAP_FRAG_SIZE(bp->size)); + bp = next_bp; + }while (bp != NULL); + break; + + case FACTORY_CLOSED: break; + default: + ASSERT(!"Invalid factory mode"); + } + factory->mode = FACTORY_CLOSED; +#ifdef DEBUG + factory->p = NULL; + factory->hp = NULL; + factory->heap_frags = NULL; +#endif } |