Diffstat (limited to 'erts/emulator/beam/erl_message.c')
-rw-r--r-- | erts/emulator/beam/erl_message.c | 1070
1 file changed, 1070 insertions, 0 deletions
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
new file mode 100644
index 0000000000..81fbdfbd5a
--- /dev/null
+++ b/erts/emulator/beam/erl_message.c
@@ -0,0 +1,1070 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 1997-2009. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+/*
+ * Message passing primitives.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include "config.h"
+#endif
+
+#include "sys.h"
+#include "erl_vm.h"
+#include "global.h"
+#include "erl_message.h"
+#include "erl_process.h"
+#include "erl_nmgc.h"
+
+ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message,
+                                 ErlMessage,
+                                 ERL_MESSAGE_BUF_SZ,
+                                 ERTS_ALC_T_MSG_REF)
+
+#if defined(DEBUG) && 0
+#define HARD_DEBUG
+#else
+#undef HARD_DEBUG
+#endif
+
+void
+init_message(void)
+{
+    init_message_alloc();
+}
+
+void
+free_message(ErlMessage* mp)
+{
+    message_free(mp);
+}
+
+/* Allocate message buffer (size in words) */
+ErlHeapFragment*
+new_message_buffer(Uint size)
+{
+    ErlHeapFragment* bp;
+    bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG,
+                                            ERTS_HEAP_FRAG_SIZE(size));
+    ERTS_INIT_HEAP_FRAG(bp, size);
+    return bp;
+}
+
+ErlHeapFragment*
+erts_resize_message_buffer(ErlHeapFragment *bp, Uint size,
+                           Eterm *brefs, Uint brefs_size)
+{
+#ifdef DEBUG
+    int i;
+#endif
+#ifdef HARD_DEBUG
+    ErlHeapFragment *dbg_bp;
+    Eterm *dbg_brefs;
+    Uint dbg_size;
+    Uint dbg_tot_size;
+    Eterm *dbg_hp;
+#endif
+    ErlHeapFragment* nbp;
+
+#ifdef DEBUG
+    {
+        Uint off_sz = size < bp->size ? size : bp->size;
+        for (i = 0; i < brefs_size; i++) {
+            Eterm *ptr;
+            if (is_immed(brefs[i]))
+                continue;
+            ptr = ptr_val(brefs[i]);
+            ASSERT(&bp->mem[0] <= ptr && ptr < &bp->mem[0] + off_sz);
+
+        }
+    }
+#endif
+
+    if (size == bp->size)
+        return bp;
+
+#ifdef HARD_DEBUG
+    dbg_brefs = erts_alloc(ERTS_ALC_T_UNDEF, sizeof(Eterm *)*brefs_size);
+    dbg_bp = new_message_buffer(bp->size);
+    dbg_hp = dbg_bp->mem;
+    dbg_tot_size = 0;
+    for (i = 0; i < brefs_size; i++) {
+        dbg_size = size_object(brefs[i]);
+        dbg_tot_size += dbg_size;
+        dbg_brefs[i] = copy_struct(brefs[i], dbg_size, &dbg_hp,
+                                   &dbg_bp->off_heap);
+    }
+    ASSERT(dbg_tot_size == (size < bp->size ? size : bp->size));
+#endif
+
+    nbp = (ErlHeapFragment*) ERTS_HEAP_REALLOC(ERTS_ALC_T_HEAP_FRAG,
+                                               (void *) bp,
+                                               (sizeof(ErlHeapFragment)
+                                                - sizeof(Eterm)
+                                                + bp->size*sizeof(Eterm)),
+                                               (sizeof(ErlHeapFragment)
+                                                - sizeof(Eterm)
+                                                + size*sizeof(Eterm)));
+    if (bp != nbp) {
+        Uint off_sz = size < nbp->size ? size : nbp->size;
+        Eterm *sp = &bp->mem[0];
+        Eterm *ep = sp + off_sz;
+        Sint offs = &nbp->mem[0] - sp;
+        erts_offset_off_heap(&nbp->off_heap, offs, sp, ep);
+        erts_offset_heap(&nbp->mem[0], off_sz, offs, sp, ep);
+        if (brefs && brefs_size)
+            erts_offset_heap_ptr(brefs, brefs_size, offs, sp, ep);
+#ifdef DEBUG
+        for (i = 0; i < brefs_size; i++) {
+            Eterm *ptr;
+            if (is_immed(brefs[i]))
+                continue;
+            ptr = ptr_val(brefs[i]);
+            ASSERT(&nbp->mem[0] <= ptr && ptr < &nbp->mem[0] + off_sz);
+        }
+#endif
+    }
+    nbp->size = size;
+
+
+#ifdef HARD_DEBUG
+    for (i = 0; i < brefs_size; i++)
+        ASSERT(eq(dbg_brefs[i], brefs[i]));
+    free_message_buffer(dbg_bp);
+    erts_free(ERTS_ALC_T_UNDEF, dbg_brefs);
+#endif
+
+    return nbp;
+}
+
+
+void
+erts_cleanup_offheap(ErlOffHeap *offheap)
+{
+    if (offheap->mso) {
+        erts_cleanup_mso(offheap->mso);
+    }
+#ifndef HYBRID /* FIND ME! */
+    if (offheap->funs) {
+        erts_cleanup_funs(offheap->funs);
+    }
+#endif
+    if (offheap->externals) {
+        erts_cleanup_externals(offheap->externals);
+    }
+}
+
+void
+free_message_buffer(ErlHeapFragment* bp)
+{
+    erts_cleanup_offheap(&bp->off_heap);
+    ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG,
+                   (void *) bp,
+                   (sizeof(ErlHeapFragment)
+                    - sizeof(Eterm)
+                    + bp->size*sizeof(Eterm)));
+}
+
+static ERTS_INLINE void
+link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp)
+{
+    if (bp) {
+        /* Link the message buffer */
+        bp->next = MBUF(proc);
+        MBUF(proc) = bp;
+        MBUF_SIZE(proc) += bp->size;
+        FLAGS(proc) |= F_FORCE_GC;
+
+        /* Move any binaries into the process */
+        if (bp->off_heap.mso != NULL) {
+            ProcBin** next_p = &bp->off_heap.mso;
+            while (*next_p != NULL) {
+                next_p = &((*next_p)->next);
+            }
+            *next_p = MSO(proc).mso;
+            MSO(proc).mso = bp->off_heap.mso;
+            bp->off_heap.mso = NULL;
+            MSO(proc).overhead += bp->off_heap.overhead;
+        }
+
+        /* Move any funs into the process */
+#ifndef HYBRID
+        if (bp->off_heap.funs != NULL) {
+            ErlFunThing** next_p = &bp->off_heap.funs;
+            while (*next_p != NULL) {
+                next_p = &((*next_p)->next);
+            }
+            *next_p = MSO(proc).funs;
+            MSO(proc).funs = bp->off_heap.funs;
+            bp->off_heap.funs = NULL;
+        }
+#endif
+
+        /* Move any external things into the process */
+        if (bp->off_heap.externals != NULL) {
+            ExternalThing** next_p = &bp->off_heap.externals;
+            while (*next_p != NULL) {
+                next_p = &((*next_p)->next);
+            }
+            *next_p = MSO(proc).externals;
+            MSO(proc).externals = bp->off_heap.externals;
+            bp->off_heap.externals = NULL;
+        }
+    }
+}
+
+Eterm
+erts_msg_distext2heap(Process *pp,
+                      ErtsProcLocks *plcksp,
+                      ErlHeapFragment **bpp,
+                      Eterm *tokenp,
+                      ErtsDistExternal *dist_extp)
+{
+    Eterm msg;
+    Uint tok_sz = 0;
+    Eterm *hp = NULL;
+    Eterm *hp_end = NULL;
+    ErlOffHeap *ohp;
+    Sint sz;
+
+    *bpp = NULL;
+    sz = erts_decode_dist_ext_size(dist_extp, 0);
+    if (sz < 0)
+        goto decode_error;
+    if (is_not_nil(*tokenp)) {
+        ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
+        tok_sz = heap_frag->size;
+        sz += tok_sz;
+    }
+    if (pp)
+        hp = erts_alloc_message_heap(sz, bpp, &ohp, pp, plcksp);
+    else {
+        *bpp = new_message_buffer(sz);
+        hp = (*bpp)->mem;
+        ohp = &(*bpp)->off_heap;
+    }
+    hp_end = hp + sz;
+    msg = erts_decode_dist_ext(&hp, ohp, dist_extp);
+    if (is_non_value(msg))
+        goto decode_error;
+    if (is_not_nil(*tokenp)) {
+        ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
+        *tokenp = copy_struct(*tokenp, tok_sz, &hp, ohp);
+        erts_cleanup_offheap(&heap_frag->off_heap);
+    }
+    erts_free_dist_ext_copy(dist_extp);
+    if (hp_end != hp) {
+        if (!(*bpp)) {
+            HRelease(pp, hp_end, hp);
+        }
+        else {
+            Uint final_size = hp - &(*bpp)->mem[0];
+            Eterm brefs[2] = {msg, *tokenp};
+            ASSERT(sz - (hp_end - hp) == final_size);
+            *bpp = erts_resize_message_buffer(*bpp, final_size, &brefs[0], 2);
+            msg = brefs[0];
+            *tokenp = brefs[1];
+        }
+    }
+    return msg;
+
+ decode_error:
+    if (is_not_nil(*tokenp)) {
+        ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
+        erts_cleanup_offheap(&heap_frag->off_heap);
+    }
+    erts_free_dist_ext_copy(dist_extp);
+    if (*bpp)
+        free_message_buffer(*bpp);
+    else if (hp) {
+        HRelease(pp, hp_end, hp);
+    }
+    *bpp = NULL;
+    return THE_NON_VALUE;
+}
+
+static ERTS_INLINE void
+notify_new_message(Process *receiver)
+{
+    ERTS_SMP_LC_ASSERT(ERTS_PROC_LOCK_STATUS
+                       & erts_proc_lc_my_proc_locks(receiver));
+
+    ACTIVATE(receiver);
+
+    switch (receiver->status) {
+    case P_GARBING:
+        switch (receiver->gcstatus) {
+        case P_SUSPENDED:
+            goto suspended;
+        case P_WAITING:
+            goto waiting;
+        default:
+            break;
+        }
+        break;
+    case P_SUSPENDED:
+    suspended:
+        receiver->rstatus = P_RUNABLE;
+        break;
+    case P_WAITING:
+    waiting:
+        erts_add_to_runq(receiver);
+        break;
+    default:
+        break;
+    }
+}
+
+void
+erts_queue_dist_message(Process *rcvr,
+                        ErtsProcLocks *rcvr_locks,
+                        ErtsDistExternal *dist_ext,
+                        Eterm token)
+{
+    ErlMessage* mp;
+#ifdef ERTS_SMP
+    ErtsProcLocks need_locks;
+#endif
+
+    ERTS_SMP_LC_ASSERT(*rcvr_locks == erts_proc_lc_my_proc_locks(rcvr));
+
+    mp = message_alloc();
+
+#ifdef ERTS_SMP
+    need_locks = ~(*rcvr_locks) & (ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS);
+    if (need_locks) {
+        *rcvr_locks |= need_locks;
+        if (erts_smp_proc_trylock(rcvr, need_locks) == EBUSY) {
+            if (need_locks == ERTS_PROC_LOCK_MSGQ) {
+                erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_STATUS);
+                need_locks = (ERTS_PROC_LOCK_MSGQ
+                              | ERTS_PROC_LOCK_STATUS);
+            }
+            erts_smp_proc_lock(rcvr, need_locks);
+        }
+    }
+
+    if (rcvr->is_exiting || ERTS_PROC_PENDING_EXIT(rcvr)) {
+        /* Drop message if receiver is exiting or has a pending exit ... */
+        if (is_not_nil(token)) {
+            ErlHeapFragment *heap_frag;
+            heap_frag = erts_dist_ext_trailer(mp->data.dist_ext);
+            erts_cleanup_offheap(&heap_frag->off_heap);
+        }
+        erts_free_dist_ext_copy(dist_ext);
+        message_free(mp);
+    }
+    else
+#endif
+    if (IS_TRACED_FL(rcvr, F_TRACE_RECEIVE)) {
+        /* Ahh... need to decode it in order to trace it... */
+        ErlHeapFragment *mbuf;
+        Eterm msg;
+        message_free(mp);
+        msg = erts_msg_distext2heap(rcvr, rcvr_locks, &mbuf, &token, dist_ext);
+        if (is_value(msg))
+            erts_queue_message(rcvr, rcvr_locks, mbuf, msg, token);
+    }
+    else {
+        /* Enqueue message on external format */
+
+        ERL_MESSAGE_TERM(mp) = THE_NON_VALUE;
+        ERL_MESSAGE_TOKEN(mp) = token;
+        mp->next = NULL;
+
+        mp->data.dist_ext = dist_ext;
+        LINK_MESSAGE(rcvr, mp);
+
+        notify_new_message(rcvr);
+    }
+}
+
+/* Add a message last in message queue */
+void
+erts_queue_message(Process* receiver,
+                   ErtsProcLocks *receiver_locks,
+                   ErlHeapFragment* bp,
+                   Eterm message,
+                   Eterm seq_trace_token)
+{
+    ErlMessage* mp;
+#ifdef ERTS_SMP
+    ErtsProcLocks need_locks;
+#else
+    ASSERT(bp != NULL || receiver->mbuf == NULL);
+#endif
+
+    ERTS_SMP_LC_ASSERT(*receiver_locks == erts_proc_lc_my_proc_locks(receiver));
+
+    mp = message_alloc();
+
+#ifdef ERTS_SMP
+    need_locks = ~(*receiver_locks) & (ERTS_PROC_LOCK_MSGQ
+                                       | ERTS_PROC_LOCK_STATUS);
+    if (need_locks) {
+        *receiver_locks |= need_locks;
+        if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) {
+            if (need_locks == ERTS_PROC_LOCK_MSGQ) {
+                erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
+                need_locks = (ERTS_PROC_LOCK_MSGQ
+                              | ERTS_PROC_LOCK_STATUS);
+            }
+            erts_smp_proc_lock(receiver, need_locks);
+        }
+    }
+
+    if (receiver->is_exiting || ERTS_PROC_PENDING_EXIT(receiver)) {
+        /* Drop message if receiver is exiting or has a pending
+         * exit ...
+         */
+        if (bp)
+            free_message_buffer(bp);
+        message_free(mp);
+        return;
+    }
+#endif
+
+    ERL_MESSAGE_TERM(mp) = message;
+    ERL_MESSAGE_TOKEN(mp) = seq_trace_token;
+    mp->next = NULL;
+
+#ifdef ERTS_SMP
+    if (*receiver_locks & ERTS_PROC_LOCK_MAIN) {
+        mp->data.heap_frag = bp;
+
+        /*
+         * We move 'in queue' to 'private queue' and place
+         * message at the end of 'private queue' in order
+         * to ensure that the 'in queue' doesn't contain
+         * references into the heap. By ensuring this,
+         * we don't need to include the 'in queue' in
+         * the root set when garbage collecting.
+         */
+        ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
+        LINK_MESSAGE_PRIVQ(receiver, mp);
+    }
+    else {
+        mp->data.heap_frag = bp;
+        LINK_MESSAGE(receiver, mp);
+    }
+#else
+    mp->data.heap_frag = bp;
+    LINK_MESSAGE(receiver, mp);
+#endif
+
+    notify_new_message(receiver);
+
+    if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
+        trace_receive(receiver, message);
+    }
+
+#ifndef ERTS_SMP
+    ERTS_HOLE_CHECK(receiver);
+#endif
+}
+
+void
+erts_link_mbuf_to_proc(struct process *proc, ErlHeapFragment *bp)
+{
+    Eterm* htop = HEAP_TOP(proc);
+
+    link_mbuf_to_proc(proc, bp);
+    if (htop < HEAP_LIMIT(proc)) {
+        *htop = make_pos_bignum_header(HEAP_LIMIT(proc)-htop-1);
+        HEAP_TOP(proc) = HEAP_LIMIT(proc);
+    }
+}
+
+/*
+ * Moves content of message buffer attached to a message into a heap.
+ * The message buffer is deallocated.
+ */
+void
+erts_move_msg_mbuf_to_heap(Eterm** hpp, ErlOffHeap* off_heap, ErlMessage *msg)
+{
+    /* Unions for typecasts avoids warnings about type-punned pointers and aliasing */
+    union {
+        Uint** upp;
+        ProcBin **pbpp;
+        ErlFunThing **efpp;
+        ExternalThing **etpp;
+    } oh_list_pp, oh_el_next_pp;
+    union {
+        Uint *up;
+        ProcBin *pbp;
+        ErlFunThing *efp;
+        ExternalThing *etp;
+    } oh_el_p;
+    Eterm term, token, *fhp, *hp;
+    Sint offs;
+    Uint sz;
+    ErlHeapFragment *bp;
+
+#ifdef HARD_DEBUG
+    ProcBin *dbg_mso_start = off_heap->mso;
+    ErlFunThing *dbg_fun_start = off_heap->funs;
+    ExternalThing *dbg_external_start = off_heap->externals;
+    Eterm dbg_term, dbg_token;
+    ErlHeapFragment *dbg_bp;
+    Uint *dbg_hp, *dbg_thp_start;
+    Uint dbg_term_sz, dbg_token_sz;
+#endif
+
+    bp = msg->data.heap_frag;
+    term = ERL_MESSAGE_TERM(msg);
+    token = ERL_MESSAGE_TOKEN(msg);
+    if (!bp) {
+        ASSERT(is_immed(term) && is_immed(token));
+        return;
+    }
+
+#ifdef HARD_DEBUG
+    dbg_term_sz = size_object(term);
+    dbg_token_sz = size_object(token);
+    ASSERT(bp->size == dbg_term_sz + dbg_token_sz);
+
+    dbg_bp = new_message_buffer(bp->size);
+    dbg_hp = dbg_bp->mem;
+    dbg_term = copy_struct(term, dbg_term_sz, &dbg_hp, &dbg_bp->off_heap);
+    dbg_token = copy_struct(token, dbg_token_sz, &dbg_hp, &dbg_bp->off_heap);
+    dbg_thp_start = *hpp;
+#endif
+
+    ASSERT(bp);
+    msg->data.attached = NULL;
+
+    off_heap->overhead += bp->off_heap.overhead;
+    sz = bp->size;
+
+#ifdef DEBUG
+    if (is_not_immed(term)) {
+        ASSERT(bp->mem <= ptr_val(term));
+        ASSERT(bp->mem + bp->size > ptr_val(term));
+    }
+
+    if (is_not_immed(token)) {
+        ASSERT(bp->mem <= ptr_val(token));
+        ASSERT(bp->mem + bp->size > ptr_val(token));
+    }
+#endif
+
+    fhp = bp->mem;
+    hp = *hpp;
+    offs = hp - fhp;
+
+    oh_list_pp.upp = NULL;
+    oh_el_next_pp.upp = NULL; /* Shut up compiler warning */
+    oh_el_p.up = NULL; /* Shut up compiler warning */
+    while (sz--) {
+        Uint cpy_sz;
+        Eterm val = *fhp++;
+
+        switch (primary_tag(val)) {
+        case TAG_PRIMARY_IMMED1:
+            *hp++ = val;
+            break;
+        case TAG_PRIMARY_LIST:
+        case TAG_PRIMARY_BOXED:
+            ASSERT(bp->mem <= ptr_val(val));
+            ASSERT(bp->mem + bp->size > ptr_val(val));
+            *hp++ = offset_ptr(val, offs);
+            break;
+        case TAG_PRIMARY_HEADER:
+            *hp++ = val;
+            switch (val & _HEADER_SUBTAG_MASK) {
+            case ARITYVAL_SUBTAG:
+                break;
+            case REFC_BINARY_SUBTAG:
+                oh_list_pp.pbpp = &off_heap->mso;
+                oh_el_p.up = (hp-1);
+                oh_el_next_pp.pbpp = &(oh_el_p.pbp)->next;
+                cpy_sz = thing_arityval(val);
+                goto cpy_words;
+            case FUN_SUBTAG:
+#ifndef HYBRID
+                oh_list_pp.efpp = &off_heap->funs;
+                oh_el_p.up = (hp-1);
+                oh_el_next_pp.efpp = &(oh_el_p.efp)->next;
+#endif
+                cpy_sz = thing_arityval(val);
+                goto cpy_words;
+            case EXTERNAL_PID_SUBTAG:
+            case EXTERNAL_PORT_SUBTAG:
+            case EXTERNAL_REF_SUBTAG:
+                oh_list_pp.etpp = &off_heap->externals;
+                oh_el_p.up = (hp-1);
+                oh_el_next_pp.etpp = &(oh_el_p.etp)->next;
+                cpy_sz = thing_arityval(val);
+                goto cpy_words;
+            default:
+                cpy_sz = header_arity(val);
+
+            cpy_words:
+                sz -= cpy_sz;
+                while (cpy_sz >= 8) {
+                    cpy_sz -= 8;
+                    *hp++ = *fhp++;
+                    *hp++ = *fhp++;
+                    *hp++ = *fhp++;
+                    *hp++ = *fhp++;
+                    *hp++ = *fhp++;
+                    *hp++ = *fhp++;
+                    *hp++ = *fhp++;
+                    *hp++ = *fhp++;
+                }
+                switch (cpy_sz) {
+                case 7: *hp++ = *fhp++;
+                case 6: *hp++ = *fhp++;
+                case 5: *hp++ = *fhp++;
+                case 4: *hp++ = *fhp++;
+                case 3: *hp++ = *fhp++;
+                case 2: *hp++ = *fhp++;
+                case 1: *hp++ = *fhp++;
+                default: break;
+                }
+                if (oh_list_pp.upp) {
+#ifdef HARD_DEBUG
+                    Uint *dbg_old_oh_list_p = *oh_list_pp.upp;
+#endif
+                    /* Add to offheap list */
+                    *oh_el_next_pp.upp = *oh_list_pp.upp;
+                    *oh_list_pp.upp = oh_el_p.up;
+                    ASSERT(*hpp <= oh_el_p.up);
+                    ASSERT(hp > oh_el_p.up);
+#ifdef HARD_DEBUG
+                    switch (val & _HEADER_SUBTAG_MASK) {
+                    case REFC_BINARY_SUBTAG:
+                        ASSERT(off_heap->mso == *oh_list_pp.pbpp);
+                        ASSERT(off_heap->mso->next
+                               == (ProcBin *) dbg_old_oh_list_p);
+                        break;
+#ifndef HYBRID
+                    case FUN_SUBTAG:
+                        ASSERT(off_heap->funs == *oh_list_pp.efpp);
+                        ASSERT(off_heap->funs->next
+                               == (ErlFunThing *) dbg_old_oh_list_p);
+                        break;
+#endif
+                    case EXTERNAL_PID_SUBTAG:
+                    case EXTERNAL_PORT_SUBTAG:
+                    case EXTERNAL_REF_SUBTAG:
+                        ASSERT(off_heap->externals
+                               == *oh_list_pp.etpp);
+                        ASSERT(off_heap->externals->next
+                               == (ExternalThing *) dbg_old_oh_list_p);
+                        break;
+                    default:
+                        ASSERT(0);
+                    }
+#endif
+                    oh_list_pp.upp = NULL;
+
+
+                }
+                break;
+            }
+            break;
+        }
+    }
+
+    ASSERT(bp->size == hp - *hpp);
+    *hpp = hp;
+
+    if (is_not_immed(token)) {
+        ASSERT(bp->mem <= ptr_val(token));
+        ASSERT(bp->mem + bp->size > ptr_val(token));
+        ERL_MESSAGE_TOKEN(msg) = offset_ptr(token, offs);
+#ifdef HARD_DEBUG
+        ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TOKEN(msg)));
+        ASSERT(hp > ptr_val(ERL_MESSAGE_TOKEN(msg)));
+#endif
+    }
+
+    if (is_not_immed(term)) {
+        ASSERT(bp->mem <= ptr_val(term));
+        ASSERT(bp->mem + bp->size > ptr_val(term));
+        ERL_MESSAGE_TERM(msg) = offset_ptr(term, offs);
+#ifdef HARD_DEBUG
+        ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TERM(msg)));
+        ASSERT(hp > ptr_val(ERL_MESSAGE_TERM(msg)));
+#endif
+    }
+
+
+#ifdef HARD_DEBUG
+    {
+        int i, j;
+        {
+            ProcBin *mso = off_heap->mso;
+            i = j = 0;
+            while (mso != dbg_mso_start) {
+                mso = mso->next;
+                i++;
+            }
+            mso = bp->off_heap.mso;
+            while (mso) {
+                mso = mso->next;
+                j++;
+            }
+            ASSERT(i == j);
+        }
+        {
+            ErlFunThing *fun = off_heap->funs;
+            i = j = 0;
+            while (fun != dbg_fun_start) {
+                fun = fun->next;
+                i++;
+            }
+            fun = bp->off_heap.funs;
+            while (fun) {
+                fun = fun->next;
+                j++;
+            }
+            ASSERT(i == j);
+        }
+        {
+            ExternalThing *external = off_heap->externals;
+            i = j = 0;
+            while (external != dbg_external_start) {
+                external = external->next;
+                i++;
+            }
+            external = bp->off_heap.externals;
+            while (external) {
+                external = external->next;
+                j++;
+            }
+            ASSERT(i == j);
+        }
+    }
+#endif
+
+
+    bp->off_heap.mso = NULL;
+#ifndef HYBRID
+    bp->off_heap.funs = NULL;
+#endif
+    bp->off_heap.externals = NULL;
+    free_message_buffer(bp);
+
+#ifdef HARD_DEBUG
+    ASSERT(eq(ERL_MESSAGE_TERM(msg), dbg_term));
+    ASSERT(eq(ERL_MESSAGE_TOKEN(msg), dbg_token));
+    free_message_buffer(dbg_bp);
+#endif
+
+}
+
+Uint
+erts_msg_attached_data_size_aux(ErlMessage *msg)
+{
+    Sint sz;
+    ASSERT(is_non_value(ERL_MESSAGE_TERM(msg)));
+    ASSERT(msg->data.dist_ext);
+    ASSERT(msg->data.dist_ext->heap_size < 0);
+
+    sz = erts_decode_dist_ext_size(msg->data.dist_ext, 0);
+    if (sz < 0) {
+        /* Bad external; remove it */
+        if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) {
+            ErlHeapFragment *heap_frag;
+            heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
+            erts_cleanup_offheap(&heap_frag->off_heap);
+        }
+        erts_free_dist_ext_copy(msg->data.dist_ext);
+        msg->data.dist_ext = NULL;
+        return 0;
+    }
+
+    msg->data.dist_ext->heap_size = sz;
+    if (is_not_nil(msg->m[1])) {
+        ErlHeapFragment *heap_frag;
+        heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
+        sz += heap_frag->size;
+    }
+    return sz;
+}
+
+void
+erts_move_msg_attached_data_to_heap(Eterm **hpp, ErlOffHeap *ohp, ErlMessage *msg)
+{
+    if (is_value(ERL_MESSAGE_TERM(msg)))
+        erts_move_msg_mbuf_to_heap(hpp, ohp, msg);
+    else if (msg->data.dist_ext) {
+        ASSERT(msg->data.dist_ext->heap_size >= 0);
+        if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) {
+            ErlHeapFragment *heap_frag;
+            heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
+            ERL_MESSAGE_TOKEN(msg) = copy_struct(ERL_MESSAGE_TOKEN(msg),
+                                                 heap_frag->size,
+                                                 hpp,
+                                                 ohp);
+            erts_cleanup_offheap(&heap_frag->off_heap);
+        }
+        ERL_MESSAGE_TERM(msg) = erts_decode_dist_ext(hpp,
+                                                     ohp,
+                                                     msg->data.dist_ext);
+        erts_free_dist_ext_copy(msg->data.dist_ext);
+        msg->data.dist_ext = NULL;
+    }
+    /* else: bad external detected when calculating size */
+}
+
+/*
+ * Send a local message when sender & receiver processes are known.
+ */
+
+void
+erts_send_message(Process* sender,
+                  Process* receiver,
+                  ErtsProcLocks *receiver_locks,
+                  Eterm message,
+                  unsigned flags)
+{
+    Uint msize;
+    ErlHeapFragment* bp = NULL;
+    Eterm token = NIL;
+
+    BM_STOP_TIMER(system);
+    BM_MESSAGE(message,sender,receiver);
+    BM_START_TIMER(send);
+
+    if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) {
+        Eterm* hp;
+
+        BM_SWAP_TIMER(send,size);
+        msize = size_object(message);
+        BM_SWAP_TIMER(size,send);
+
+        seq_trace_update_send(sender);
+        seq_trace_output(SEQ_TRACE_TOKEN(sender), message, SEQ_TRACE_SEND,
+                         receiver->id, sender);
+        bp = new_message_buffer(msize + 6 /* TUPLE5 */);
+        hp = bp->mem;
+
+        BM_SWAP_TIMER(send,copy);
+        token = copy_struct(SEQ_TRACE_TOKEN(sender),
+                            6 /* TUPLE5 */,
+                            &hp,
+                            &bp->off_heap);
+
+        message = copy_struct(message, msize, &hp, &bp->off_heap);
+        BM_MESSAGE_COPIED(msize);
+        BM_SWAP_TIMER(copy,send);
+
+        erts_queue_message(receiver,
+                           receiver_locks,
+                           bp,
+                           message,
+                           token);
+        BM_SWAP_TIMER(send,system);
+#ifdef HYBRID
+    } else {
+        ErlMessage* mp = message_alloc();
+        BM_SWAP_TIMER(send,copy);
+#ifdef INCREMENTAL
+        /* TODO: During GC activate processes if the message resides in
+         * the fromspace and the sender is active. During major
+         * collections add the message to the gray stack if it resides
+         * in the old generation and the sender is active and the
+         * receiver is inactive.
+
+        if (!IS_CONST(message) && (ma_gc_flags & GC_CYCLE) &&
+            (ptr_val(message) >= inc_fromspc &&
+             ptr_val(message) < inc_fromend) && INC_IS_ACTIVE(sender))
+            INC_ACTIVATE(receiver);
+        else if (!IS_CONST(message) && (ma_gc_flags & GC_CYCLE) &&
+                 (ptr_val(message) >= global_old_heap &&
+                  ptr_val(message) < global_old_hend) &&
+                 INC_IS_ACTIVE(sender) && !INC_IS_ACTIVE(receiver))
+            Mark message in blackmap and add it to the gray stack
+        */
+
+        if (!IS_CONST(message))
+            INC_ACTIVATE(receiver);
+#endif
+        LAZY_COPY(sender,message);
+        BM_SWAP_TIMER(copy,send);
+        ERL_MESSAGE_TERM(mp) = message;
+        ERL_MESSAGE_TOKEN(mp) = NIL;
+        mp->next = NULL;
+        LINK_MESSAGE(receiver, mp);
+        ACTIVATE(receiver);
+
+        if (receiver->status == P_WAITING) {
+            erts_add_to_runq(receiver);
+        } else if (receiver->status == P_SUSPENDED) {
+            receiver->rstatus = P_RUNABLE;
+        }
+        if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
+            trace_receive(receiver, message);
+        }
+
+        BM_SWAP_TIMER(send,system);
+        return;
+#else
+    } else if (sender == receiver) {
+        /* Drop message if receiver has a pending exit ... */
+#ifdef ERTS_SMP
+        ErtsProcLocks need_locks = (~(*receiver_locks)
+                                    & (ERTS_PROC_LOCK_MSGQ
+                                       | ERTS_PROC_LOCK_STATUS));
+        if (need_locks) {
+            *receiver_locks |= need_locks;
+            if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) {
+                if (need_locks == ERTS_PROC_LOCK_MSGQ) {
+                    erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
+                    need_locks = ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS;
+                }
+                erts_smp_proc_lock(receiver, need_locks);
+            }
+        }
+        if (!ERTS_PROC_PENDING_EXIT(receiver))
+#endif
+        {
+            ErlMessage* mp = message_alloc();
+
+            mp->data.attached = NULL;
+            ERL_MESSAGE_TERM(mp) = message;
+            ERL_MESSAGE_TOKEN(mp) = NIL;
+            mp->next = NULL;
+            /*
+             * We move 'in queue' to 'private queue' and place
+             * message at the end of 'private queue' in order
+             * to ensure that the 'in queue' doesn't contain
+             * references into the heap. By ensuring this,
+             * we don't need to include the 'in queue' in
+             * the root set when garbage collecting.
+             */
+
+            ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
+            LINK_MESSAGE_PRIVQ(receiver, mp);
+
+            if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
+                trace_receive(receiver, message);
+            }
+        }
+        BM_SWAP_TIMER(send,system);
+        return;
+    } else {
+#ifdef ERTS_SMP
+        ErlOffHeap *ohp;
+        Eterm *hp;
+        BM_SWAP_TIMER(send,size);
+        msize = size_object(message);
+        BM_SWAP_TIMER(size,send);
+        hp = erts_alloc_message_heap(msize,&bp,&ohp,receiver,receiver_locks);
+        BM_SWAP_TIMER(send,copy);
+        message = copy_struct(message, msize, &hp, ohp);
+        BM_MESSAGE_COPIED(msize);
+        BM_SWAP_TIMER(copy,send);
+        erts_queue_message(receiver, receiver_locks, bp, message, token);
+        BM_SWAP_TIMER(send,system);
+#else
+        ErlMessage* mp = message_alloc();
+        Eterm *hp;
+        BM_SWAP_TIMER(send,size);
+        msize = size_object(message);
+        BM_SWAP_TIMER(size,send);
+
+        if (receiver->stop - receiver->htop <= msize) {
+            BM_SWAP_TIMER(send,system);
+            erts_garbage_collect(receiver, msize, receiver->arg_reg, receiver->arity);
+            BM_SWAP_TIMER(system,send);
+        }
+        hp = receiver->htop;
+        receiver->htop = hp + msize;
+        BM_SWAP_TIMER(send,copy);
+        message = copy_struct(message, msize, &hp, &receiver->off_heap);
+        BM_MESSAGE_COPIED(msize);
+        BM_SWAP_TIMER(copy,send);
+        ERL_MESSAGE_TERM(mp) = message;
+        ERL_MESSAGE_TOKEN(mp) = NIL;
+        mp->next = NULL;
+        mp->data.attached = NULL;
+        LINK_MESSAGE(receiver, mp);
+
+        if (receiver->status == P_WAITING) {
+            erts_add_to_runq(receiver);
+        } else if (receiver->status == P_SUSPENDED) {
+            receiver->rstatus = P_RUNABLE;
+        }
+        if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
+            trace_receive(receiver, message);
+        }
+        BM_SWAP_TIMER(send,system);
+#endif /* #ifndef ERTS_SMP */
+        return;
+#endif /* HYBRID */
+    }
+}
+
+/*
+ * This function delivers an EXIT message to a process
+ * which is trapping EXITs.
+ */
+
+void
+erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
+                          Eterm reason, Eterm token)
+{
+    Eterm mess;
+    Eterm save;
+    Eterm from_copy;
+    Uint sz_reason;
+    Uint sz_token;
+    Uint sz_from;
+    Eterm* hp;
+    Eterm temptoken;
+    ErlHeapFragment* bp = NULL;
+
+    if (token != NIL) {
+
+        ASSERT(is_tuple(token));
+        sz_reason = size_object(reason);
+        sz_token = size_object(token);
+        sz_from = size_object(from);
+        bp = new_message_buffer(sz_reason + sz_from + sz_token + 4);
+        hp = bp->mem;
+        mess = copy_struct(reason, sz_reason, &hp, &bp->off_heap);
+        from_copy = copy_struct(from, sz_from, &hp, &bp->off_heap);
+        save = TUPLE3(hp, am_EXIT, from_copy, mess);
+        hp += 4;
+        /* the trace token must in this case be updated by the caller */
+        seq_trace_output(token, save, SEQ_TRACE_SEND, to->id, NULL);
+        temptoken = copy_struct(token, sz_token, &hp, &bp->off_heap);
+        erts_queue_message(to, to_locksp, bp, save, temptoken);
+    } else {
+        ErlOffHeap *ohp;
+        sz_reason = size_object(reason);
+        sz_from = IS_CONST(from) ? 0 : size_object(from);
+
+        hp = erts_alloc_message_heap(sz_reason+sz_from+4,
+                                     &bp,
+                                     &ohp,
+                                     to,
+                                     to_locksp);
+
+        mess = copy_struct(reason, sz_reason, &hp, ohp);
+        from_copy = (IS_CONST(from)
+                     ? from
+                     : copy_struct(from, sz_from, &hp, ohp));
+        save = TUPLE3(hp, am_EXIT, from_copy, mess);
+        erts_queue_message(to, to_locksp, bp, save, NIL);
+    }
+}
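
The comment repeated in erts_queue_message() and erts_send_message() above explains the queue layout this file relies on: senders link messages onto an external "in queue", and the receiving process later splices that whole list onto the end of its "private queue" (ERTS_SMP_MSGQ_MV_INQ2PRIVQ) so that the in queue never has to be treated as part of the GC root set. The following is a minimal, single-threaded sketch of that splice only, not part of the commit above; all names (msg_node, proc_queues, move_inq_to_privq, ...) are hypothetical, and the real code additionally guards the in queue with the message-queue process lock.

/* Illustrative sketch of the in-queue / private-queue splice. */
#include <stdio.h>
#include <stdlib.h>

typedef struct msg_node {
    struct msg_node *next;
    int payload;                      /* stand-in for the message term */
} msg_node;

typedef struct {
    msg_node *first;
    msg_node **last;                  /* tail pointer for O(1) append */
} msg_queue;

typedef struct {
    msg_queue in_q;                   /* appended to by senders (locked in the real code) */
    msg_queue priv_q;                 /* touched only by the owning process */
} proc_queues;

static void queue_init(msg_queue *q) { q->first = NULL; q->last = &q->first; }

static void queue_append(msg_queue *q, msg_node *m)
{
    m->next = NULL;
    *q->last = m;
    q->last = &m->next;
}

/* Rough counterpart of ERTS_SMP_MSGQ_MV_INQ2PRIVQ: splice the whole in
 * queue onto the end of the private queue and leave the in queue empty,
 * so only the private queue can ever reference the process heap. */
static void move_inq_to_privq(proc_queues *p)
{
    if (p->in_q.first) {
        *p->priv_q.last = p->in_q.first;
        p->priv_q.last = p->in_q.last;
        queue_init(&p->in_q);
    }
}

int main(void)
{
    proc_queues p;
    msg_node m1 = {0, 1}, m2 = {0, 2};
    queue_init(&p.in_q);
    queue_init(&p.priv_q);
    queue_append(&p.in_q, &m1);       /* "sender" side */
    queue_append(&p.in_q, &m2);
    move_inq_to_privq(&p);            /* receiver side, e.g. before a GC */
    for (msg_node *m = p.priv_q.first; m; m = m->next)
        printf("%d\n", m->payload);   /* prints 1 then 2 */
    return 0;
}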