aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_message.c
diff options
context:
space:
mode:
authorSverker Eriksson <[email protected]>2015-06-02 19:23:49 +0200
committerSverker Eriksson <[email protected]>2015-06-15 14:48:22 +0200
commit02d0ce034598297565f2b35ecc3d1af121787f33 (patch)
treea415e4b7d21c240352ae8737806e516caa2f2b43 /erts/emulator/beam/erl_message.c
parent512c321bdaf25b685124405e287a3839be467149 (diff)
downloadotp-02d0ce034598297565f2b35ecc3d1af121787f33.tar.gz
otp-02d0ce034598297565f2b35ecc3d1af121787f33.tar.bz2
otp-02d0ce034598297565f2b35ecc3d1af121787f33.zip
erts: Remove hashmap probabilistic heap overestimation
by adding a dynamic heap factory. "binary_to_term" is now a hybrid solution with both a call to decoded_size() to calculate needed heap space AND possible dynamic allocation of more heap space if needed for big maps. The heap size returned from decoded_size() is guaranteed to be sufficient for all term heap data except for hashmap nodes. All hashmap nodes are created at the end of dec_term() by invoking the heap factory interface that may allocate more heap space on process heap or in fragments. With this commit it is no longer guaranteed that a message is confined to only one heap fragment.
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r--erts/emulator/beam/erl_message.c307
1 files changed, 269 insertions, 38 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index ccfc2e6458..13e084ce10 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -237,8 +237,7 @@ erts_msg_distext2heap(Process *pp,
Eterm msg;
Uint tok_sz = 0;
Eterm *hp = NULL;
- Eterm *hp_end = NULL;
- ErlOffHeap *ohp;
+ ErtsHeapFactory factory;
Sint sz;
*bpp = NULL;
@@ -250,36 +249,26 @@ erts_msg_distext2heap(Process *pp,
tok_sz = heap_frag->used_size;
sz += tok_sz;
}
- if (pp)
+ if (pp) {
+ ErlOffHeap *ohp;
hp = erts_alloc_message_heap(sz, bpp, &ohp, pp, plcksp);
+ }
else {
*bpp = new_message_buffer(sz);
hp = (*bpp)->mem;
- ohp = &(*bpp)->off_heap;
}
- hp_end = hp + sz;
- msg = erts_decode_dist_ext(&hp, ohp, dist_extp);
+ erts_factory_message_init(&factory, pp, hp, *bpp);
+ msg = erts_decode_dist_ext(&factory, dist_extp);
if (is_non_value(msg))
goto decode_error;
if (is_not_nil(*tokenp)) {
ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
- *tokenp = copy_struct(*tokenp, tok_sz, &hp, ohp);
+ hp = erts_produce_heap(&factory, tok_sz, 0);
+ *tokenp = copy_struct(*tokenp, tok_sz, &hp, factory.off_heap);
erts_cleanup_offheap(&heap_frag->off_heap);
}
erts_free_dist_ext_copy(dist_extp);
- if (hp_end != hp) {
- if (!(*bpp)) {
- HRelease(pp, hp_end, hp);
- }
- else {
- Uint final_size = hp - &(*bpp)->mem[0];
- Eterm brefs[2] = {msg, *tokenp};
- ASSERT(sz - (hp_end - hp) == final_size);
- *bpp = erts_resize_message_buffer(*bpp, final_size, &brefs[0], 2);
- msg = brefs[0];
- *tokenp = brefs[1];
- }
- }
+ erts_factory_close(&factory);
return msg;
decode_error:
@@ -288,13 +277,7 @@ erts_msg_distext2heap(Process *pp,
erts_cleanup_offheap(&heap_frag->off_heap);
}
erts_free_dist_ext_copy(dist_extp);
- if (*bpp) {
- free_message_buffer(*bpp);
- *bpp = NULL;
- }
- else if (hp) {
- HRelease(pp, hp_end, hp);
- }
+ *bpp = NULL;
return THE_NON_VALUE;
}
@@ -851,10 +834,11 @@ erts_msg_attached_data_size_aux(ErlMessage *msg)
}
void
-erts_move_msg_attached_data_to_heap(Eterm **hpp, ErlOffHeap *ohp, ErlMessage *msg)
+erts_move_msg_attached_data_to_heap(ErtsHeapFactory* factory,
+ ErlMessage *msg)
{
if (is_value(ERL_MESSAGE_TERM(msg)))
- erts_move_msg_mbuf_to_heap(hpp, ohp, msg);
+ erts_move_msg_mbuf_to_heap(&factory->hp, factory->off_heap, msg);
else if (msg->data.dist_ext) {
ASSERT(msg->data.dist_ext->heap_size >= 0);
if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) {
@@ -862,12 +846,11 @@ erts_move_msg_attached_data_to_heap(Eterm **hpp, ErlOffHeap *ohp, ErlMessage *ms
heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
ERL_MESSAGE_TOKEN(msg) = copy_struct(ERL_MESSAGE_TOKEN(msg),
heap_frag->used_size,
- hpp,
- ohp);
+ &factory->hp,
+ factory->off_heap);
erts_cleanup_offheap(&heap_frag->off_heap);
}
- ERL_MESSAGE_TERM(msg) = erts_decode_dist_ext(hpp,
- ohp,
+ ERL_MESSAGE_TERM(msg) = erts_decode_dist_ext(factory,
msg->data.dist_ext);
erts_free_dist_ext_copy(msg->data.dist_ext);
msg->data.dist_ext = NULL;
@@ -1134,15 +1117,263 @@ erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
}
}
+void erts_factory_proc_init(ErtsHeapFactory* factory,
+ Process* p)
+{
+ erts_factory_proc_prealloc_init(factory, p, HEAP_LIMIT(p) - HEAP_TOP(p));
+}
+
+void erts_factory_proc_prealloc_init(ErtsHeapFactory* factory,
+ Process* p,
+ Sint size)
+{
+ factory->mode = FACTORY_HALLOC;
+ factory->p = p;
+ factory->hp_start = HAlloc(p, size);
+ factory->hp = factory->hp_start;
+ factory->hp_end = factory->hp_start + size;
+ factory->off_heap = &p->off_heap;
+ factory->off_heap_saved.first = p->off_heap.first;
+ factory->off_heap_saved.overhead = p->off_heap.overhead;
+ factory->heap_frags_saved = p->mbuf;
+ factory->heap_frags = NULL; /* not used */
+ factory->alloc_type = 0; /* not used */
+}
+
+void erts_factory_message_init(ErtsHeapFactory* factory,
+ Process* rp,
+ Eterm* hp,
+ ErlHeapFragment* bp)
+{
+ if (bp) {
+ factory->mode = FACTORY_HEAP_FRAGS;
+ factory->p = NULL;
+ factory->hp_start = bp->mem;
+ factory->hp = hp ? hp : bp->mem;
+ factory->hp_end = bp->mem + bp->alloc_size;
+ factory->off_heap = &bp->off_heap;
+ factory->heap_frags = bp;
+ factory->heap_frags_saved = bp;
+ factory->alloc_type = ERTS_ALC_T_HEAP_FRAG;
+ ASSERT(!bp->next);
+ }
+ else {
+ factory->mode = FACTORY_HALLOC;
+ factory->p = rp;
+ factory->hp_start = hp;
+ factory->hp = hp;
+ factory->hp_end = HEAP_TOP(rp);
+ factory->off_heap = &rp->off_heap;
+ factory->heap_frags_saved = rp->mbuf;
+ factory->heap_frags = NULL; /* not used */
+ factory->alloc_type = 0; /* not used */
+ }
+ factory->off_heap_saved.first = factory->off_heap->first;
+ factory->off_heap_saved.overhead = factory->off_heap->overhead;
+
+ ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end);
+}
+
+void erts_factory_static_init(ErtsHeapFactory* factory,
+ Eterm* hp,
+ Uint size,
+ ErlOffHeap* off_heap)
+{
+ factory->mode = FACTORY_STATIC;
+ factory->hp_start = hp;
+ factory->hp = hp;
+ factory->hp_end = hp + size;
+ factory->off_heap = off_heap;
+ factory->off_heap_saved.first = factory->off_heap->first;
+ factory->off_heap_saved.overhead = factory->off_heap->overhead;
+}
+
+/* When we know the term is an immediate and need no heap.
+*/
+void erts_factory_dummy_init(ErtsHeapFactory* factory)
+{
+ factory->mode = FACTORY_CLOSED;
+}
+
+static void reserve_heap(ErtsHeapFactory*, Uint need, Uint xtra);
+
Eterm* erts_produce_heap(ErtsHeapFactory* factory, Uint need, Uint xtra)
{
Eterm* res;
- if (factory->p) {
- res = HAllocX(factory->p, need, xtra);
- } else {
- res = factory->hp;
- factory->hp += need;
+
+ ASSERT((unsigned int)factory->mode > (unsigned int)FACTORY_CLOSED);
+ if (factory->hp + need > factory->hp_end) {
+ reserve_heap(factory, need, xtra);
}
+ res = factory->hp;
+ factory->hp += need;
return res;
}
+Eterm* erts_reserve_heap(ErtsHeapFactory* factory, Uint need)
+{
+ ASSERT((unsigned int)factory->mode > (unsigned int)FACTORY_CLOSED);
+ if (factory->hp + need > factory->hp_end) {
+ reserve_heap(factory, need, 200);
+ }
+ return factory->hp;
+}
+
+static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra)
+{
+ ErlHeapFragment* bp;
+
+ switch (factory->mode) {
+ case FACTORY_HALLOC:
+ HRelease(factory->p, factory->hp_end, factory->hp);
+ factory->hp = HAllocX(factory->p, need, xtra);
+ factory->hp_end = factory->hp + need;
+ return;
+
+ case FACTORY_HEAP_FRAGS:
+ bp = factory->heap_frags;
+
+ if (bp) {
+ ASSERT(factory->hp > bp->mem);
+ ASSERT(factory->hp <= factory->hp_end);
+ ASSERT(factory->hp_end == bp->mem + bp->alloc_size);
+
+ bp->used_size = factory->hp - bp->mem;
+ }
+ bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(factory->alloc_type,
+ ERTS_HEAP_FRAG_SIZE(need+xtra));
+ bp->next = factory->heap_frags;
+ factory->heap_frags = bp;
+ bp->alloc_size = need + xtra;
+ bp->used_size = need;
+ bp->off_heap.first = NULL;
+ bp->off_heap.overhead = 0;
+
+ factory->hp = bp->mem;
+ factory->hp_end = bp->mem + bp->alloc_size;
+ return;
+
+ case FACTORY_STATIC:
+ case FACTORY_CLOSED:
+ default:
+ ASSERT(!"Invalid factory mode");
+ }
+}
+
+void erts_factory_close(ErtsHeapFactory* factory)
+{
+ ErlHeapFragment* bp;
+
+ switch (factory->mode) {
+ case FACTORY_HALLOC:
+ HRelease(factory->p, factory->hp_end, factory->hp);
+ break;
+
+ case FACTORY_HEAP_FRAGS:
+ bp = factory->heap_frags;
+
+ if (bp) {
+ ASSERT(factory->hp >= bp->mem);
+ ASSERT(factory->hp <= factory->hp_end);
+ ASSERT(factory->hp_end == bp->mem + bp->alloc_size);
+
+ bp->used_size = factory->hp - bp->mem;
+ }
+ break;
+ case FACTORY_STATIC: break;
+ case FACTORY_CLOSED: break;
+ default:
+ ASSERT(!"Invalid factory mode");
+ }
+ factory->mode = FACTORY_CLOSED;
+}
+
+void erts_factory_undo(ErtsHeapFactory* factory)
+{
+ ErlHeapFragment* bp;
+ struct erl_off_heap_header *hdr, **hdr_nextp;
+
+ switch (factory->mode) {
+ case FACTORY_HALLOC:
+ case FACTORY_STATIC:
+ /* Cleanup off-heap
+ */
+ hdr_nextp = NULL;
+ for (hdr = factory->off_heap->first;
+ hdr != factory->off_heap_saved.first;
+ hdr = hdr->next) {
+
+ hdr_nextp = &hdr->next;
+ }
+
+ if (hdr_nextp != NULL) {
+ *hdr_nextp = NULL;
+ erts_cleanup_offheap(factory->off_heap);
+ factory->off_heap->first = factory->off_heap_saved.first;
+ factory->off_heap->overhead = factory->off_heap_saved.overhead;
+ }
+
+ if (factory->mode == FACTORY_HALLOC) {
+ /* Free heap frags
+ */
+ bp = factory->p->mbuf;
+ if (bp != factory->heap_frags_saved) {
+ do {
+ ErlHeapFragment *next_bp = bp->next;
+ ASSERT(bp->off_heap.first == NULL);
+ ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, (void *) bp,
+ ERTS_HEAP_FRAG_SIZE(bp->alloc_size));
+ bp = next_bp;
+ } while (bp != factory->heap_frags_saved);
+
+ factory->p->mbuf = bp;
+ }
+
+ /* Rollback heap top
+ */
+ if (factory->heap_frags_saved == NULL) { /* No heap frags when we started */
+ ASSERT(factory->hp_start >= HEAP_START(factory->p));
+ ASSERT(factory->hp_start <= HEAP_LIMIT(factory->p));
+
+ HEAP_TOP(factory->p) = factory->hp_start;
+ }
+ else {
+ ASSERT(factory->heap_frags_saved == factory->p->mbuf);
+ if (factory->hp_start == factory->heap_frags_saved->mem) {
+ factory->p->mbuf = factory->p->mbuf->next;
+ ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, factory->heap_frags_saved,
+ ERTS_HEAP_FRAG_SIZE(factory->heap_frags_saved->alloc_size));
+ }
+ else if (factory->hp_start != factory->hp_end) {
+ unsigned remains = factory->hp_start - factory->heap_frags_saved->mem;
+ ASSERT(remains > 0 && remains < factory->heap_frags_saved->used_size);
+ factory->heap_frags_saved->used_size = remains;
+ }
+ }
+ }
+ break;
+
+ case FACTORY_HEAP_FRAGS:
+ bp = factory->heap_frags;
+ do {
+ ErlHeapFragment* next_bp = bp->next;
+
+ erts_cleanup_offheap(&bp->off_heap);
+ ERTS_HEAP_FREE(factory->alloc_type, (void *) bp,
+ ERTS_HEAP_FRAG_SIZE(bp->size));
+ bp = next_bp;
+ }while (bp != NULL);
+ break;
+
+ case FACTORY_CLOSED: break;
+ default:
+ ASSERT(!"Invalid factory mode");
+ }
+ factory->mode = FACTORY_CLOSED;
+#ifdef DEBUG
+ factory->p = NULL;
+ factory->hp = NULL;
+ factory->heap_frags = NULL;
+#endif
+}
+