diff options
Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r-- | erts/emulator/beam/erl_message.c | 233 |
1 files changed, 108 insertions, 125 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index a3274d7443..1bebf6efe2 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -181,7 +181,7 @@ erts_cleanup_offheap(ErlOffHeap *offheap) break; default: ASSERT(is_external_header(u.hdr->thing_word)); - erts_deref_node_entry(u.ext->node); + erts_deref_node_entry(u.ext->node, make_boxed(u.ep)); break; } } @@ -201,34 +201,44 @@ free_message_buffer(ErlHeapFragment* bp) }while (bp != NULL); } +static void +erts_cleanup_message(ErtsMessage *mp) +{ + ErlHeapFragment *bp; + if (ERTS_SIG_IS_EXTERNAL_MSG(mp) || ERTS_SIG_IS_NON_MSG(mp)) { + ErtsDistExternal *edep = erts_proc_sig_get_external(mp); + if (edep) { + erts_free_dist_ext_copy(edep); + if (mp->data.heap_frag == &mp->hfrag) { + ASSERT(ERTS_SIG_IS_EXTERNAL_MSG(mp)); + mp->data.heap_frag = ERTS_MSG_COMBINED_HFRAG; + } + } + } + + if (ERTS_SIG_IS_MSG(mp) && mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { + bp = mp->data.heap_frag; + } else { + /* All non msg signals are combined HFRAG messages, + but we overwrite the mp->data field with the + nm_signal queue ptr so have to fix that here + before freeing it. */ + mp->data.attached = ERTS_MSG_COMBINED_HFRAG; + bp = mp->hfrag.next; + erts_cleanup_offheap(&mp->hfrag.off_heap); + } + + if (bp) + free_message_buffer(bp); +} + void erts_cleanup_messages(ErtsMessage *msgp) { ErtsMessage *mp = msgp; while (mp) { ErtsMessage *fmp; - ErlHeapFragment *bp; - if (ERTS_SIG_IS_EXTERNAL_MSG(mp)) { - if (is_not_immed(ERL_MESSAGE_TOKEN(mp))) { - bp = (ErlHeapFragment *) mp->data.dist_ext->ext_endp; - erts_cleanup_offheap(&bp->off_heap); - } - if (mp->data.dist_ext) - erts_free_dist_ext_copy(mp->data.dist_ext); - } - else { - if (ERTS_SIG_IS_INTERNAL_MSG(mp) - && mp->data.attached != ERTS_MSG_COMBINED_HFRAG) { - bp = mp->data.heap_frag; - } - else { - mp->data.attached = ERTS_MSG_COMBINED_HFRAG; - bp = mp->hfrag.next; - erts_cleanup_offheap(&mp->hfrag.off_heap); - } - if (bp) - free_message_buffer(bp); - } + erts_cleanup_message(mp); fmp = mp; mp = mp->next; erts_free_message(fmp); @@ -260,6 +270,7 @@ void erts_queue_dist_message(Process *rcvr, ErtsProcLocks rcvr_locks, ErtsDistExternal *dist_ext, + ErlHeapFragment *hfrag, Eterm token, Eterm from) { @@ -268,8 +279,26 @@ erts_queue_dist_message(Process *rcvr, ERTS_LC_ASSERT(rcvr_locks == erts_proc_lc_my_proc_locks(rcvr)); - mp = erts_alloc_message(0, NULL); - mp->data.dist_ext = dist_ext; + if (hfrag) { + /* Fragmented message, allocate a message reference */ + mp = erts_alloc_message(0, NULL); + mp->data.heap_frag = hfrag; + } else { + /* Un-fragmented message, allocate space for + token and dist_ext in message. */ + Uint dist_ext_sz = erts_dist_ext_size(dist_ext) / sizeof(Eterm); + Uint token_sz = size_object(token); + Uint sz = token_sz + dist_ext_sz; + Eterm *hp; + + mp = erts_alloc_message(sz, &hp); + mp->data.heap_frag = &mp->hfrag; + mp->hfrag.used_size = token_sz; + + erts_make_dist_ext_copy(dist_ext, erts_get_dist_ext(mp->data.heap_frag)); + + token = copy_struct(token, token_sz, &hp, &mp->data.heap_frag->off_heap); + } ERL_MESSAGE_FROM(mp) = dist_ext->dep->sysname; ERL_MESSAGE_TERM(mp) = THE_NON_VALUE; @@ -493,25 +522,27 @@ Uint erts_msg_attached_data_size_aux(ErtsMessage *msg) { Sint sz; - ASSERT(is_non_value(ERL_MESSAGE_TERM(msg))); - ASSERT(msg->data.dist_ext); - ASSERT(msg->data.dist_ext->heap_size < 0); - - sz = erts_decode_dist_ext_size(msg->data.dist_ext); - if (sz < 0) { - /* Bad external - * We leave the message intact in this case as it's not worth the trouble - * to make all callers remove it from queue. It will be detected again - * and removed from message queue later anyway. - */ - return 0; - } + ErtsDistExternal *edep = erts_get_dist_ext(msg->data.heap_frag); + ASSERT(ERTS_SIG_IS_EXTERNAL_MSG(msg)); - msg->data.dist_ext->heap_size = sz; - if (is_not_nil(msg->m[1])) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(msg->data.dist_ext); - sz += heap_frag->used_size; + if (edep->heap_size < 0) { + + sz = erts_decode_dist_ext_size(edep, 1, 1); + if (sz < 0) { + /* Bad external + * We leave the message intact in this case as it's not worth the trouble + * to make all callers remove it from queue. It will be detected again + * and removed from message queue later anyway. + */ + return 0; + } + + edep->heap_size = sz; + } else { + sz = edep->heap_size; + } + if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) { + sz += msg->data.heap_frag->used_size; } return sz; } @@ -532,9 +563,7 @@ erts_try_alloc_message_on_heap(Process *pp, if ((*psp) & ERTS_PSFLGS_VOLATILE_HEAP) goto in_message_fragment; - else if ( - *plp & ERTS_PROC_LOCK_MAIN - ) { + else if (*plp & ERTS_PROC_LOCK_MAIN) { try_on_heap: if (((*psp) & ERTS_PSFLGS_VOLATILE_HEAP) || (pp->flags & F_DISABLE_GC) @@ -747,6 +776,13 @@ erts_send_message(Process* sender, #endif erts_queue_proc_message(sender, receiver, *receiver_locks, mp, message); + + if (msize > ERTS_MSG_COPY_WORDS_PER_REDUCTION) { + Uint reds = msize / ERTS_MSG_COPY_WORDS_PER_REDUCTION; + if (reds > CONTEXT_REDS) + reds = CONTEXT_REDS; + BUMP_REDS(sender, (int) reds); + } } @@ -1101,84 +1137,28 @@ change_to_off_heap: return res; } -int -erts_decode_dist_message(Process *proc, ErtsProcLocks proc_locks, - ErtsMessage *msgp, int force_off_heap) +void erts_factory_proc_init(ErtsHeapFactory* factory, Process* p) { - ErtsHeapFactory factory; - Eterm msg; - ErlHeapFragment *bp; - Sint need; - int decode_in_heap_frag; - - decode_in_heap_frag = (force_off_heap - || !(proc_locks & ERTS_PROC_LOCK_MAIN) - || (proc->flags & F_OFF_HEAP_MSGQ)); - - if (msgp->data.dist_ext->heap_size >= 0) - need = msgp->data.dist_ext->heap_size; - else { - need = erts_decode_dist_ext_size(msgp->data.dist_ext); - if (need < 0) { - /* bad msg; remove it... */ - if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { - bp = erts_dist_ext_trailer(msgp->data.dist_ext); - erts_cleanup_offheap(&bp->off_heap); - } - erts_free_dist_ext_copy(msgp->data.dist_ext); - msgp->data.dist_ext = NULL; - return 0; - } - - msgp->data.dist_ext->heap_size = need; - } - - if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { - bp = erts_dist_ext_trailer(msgp->data.dist_ext); - need += bp->used_size; - } - - if (decode_in_heap_frag) - erts_factory_heap_frag_init(&factory, new_message_buffer(need)); - else - erts_factory_proc_prealloc_init(&factory, proc, need); - - ASSERT(msgp->data.dist_ext->heap_size >= 0); - if (is_not_immed(ERL_MESSAGE_TOKEN(msgp))) { - ErlHeapFragment *heap_frag; - heap_frag = erts_dist_ext_trailer(msgp->data.dist_ext); - ERL_MESSAGE_TOKEN(msgp) = copy_struct(ERL_MESSAGE_TOKEN(msgp), - heap_frag->used_size, - &factory.hp, - factory.off_heap); - erts_cleanup_offheap(&heap_frag->off_heap); - } - - msg = erts_decode_dist_ext(&factory, msgp->data.dist_ext); - ERL_MESSAGE_TERM(msgp) = msg; - erts_free_dist_ext_copy(msgp->data.dist_ext); - msgp->data.attached = NULL; - - if (is_non_value(msg)) { - erts_factory_undo(&factory); - return 0; - } - - erts_factory_trim_and_close(&factory, msgp->m, - ERL_MESSAGE_REF_ARRAY_SZ); - - ASSERT(!msgp->data.heap_frag); - - if (decode_in_heap_frag) - msgp->data.heap_frag = factory.heap_frags; - - return 1; -} + /* This function does not use HAlloc to allocate on the heap + as we do not want to use INIT_HEAP_MEM on the allocated + heap as that completely destroys the DEBUG emulators + performance. */ + ErlHeapFragment *bp = p->mbuf; + factory->mode = FACTORY_HALLOC; + factory->p = p; + factory->hp_start = HEAP_TOP(p); + factory->hp = factory->hp_start; + factory->hp_end = HEAP_LIMIT(p); + factory->off_heap = &p->off_heap; + factory->message = NULL; + factory->off_heap_saved.first = p->off_heap.first; + factory->off_heap_saved.overhead = p->off_heap.overhead; + factory->heap_frags_saved = bp; + factory->heap_frags_saved_used = bp ? bp->used_size : 0; + factory->heap_frags = NULL; /* not used */ + factory->alloc_type = 0; /* not used */ -void erts_factory_proc_init(ErtsHeapFactory* factory, - Process* p) -{ - erts_factory_proc_prealloc_init(factory, p, HEAP_LIMIT(p) - HEAP_TOP(p)); + HEAP_TOP(p) = HEAP_LIMIT(p); } void erts_factory_proc_prealloc_init(ErtsHeapFactory* factory, @@ -1235,7 +1215,7 @@ erts_factory_message_create(ErtsHeapFactory* factory, int on_heap; erts_aint32_t state; - state = proc ? erts_atomic32_read_nob(&proc->state) : 0; + state = proc ? erts_atomic32_read_nob(&proc->state) : ERTS_PSFLG_OFF_HEAP_MSGQ; if (state & ERTS_PSFLG_OFF_HEAP_MSGQ) { msgp = erts_alloc_message(sz, &hp); @@ -1468,8 +1448,8 @@ void erts_factory_close(ErtsHeapFactory* factory) else factory->message->data.heap_frag = factory->heap_frags; - /* Fall through */ - case FACTORY_HEAP_FRAGS: + /* Fall through */ + case FACTORY_HEAP_FRAGS: bp = factory->heap_frags; } @@ -1610,6 +1590,9 @@ void erts_factory_undo(ErtsHeapFactory* factory) factory->message->hfrag.next = factory->heap_frags; else factory->message->data.heap_frag = factory->heap_frags; + /* Set the message to NIL in order for this message not to be + treated as a distributed message by the cleanup_messages logic */ + factory->message->m[0] = NIL; erts_cleanup_messages(factory->message); break; case FACTORY_TMP: |