/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 1997-2012. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* %CopyrightEnd%
*/
/*
* Message passing primitives.
*/
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif
#include "sys.h"
#include "erl_vm.h"
#include "global.h"
#include "erl_message.h"
#include "erl_process.h"
#include "erl_binary.h"
#include "dtrace-wrapper.h"
ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(message,
ErlMessage,
ERL_MESSAGE_BUF_SZ,
ERTS_ALC_T_MSG_REF)
#if defined(DEBUG) && 0
#define HARD_DEBUG
#else
#undef HARD_DEBUG
#endif
#ifdef DEBUG
static ERTS_INLINE int in_heapfrag(const Eterm* ptr, const ErlHeapFragment *bp)
{
return ((unsigned)(ptr - bp->mem) < bp->used_size);
}
#endif
void
init_message(void)
{
init_message_alloc();
}
void
free_message(ErlMessage* mp)
{
message_free(mp);
}
/* Allocate message buffer (size in words) */
ErlHeapFragment*
new_message_buffer(Uint size)
{
ErlHeapFragment* bp;
bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG,
ERTS_HEAP_FRAG_SIZE(size));
ERTS_INIT_HEAP_FRAG(bp, size);
return bp;
}
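/*
* Shrink a message buffer to 'size' words. If the request shrinks the buffer
* by less than one sixteenth of the currently used size, only used_size is
* adjusted; otherwise the fragment is reallocated and all term references in
* 'brefs', as well as the off-heap list, are offset to the new location.
*/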
ErlHeapFragment*
erts_resize_message_buffer(ErlHeapFragment *bp, Uint size,
Eterm *brefs, Uint brefs_size)
{
#ifdef DEBUG
int i;
#endif
#ifdef HARD_DEBUG
ErlHeapFragment *dbg_bp;
Eterm *dbg_brefs;
Uint dbg_size;
Uint dbg_tot_size;
Eterm *dbg_hp;
#endif
ErlHeapFragment* nbp;
#ifdef DEBUG
{
Uint off_sz = size < bp->used_size ? size : bp->used_size;
for (i = 0; i < brefs_size; i++) {
Eterm *ptr;
if (is_immed(brefs[i]))
continue;
ptr = ptr_val(brefs[i]);
ASSERT(&bp->mem[0] <= ptr && ptr < &bp->mem[0] + off_sz);
}
}
#endif
if (size >= (bp->used_size - bp->used_size / 16)) {
bp->used_size = size;
return bp;
}
#ifdef HARD_DEBUG
dbg_brefs = erts_alloc(ERTS_ALC_T_UNDEF, sizeof(Eterm *)*brefs_size);
dbg_bp = new_message_buffer(bp->used_size);
dbg_hp = dbg_bp->mem;
dbg_tot_size = 0;
for (i = 0; i < brefs_size; i++) {
dbg_size = size_object(brefs[i]);
dbg_tot_size += dbg_size;
dbg_brefs[i] = copy_struct(brefs[i], dbg_size, &dbg_hp,
&dbg_bp->off_heap);
}
ASSERT(dbg_tot_size == (size < bp->used_size ? size : bp->used_size));
#endif
nbp = (ErlHeapFragment*) ERTS_HEAP_REALLOC(ERTS_ALC_T_HEAP_FRAG,
(void *) bp,
ERTS_HEAP_FRAG_SIZE(bp->alloc_size),
ERTS_HEAP_FRAG_SIZE(size));
if (bp != nbp) {
Uint off_sz = size < nbp->used_size ? size : nbp->used_size;
Eterm *sp = &bp->mem[0];
Eterm *ep = sp + off_sz;
Sint offs = &nbp->mem[0] - sp;
erts_offset_off_heap(&nbp->off_heap, offs, sp, ep);
erts_offset_heap(&nbp->mem[0], off_sz, offs, sp, ep);
if (brefs && brefs_size)
erts_offset_heap_ptr(brefs, brefs_size, offs, sp, ep);
#ifdef DEBUG
for (i = 0; i < brefs_size; i++) {
Eterm *ptr;
if (is_immed(brefs[i]))
continue;
ptr = ptr_val(brefs[i]);
ASSERT(&nbp->mem[0] <= ptr && ptr < &nbp->mem[0] + off_sz);
}
#endif
}
nbp->alloc_size = size;
nbp->used_size = size;
#ifdef HARD_DEBUG
for (i = 0; i < brefs_size; i++)
ASSERT(eq(dbg_brefs[i], brefs[i]));
free_message_buffer(dbg_bp);
erts_free(ERTS_ALC_T_UNDEF, dbg_brefs);
#endif
return nbp;
}
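/*
* Release all off-heap data referenced from an off-heap list: drop the
* reference counts of refc binaries and fun entries (freeing them when the
* count reaches zero) and dereference external node entries.
*/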
void
erts_cleanup_offheap(ErlOffHeap *offheap)
{
union erl_off_heap_ptr u;
for (u.hdr = offheap->first; u.hdr; u.hdr = u.hdr->next) {
switch (thing_subtag(u.hdr->thing_word)) {
case REFC_BINARY_SUBTAG:
if (erts_refc_dectest(&u.pb->val->refc, 0) == 0) {
erts_bin_free(u.pb->val);
}
break;
case FUN_SUBTAG:
if (erts_refc_dectest(&u.fun->fe->refc, 0) == 0) {
erts_erase_fun_entry(u.fun->fe);
}
break;
default:
ASSERT(is_external_header(u.hdr->thing_word));
erts_deref_node_entry(u.ext->node);
break;
}
}
}
void
free_message_buffer(ErlHeapFragment* bp)
{
ASSERT(bp != NULL);
do {
ErlHeapFragment* next_bp = bp->next;
erts_cleanup_offheap(&bp->off_heap);
ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, (void *) bp,
ERTS_HEAP_FRAG_SIZE(bp->size));
bp = next_bp;
} while (bp != NULL);
}
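/*
* Attach a heap fragment to a process: link it first into MBUF, account for
* its size, force a garbage collection, and splice its off-heap list onto
* the front of the process' off-heap (MSO) list.
*/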
static ERTS_INLINE void
link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp)
{
if (bp) {
/* Link the message buffer */
bp->next = MBUF(proc);
MBUF(proc) = bp;
MBUF_SIZE(proc) += bp->used_size;
FLAGS(proc) |= F_FORCE_GC;
/* Move any off-heap terms into the process */
if (bp->off_heap.first != NULL) {
struct erl_off_heap_header** next_p = &bp->off_heap.first;
while (*next_p != NULL) {
next_p = &((*next_p)->next);
}
*next_p = MSO(proc).first;
MSO(proc).first = bp->off_heap.first;
bp->off_heap.first = NULL;
OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead);
}
}
}
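/*
* Decode a message in external (distribution) format onto the heap of
* process 'pp' or, if 'pp' is NULL, into a newly allocated heap fragment
* returned through 'bpp'. A non-nil seq-trace token in the trailer is
* copied as well. Returns THE_NON_VALUE if decoding fails.
*/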
Eterm
erts_msg_distext2heap(Process *pp,
ErtsProcLocks *plcksp,
ErlHeapFragment **bpp,
Eterm *tokenp,
ErtsDistExternal *dist_extp)
{
Eterm msg;
Uint tok_sz = 0;
Eterm *hp = NULL;
ErtsHeapFactory factory;
Sint sz;
*bpp = NULL;
sz = erts_decode_dist_ext_size(dist_extp);
if (sz < 0)
goto decode_error;
if (is_not_nil(*tokenp)) {
ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
tok_sz = heap_frag->used_size;
sz += tok_sz;
}
if (pp) {
ErlOffHeap *ohp;
hp = erts_alloc_message_heap(sz, bpp, &ohp, pp, plcksp);
}
else {
*bpp = new_message_buffer(sz);
hp = (*bpp)->mem;
}
erts_factory_message_init(&factory, pp, hp, *bpp);
msg = erts_decode_dist_ext(&factory, dist_extp);
if (is_non_value(msg))
goto decode_error;
if (is_not_nil(*tokenp)) {
ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
hp = erts_produce_heap(&factory, tok_sz, 0);
*tokenp = copy_struct(*tokenp, tok_sz, &hp, factory.off_heap);
erts_cleanup_offheap(&heap_frag->off_heap);
}
erts_free_dist_ext_copy(dist_extp);
erts_factory_close(&factory);
return msg;
decode_error:
if (is_not_nil(*tokenp)) {
ErlHeapFragment *heap_frag = erts_dist_ext_trailer(dist_extp);
erts_cleanup_offheap(&heap_frag->off_heap);
}
erts_free_dist_ext_copy(dist_extp);
*bpp = NULL;
return THE_NON_VALUE;
}
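/*
* Enqueue a message that is still in external (distribution) format.
* The message is decoded here only when the receiver is traced on
* 'receive'; otherwise it is linked into the queue undecoded and
* decoded later, when the receiver picks it up.
*/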
void
erts_queue_dist_message(Process *rcvr,
ErtsProcLocks *rcvr_locks,
ErtsDistExternal *dist_ext,
Eterm token)
{
ErlMessage* mp;
#ifdef USE_VM_PROBES
Sint tok_label = 0;
Sint tok_lastcnt = 0;
Sint tok_serial = 0;
#endif
#ifdef ERTS_SMP
erts_aint_t state;
#endif
ERTS_SMP_LC_ASSERT(*rcvr_locks == erts_proc_lc_my_proc_locks(rcvr));
mp = message_alloc();
#ifdef ERTS_SMP
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ)) {
if (erts_smp_proc_trylock(rcvr, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ;
if (*rcvr_locks & ERTS_PROC_LOCK_STATUS) {
erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_STATUS);
need_locks |= ERTS_PROC_LOCK_STATUS;
}
erts_smp_proc_lock(rcvr, need_locks);
}
}
state = erts_smp_atomic32_read_acqb(&rcvr->state);
if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) {
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
/* Drop message if receiver is exiting or has a pending exit ... */
if (is_not_nil(token)) {
ErlHeapFragment *heap_frag;
heap_frag = erts_dist_ext_trailer(mp->data.dist_ext);
erts_cleanup_offheap(&heap_frag->off_heap);
}
erts_free_dist_ext_copy(dist_ext);
message_free(mp);
}
else
#endif
if (IS_TRACED_FL(rcvr, F_TRACE_RECEIVE)) {
/* Ahh... need to decode it in order to trace it... */
ErlHeapFragment *mbuf;
Eterm msg;
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
message_free(mp);
msg = erts_msg_distext2heap(rcvr, rcvr_locks, &mbuf, &token, dist_ext);
if (is_value(msg)) {
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(message_queued)) {
DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
dtrace_proc_str(rcvr, receiver_name);
if (token != NIL && token != am_have_dt_utag) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
}
DTRACE6(message_queued,
receiver_name, size_object(msg), rcvr->msg.len,
tok_label, tok_lastcnt, tok_serial);
}
#endif
/* Only enqueue the message if the decode succeeded */
erts_queue_message(rcvr, rcvr_locks, mbuf, msg, token);
}
}
else {
/* Enqueue the message in external format */
ERL_MESSAGE_TERM(mp) = THE_NON_VALUE;
#ifdef USE_VM_PROBES
ERL_MESSAGE_DT_UTAG(mp) = NIL;
if (token == am_have_dt_utag) {
ERL_MESSAGE_TOKEN(mp) = NIL;
} else {
#endif
ERL_MESSAGE_TOKEN(mp) = token;
#ifdef USE_VM_PROBES
}
#endif
mp->next = NULL;
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(message_queued)) {
DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
dtrace_proc_str(rcvr, receiver_name);
if (token != NIL && token != am_have_dt_utag) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token));
}
/*
* TODO: We don't know the real size of the external message here.
* -1 will appear to a D script as 4294967295.
*/
DTRACE6(message_queued, receiver_name, -1, rcvr->msg.len + 1,
tok_label, tok_lastcnt, tok_serial);
}
#endif
mp->data.dist_ext = dist_ext;
LINK_MESSAGE(rcvr, mp);
if (!(*rcvr_locks & ERTS_PROC_LOCK_MSGQ))
erts_smp_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
erts_proc_notify_new_message(rcvr,
#ifdef ERTS_SMP
*rcvr_locks
#else
0
#endif
);
}
}
/* Add a message to the end of the receiver's message queue. Returns the
* length of the message queue, or 0 if the message was dropped because
* the receiver is exiting. */
static Sint
queue_message(Process *c_p,
Process* receiver,
ErtsProcLocks *receiver_locks,
erts_aint32_t *receiver_state,
ErlHeapFragment* bp,
Eterm message,
Eterm seq_trace_token
#ifdef USE_VM_PROBES
, Eterm dt_utag
#endif
)
{
Sint res;
ErlMessage* mp;
int locked_msgq = 0;
erts_aint_t state;
#ifndef ERTS_SMP
ASSERT(bp != NULL || receiver->mbuf == NULL);
#endif
ERTS_SMP_LC_ASSERT(*receiver_locks == erts_proc_lc_my_proc_locks(receiver));
mp = message_alloc();
if (receiver_state)
state = *receiver_state;
else
state = erts_smp_atomic32_read_acqb(&receiver->state);
#ifdef ERTS_SMP
if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT))
goto exiting;
if (!(*receiver_locks & ERTS_PROC_LOCK_MSGQ)) {
if (erts_smp_proc_trylock(receiver, ERTS_PROC_LOCK_MSGQ) == EBUSY) {
ErtsProcLocks need_locks = ERTS_PROC_LOCK_MSGQ;
if (*receiver_locks & ERTS_PROC_LOCK_STATUS) {
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
need_locks |= ERTS_PROC_LOCK_STATUS;
}
erts_smp_proc_lock(receiver, need_locks);
}
locked_msgq = 1;
state = erts_smp_atomic32_read_nob(&receiver->state);
if (receiver_state)
*receiver_state = state;
}
#endif
if (state & (ERTS_PSFLG_PENDING_EXIT|ERTS_PSFLG_EXITING)) {
#ifdef ERTS_SMP
exiting:
#endif
/* Drop message if receiver is exiting or has a pending exit... */
if (locked_msgq)
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
if (bp)
free_message_buffer(bp);
message_free(mp);
return 0;
}
ERL_MESSAGE_TERM(mp) = message;
ERL_MESSAGE_TOKEN(mp) = seq_trace_token;
#ifdef USE_VM_PROBES
ERL_MESSAGE_DT_UTAG(mp) = dt_utag;
#endif
mp->next = NULL;
mp->data.heap_frag = bp;
#ifndef ERTS_SMP
res = receiver->msg.len;
#else
res = receiver->msg_inq.len;
if (*receiver_locks & ERTS_PROC_LOCK_MAIN) {
/*
* We move 'in queue' to 'private queue' and place
* message at the end of 'private queue' in order
* to ensure that the 'in queue' doesn't contain
* references into the heap. By ensuring this,
* we don't need to include the 'in queue' in
* the root set when garbage collecting.
*/
res += receiver->msg.len;
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
LINK_MESSAGE_PRIVQ(receiver, mp);
}
else
#endif
{
LINK_MESSAGE(receiver, mp);
}
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(message_queued)) {
DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
Sint tok_label = 0;
Sint tok_lastcnt = 0;
Sint tok_serial = 0;
dtrace_proc_str(receiver, receiver_name);
if (seq_trace_token != NIL && is_tuple(seq_trace_token)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(seq_trace_token));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(seq_trace_token));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(seq_trace_token));
}
DTRACE6(message_queued,
receiver_name, size_object(message), receiver->msg.len,
tok_label, tok_lastcnt, tok_serial);
}
#endif
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE))
trace_receive(receiver, message);
if (locked_msgq)
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
erts_proc_notify_new_message(receiver,
#ifdef ERTS_SMP
*receiver_locks
#else
0
#endif
);
#ifndef ERTS_SMP
ERTS_HOLE_CHECK(receiver);
#endif
return res;
}
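/*
* Exported wrapper around queue_message(). When VM probes are enabled
* the function also carries the DTrace user tag (dt_utag).
*/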
void
#ifdef USE_VM_PROBES
erts_queue_message_probe(Process* receiver, ErtsProcLocks *receiver_locks,
ErlHeapFragment* bp,
Eterm message, Eterm seq_trace_token, Eterm dt_utag)
#else
erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks,
ErlHeapFragment* bp,
Eterm message, Eterm seq_trace_token)
#endif
{
queue_message(NULL,
receiver,
receiver_locks,
NULL,
bp,
message,
seq_trace_token
#ifdef USE_VM_PROBES
, dt_utag
#endif
);
}
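/*
* Link a heap fragment to a process and fill the unused part of the heap
* (between HEAP_TOP and HEAP_LIMIT) with a dummy bignum header so that
* the area still parses as a valid term.
*/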
void
erts_link_mbuf_to_proc(struct process *proc, ErlHeapFragment *bp)
{
Eterm* htop = HEAP_TOP(proc);
link_mbuf_to_proc(proc, bp);
if (htop < HEAP_LIMIT(proc)) {
*htop = make_pos_bignum_header(HEAP_LIMIT(proc)-htop-1);
HEAP_TOP(proc) = HEAP_LIMIT(proc);
}
}
/*
* Moves content of message buffer attached to a message into a heap.
* The message buffer is deallocated.
*/
void
erts_move_msg_mbuf_to_heap(Eterm** hpp, ErlOffHeap* off_heap, ErlMessage *msg)
{
struct erl_off_heap_header* oh;
Eterm term, token, *fhp, *hp;
Sint offs;
Uint sz;
ErlHeapFragment *bp;
#ifdef USE_VM_PROBES
Eterm utag;
#endif
#ifdef HARD_DEBUG
struct erl_off_heap_header* dbg_oh_start = off_heap->first;
Eterm dbg_term, dbg_token;
ErlHeapFragment *dbg_bp;
Uint *dbg_hp, *dbg_thp_start;
Uint dbg_term_sz, dbg_token_sz;
#ifdef USE_VM_PROBES
Eterm dbg_utag;
Uint dbg_utag_sz;
#endif
#endif
bp = msg->data.heap_frag;
term = ERL_MESSAGE_TERM(msg);
token = ERL_MESSAGE_TOKEN(msg);
#ifdef USE_VM_PROBES
utag = ERL_MESSAGE_DT_UTAG(msg);
#endif
if (!bp) {
#ifdef USE_VM_PROBES
ASSERT(is_immed(term) && is_immed(token) && is_immed(utag));
#else
ASSERT(is_immed(term) && is_immed(token));
#endif
return;
}
#ifdef HARD_DEBUG
dbg_term_sz = size_object(term);
dbg_token_sz = size_object(token);
#ifdef USE_VM_PROBES
dbg_utag_sz = size_object(utag);
dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz + dbg_utag_sz);
#else
dbg_bp = new_message_buffer(dbg_term_sz + dbg_token_sz);
#endif
/* Note that ASSERT(dbg_term_sz + dbg_token_sz == erts_msg_used_frag_sz(msg))
* does not hold: the copied size may be smaller due to removed sub-binaries
* or garbage, and larger due to duplicated shared terms.
*/
dbg_hp = dbg_bp->mem;
dbg_term = copy_struct(term, dbg_term_sz, &dbg_hp, &dbg_bp->off_heap);
dbg_token = copy_struct(token, dbg_token_sz, &dbg_hp, &dbg_bp->off_heap);
#ifdef USE_VM_PROBES
dbg_utag = copy_struct(utag, dbg_utag_sz, &dbg_hp, &dbg_bp->off_heap);
#endif
dbg_thp_start = *hpp;
#endif
if (bp->next != NULL) {
move_multi_frags(hpp, off_heap, bp, msg->m,
#ifdef USE_VM_PROBES
3
#else
2
#endif
);
goto copy_done;
}
OH_OVERHEAD(off_heap, bp->off_heap.overhead);
sz = bp->used_size;
ASSERT(is_immed(term) || in_heapfrag(ptr_val(term),bp));
ASSERT(is_immed(token) || in_heapfrag(ptr_val(token),bp));
fhp = bp->mem;
hp = *hpp;
offs = hp - fhp;
oh = NULL;
while (sz--) {
Uint cpy_sz;
Eterm val = *fhp++;
switch (primary_tag(val)) {
case TAG_PRIMARY_IMMED1:
*hp++ = val;
break;
case TAG_PRIMARY_LIST:
case TAG_PRIMARY_BOXED:
ASSERT(in_heapfrag(ptr_val(val), bp));
*hp++ = offset_ptr(val, offs);
break;
case TAG_PRIMARY_HEADER:
*hp++ = val;
switch (val & _HEADER_SUBTAG_MASK) {
case ARITYVAL_SUBTAG:
break;
case REFC_BINARY_SUBTAG:
case FUN_SUBTAG:
case EXTERNAL_PID_SUBTAG:
case EXTERNAL_PORT_SUBTAG:
case EXTERNAL_REF_SUBTAG:
oh = (struct erl_off_heap_header*) (hp-1);
cpy_sz = thing_arityval(val);
goto cpy_words;
default:
cpy_sz = header_arity(val);
cpy_words:
ASSERT(sz >= cpy_sz);
sz -= cpy_sz;
while (cpy_sz >= 8) {
cpy_sz -= 8;
*hp++ = *fhp++;
*hp++ = *fhp++;
*hp++ = *fhp++;
*hp++ = *fhp++;
*hp++ = *fhp++;
*hp++ = *fhp++;
*hp++ = *fhp++;
*hp++ = *fhp++;
}
switch (cpy_sz) {
case 7: *hp++ = *fhp++;
case 6: *hp++ = *fhp++;
case 5: *hp++ = *fhp++;
case 4: *hp++ = *fhp++;
case 3: *hp++ = *fhp++;
case 2: *hp++ = *fhp++;
case 1: *hp++ = *fhp++;
default: break;
}
if (oh) {
/* Add to offheap list */
oh->next = off_heap->first;
off_heap->first = oh;
ASSERT(*hpp <= (Eterm*)oh);
ASSERT(hp > (Eterm*)oh);
oh = NULL;
}
break;
}
break;
}
}
ASSERT(bp->used_size == hp - *hpp);
*hpp = hp;
if (is_not_immed(token)) {
ASSERT(in_heapfrag(ptr_val(token), bp));
ERL_MESSAGE_TOKEN(msg) = offset_ptr(token, offs);
#ifdef HARD_DEBUG
ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TOKEN(msg)));
ASSERT(hp > ptr_val(ERL_MESSAGE_TOKEN(msg)));
#endif
}
if (is_not_immed(term)) {
ASSERT(in_heapfrag(ptr_val(term),bp));
ERL_MESSAGE_TERM(msg) = offset_ptr(term, offs);
#ifdef HARD_DEBUG
ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_TERM(msg)));
ASSERT(hp > ptr_val(ERL_MESSAGE_TERM(msg)));
#endif
}
#ifdef USE_VM_PROBES
if (is_not_immed(utag)) {
ASSERT(in_heapfrag(ptr_val(utag), bp));
ERL_MESSAGE_DT_UTAG(msg) = offset_ptr(utag, offs);
#ifdef HARD_DEBUG
ASSERT(dbg_thp_start <= ptr_val(ERL_MESSAGE_DT_UTAG(msg)));
ASSERT(hp > ptr_val(ERL_MESSAGE_DT_UTAG(msg)));
#endif
}
#endif
copy_done:
#ifdef HARD_DEBUG
{
int i, j;
ErlHeapFragment* frag;
{
struct erl_off_heap_header* dbg_oh = off_heap->first;
i = j = 0;
while (dbg_oh != dbg_oh_start) {
dbg_oh = dbg_oh->next;
i++;
}
for (frag=bp; frag; frag=frag->next) {
dbg_oh = frag->off_heap.first;
while (dbg_oh) {
dbg_oh = dbg_oh->next;
j++;
}
}
ASSERT(i == j);
}
}
#endif
bp->off_heap.first = NULL;
free_message_buffer(bp);
msg->data.heap_frag = NULL;
#ifdef HARD_DEBUG
ASSERT(eq(ERL_MESSAGE_TERM(msg), dbg_term));
ASSERT(eq(ERL_MESSAGE_TOKEN(msg), dbg_token));
#ifdef USE_VM_PROBES
ASSERT(eq(ERL_MESSAGE_DT_UTAG(msg), dbg_utag));
#endif
free_message_buffer(dbg_bp);
#endif
}
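/*
* Compute the heap size needed for a message that is still in external
* format, including its seq-trace trailer, if any. A bad external is
* removed and 0 is returned.
*/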
Uint
erts_msg_attached_data_size_aux(ErlMessage *msg)
{
Sint sz;
ASSERT(is_non_value(ERL_MESSAGE_TERM(msg)));
ASSERT(msg->data.dist_ext);
ASSERT(msg->data.dist_ext->heap_size < 0);
sz = erts_decode_dist_ext_size(msg->data.dist_ext);
if (sz < 0) {
/* Bad external; remove it */
if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) {
ErlHeapFragment *heap_frag;
heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
erts_cleanup_offheap(&heap_frag->off_heap);
}
erts_free_dist_ext_copy(msg->data.dist_ext);
msg->data.dist_ext = NULL;
return 0;
}
msg->data.dist_ext->heap_size = sz;
if (is_not_nil(msg->m[1])) {
ErlHeapFragment *heap_frag;
heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
sz += heap_frag->used_size;
}
return sz;
}
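/*
* Move the data attached to a message into the factory heap: either the
* heap fragment of an already decoded message, or a message still in
* external format, which is decoded here.
*/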
void
erts_move_msg_attached_data_to_heap(ErtsHeapFactory* factory,
ErlMessage *msg)
{
if (is_value(ERL_MESSAGE_TERM(msg)))
erts_move_msg_mbuf_to_heap(&factory->hp, factory->off_heap, msg);
else if (msg->data.dist_ext) {
ASSERT(msg->data.dist_ext->heap_size >= 0);
if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) {
ErlHeapFragment *heap_frag;
heap_frag = erts_dist_ext_trailer(msg->data.dist_ext);
ERL_MESSAGE_TOKEN(msg) = copy_struct(ERL_MESSAGE_TOKEN(msg),
heap_frag->used_size,
&factory->hp,
factory->off_heap);
erts_cleanup_offheap(&heap_frag->off_heap);
}
ERL_MESSAGE_TERM(msg) = erts_decode_dist_ext(factory,
msg->data.dist_ext);
erts_free_dist_ext_copy(msg->data.dist_ext);
msg->data.dist_ext = NULL;
}
/* else: bad external detected when calculating size */
}
/*
* Send a local message when sender & receiver processes are known.
*/
Sint
erts_send_message(Process* sender,
Process* receiver,
ErtsProcLocks *receiver_locks,
Eterm message,
unsigned flags)
{
Uint msize;
ErlHeapFragment* bp = NULL;
Eterm token = NIL;
Sint res = 0;
#ifdef USE_VM_PROBES
DTRACE_CHARBUF(sender_name, 64);
DTRACE_CHARBUF(receiver_name, 64);
Sint tok_label = 0;
Sint tok_lastcnt = 0;
Sint tok_serial = 0;
#endif
BM_STOP_TIMER(system);
BM_MESSAGE(message,sender,receiver);
BM_START_TIMER(send);
#ifdef USE_VM_PROBES
*sender_name = *receiver_name = '\0';
if (DTRACE_ENABLED(message_send)) {
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
"%T", sender->common.id);
erts_snprintf(receiver_name, sizeof(DTRACE_CHARBUF_NAME(receiver_name)),
"%T", receiver->common.id);
}
#endif
if (SEQ_TRACE_TOKEN(sender) != NIL && !(flags & ERTS_SND_FLG_NO_SEQ_TRACE)) {
Eterm* hp;
Eterm stoken = SEQ_TRACE_TOKEN(sender);
Uint seq_trace_size = 0;
#ifdef USE_VM_PROBES
Uint dt_utag_size = 0;
Eterm utag = NIL;
#endif
BM_SWAP_TIMER(send,size);
msize = size_object(message);
BM_SWAP_TIMER(size,send);
#ifdef USE_VM_PROBES
if (stoken != am_have_dt_utag) {
#endif
seq_trace_update_send(sender);
seq_trace_output(stoken, message, SEQ_TRACE_SEND,
receiver->common.id, sender);
seq_trace_size = 6; /* TUPLE5 */
#ifdef USE_VM_PROBES
}
if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
dt_utag_size = size_object(DT_UTAG(sender));
} else if (stoken == am_have_dt_utag ) {
stoken = NIL;
}
#endif
bp = new_message_buffer(msize + seq_trace_size
#ifdef USE_VM_PROBES
+ dt_utag_size
#endif
);
hp = bp->mem;
BM_SWAP_TIMER(send,copy);
token = copy_struct(stoken,
seq_trace_size,
&hp,
&bp->off_heap);
message = copy_struct(message, msize, &hp, &bp->off_heap);
#ifdef USE_VM_PROBES
if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
utag = copy_struct(DT_UTAG(sender), dt_utag_size, &hp, &bp->off_heap);
#ifdef DTRACE_TAG_HARDDEBUG
erts_fprintf(stderr,
"Dtrace -> (%T) Spreading tag (%T) with "
"message %T!\r\n",sender->common.id, utag, message);
#endif
}
#endif
BM_MESSAGE_COPIED(msize);
BM_SWAP_TIMER(copy,send);
#ifdef USE_VM_PROBES
if (DTRACE_ENABLED(message_send)) {
if (stoken != NIL && stoken != am_have_dt_utag) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(stoken));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(stoken));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(stoken));
}
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
}
#endif
res = queue_message(NULL,
receiver,
receiver_locks,
NULL,
bp,
message,
token
#ifdef USE_VM_PROBES
, utag
#endif
);
BM_SWAP_TIMER(send,system);
} else if (sender == receiver) {
/* Drop message if receiver has a pending exit ... */
#ifdef ERTS_SMP
ErtsProcLocks need_locks = (~(*receiver_locks)
& (ERTS_PROC_LOCK_MSGQ
| ERTS_PROC_LOCK_STATUS));
if (need_locks) {
*receiver_locks |= need_locks;
if (erts_smp_proc_trylock(receiver, need_locks) == EBUSY) {
if (need_locks == ERTS_PROC_LOCK_MSGQ) {
erts_smp_proc_unlock(receiver, ERTS_PROC_LOCK_STATUS);
need_locks = ERTS_PROC_LOCK_MSGQ|ERTS_PROC_LOCK_STATUS;
}
erts_smp_proc_lock(receiver, need_locks);
}
}
if (!ERTS_PROC_PENDING_EXIT(receiver))
#endif
{
ErlMessage* mp = message_alloc();
DTRACE6(message_send, sender_name, receiver_name,
size_object(message), tok_label, tok_lastcnt, tok_serial);
mp->data.attached = NULL;
ERL_MESSAGE_TERM(mp) = message;
ERL_MESSAGE_TOKEN(mp) = NIL;
#ifdef USE_VM_PROBES
ERL_MESSAGE_DT_UTAG(mp) = NIL;
#endif
mp->next = NULL;
/*
* We move 'in queue' to 'private queue' and place
* message at the end of 'private queue' in order
* to ensure that the 'in queue' doesn't contain
* references into the heap. By ensuring this,
* we don't need to include the 'in queue' in
* the root set when garbage collecting.
*/
ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver);
LINK_MESSAGE_PRIVQ(receiver, mp);
res = receiver->msg.len;
if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) {
trace_receive(receiver, message);
}
}
BM_SWAP_TIMER(send,system);
} else {
ErlOffHeap *ohp;
Eterm *hp;
erts_aint32_t state;
BM_SWAP_TIMER(send,size);
msize = size_object(message);
BM_SWAP_TIMER(size,send);
hp = erts_alloc_message_heap_state(msize,
&bp,
&ohp,
receiver,
receiver_locks,
&state);
BM_SWAP_TIMER(send,copy);
message = copy_struct(message, msize, &hp, ohp);
BM_MESSAGE_COPIED(msize);
BM_SWAP_TIMER(copy,send);
DTRACE6(message_send, sender_name, receiver_name,
msize, tok_label, tok_lastcnt, tok_serial);
res = queue_message(sender,
receiver,
receiver_locks,
&state,
bp,
message,
token
#ifdef USE_VM_PROBES
, NIL
#endif
);
BM_SWAP_TIMER(send,system);
}
return res;
}
/*
* This function delivers an EXIT message to a process
* which is trapping EXITs.
*/
void
erts_deliver_exit_message(Eterm from, Process *to, ErtsProcLocks *to_locksp,
Eterm reason, Eterm token)
{
Eterm mess;
Eterm save;
Eterm from_copy;
Uint sz_reason;
Uint sz_token;
Uint sz_from;
Eterm* hp;
Eterm temptoken;
ErlHeapFragment* bp = NULL;
if (token != NIL
#ifdef USE_VM_PROBES
&& token != am_have_dt_utag
#endif
) {
ASSERT(is_tuple(token));
sz_reason = size_object(reason);
sz_token = size_object(token);
sz_from = size_object(from);
bp = new_message_buffer(sz_reason + sz_from + sz_token + 4);
hp = bp->mem;
mess = copy_struct(reason, sz_reason, &hp, &bp->off_heap);
from_copy = copy_struct(from, sz_from, &hp, &bp->off_heap);
save = TUPLE3(hp, am_EXIT, from_copy, mess);
hp += 4;
/* the trace token must in this case be updated by the caller */
seq_trace_output(token, save, SEQ_TRACE_SEND, to->common.id, NULL);
temptoken = copy_struct(token, sz_token, &hp, &bp->off_heap);
erts_queue_message(to, to_locksp, bp, save, temptoken);
} else {
ErlOffHeap *ohp;
sz_reason = size_object(reason);
sz_from = IS_CONST(from) ? 0 : size_object(from);
hp = erts_alloc_message_heap(sz_reason+sz_from+4,
&bp,
&ohp,
to,
to_locksp);
mess = copy_struct(reason, sz_reason, &hp, ohp);
from_copy = (IS_CONST(from)
? from
: copy_struct(from, sz_from, &hp, ohp));
save = TUPLE3(hp, am_EXIT, from_copy, mess);
erts_queue_message(to, to_locksp, bp, save, NIL);
}
}
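/*
* Heap factories: incremental builders of terms. Depending on the mode,
* terms are built directly on a process heap (FACTORY_HALLOC), in heap
* fragments (FACTORY_HEAP_FRAGS), in a caller-supplied static buffer
* (FACTORY_STATIC), or in a temporary buffer (FACTORY_TMP).
*/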
void erts_factory_proc_init(ErtsHeapFactory* factory,
Process* p)
{
erts_factory_proc_prealloc_init(factory, p, HEAP_LIMIT(p) - HEAP_TOP(p));
}
void erts_factory_proc_prealloc_init(ErtsHeapFactory* factory,
Process* p,
Sint size)
{
factory->mode = FACTORY_HALLOC;
factory->p = p;
factory->hp_start = HAlloc(p, size);
factory->hp = factory->hp_start;
factory->hp_end = factory->hp_start + size;
factory->off_heap = &p->off_heap;
factory->off_heap_saved.first = p->off_heap.first;
factory->off_heap_saved.overhead = p->off_heap.overhead;
factory->heap_frags_saved = p->mbuf;
factory->heap_frags = NULL; /* not used */
factory->alloc_type = 0; /* not used */
}
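/*
* Initialize a factory for building a message: in the heap fragment 'bp'
* if one is given, otherwise directly on the heap of the receiving
* process 'rp', starting at 'hp'.
*/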
void erts_factory_message_init(ErtsHeapFactory* factory,
Process* rp,
Eterm* hp,
ErlHeapFragment* bp)
{
if (bp) {
factory->mode = FACTORY_HEAP_FRAGS;
factory->p = NULL;
factory->hp_start = bp->mem;
factory->hp = hp ? hp : bp->mem;
factory->hp_end = bp->mem + bp->alloc_size;
factory->off_heap = &bp->off_heap;
factory->heap_frags = bp;
factory->heap_frags_saved = bp;
factory->alloc_type = ERTS_ALC_T_HEAP_FRAG;
ASSERT(!bp->next);
}
else {
factory->mode = FACTORY_HALLOC;
factory->p = rp;
factory->hp_start = hp;
factory->hp = hp;
factory->hp_end = HEAP_TOP(rp);
factory->off_heap = &rp->off_heap;
factory->heap_frags_saved = rp->mbuf;
factory->heap_frags = NULL; /* not used */
factory->alloc_type = 0; /* not used */
}
factory->off_heap_saved.first = factory->off_heap->first;
factory->off_heap_saved.overhead = factory->off_heap->overhead;
ASSERT(factory->hp >= factory->hp_start && factory->hp <= factory->hp_end);
}
/* One statically sized heap that must suffice.
No extra heap fragments will be allocated.
*/
void erts_factory_static_init(ErtsHeapFactory* factory,
Eterm* hp,
Uint size,
ErlOffHeap* off_heap)
{
factory->mode = FACTORY_STATIC;
factory->hp_start = hp;
factory->hp = hp;
factory->hp_end = hp + size;
factory->off_heap = off_heap;
factory->off_heap_saved.first = factory->off_heap->first;
factory->off_heap_saved.overhead = factory->off_heap->overhead;
}
/* A temporary heap with the default buffer allocated/freed by the client.
* For this mode, factory_close is the same as factory_undo.
*/
void erts_factory_tmp_init(ErtsHeapFactory* factory, Eterm* hp, Uint size,
Uint32 atype)
{
factory->mode = FACTORY_TMP;
factory->hp_start = hp;
factory->hp = hp;
factory->hp_end = hp + size;
factory->heap_frags = NULL;
factory->off_heap_saved.first = NULL;
factory->off_heap_saved.overhead = 0;
factory->off_heap = &factory->off_heap_saved;
factory->alloc_type = atype;
}
/* When we know the term is an immediate and need no heap.
*/
void erts_factory_dummy_init(ErtsHeapFactory* factory)
{
factory->mode = FACTORY_CLOSED;
}
static void reserve_heap(ErtsHeapFactory*, Uint need, Uint xtra);
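/*
* Produce 'need' words of heap from a factory, switching to a new heap
* chunk or fragment (with 'xtra' words of slack) via reserve_heap() when
* the current one is exhausted.
*
* Minimal usage sketch (hypothetical; 'p' is a Process* and 'value' an
* Eterm), building the tuple {ok, Value} directly on the process heap:
*
*     ErtsHeapFactory factory;
*     Eterm *hp, res;
*     erts_factory_proc_init(&factory, p);
*     hp = erts_produce_heap(&factory, 3, 0);   (a 2-tuple needs 3 words)
*     res = TUPLE2(hp, am_ok, value);
*     erts_factory_close(&factory);
*/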
Eterm* erts_produce_heap(ErtsHeapFactory* factory, Uint need, Uint xtra)
{
Eterm* res;
ASSERT((unsigned int)factory->mode > (unsigned int)FACTORY_CLOSED);
if (factory->hp + need > factory->hp_end) {
reserve_heap(factory, need, xtra);
}
res = factory->hp;
factory->hp += need;
return res;
}
Eterm* erts_reserve_heap(ErtsHeapFactory* factory, Uint need)
{
ASSERT((unsigned int)factory->mode > (unsigned int)FACTORY_CLOSED);
if (factory->hp + need > factory->hp_end) {
reserve_heap(factory, need, 200);
}
return factory->hp;
}
static void reserve_heap(ErtsHeapFactory* factory, Uint need, Uint xtra)
{
ErlHeapFragment* bp;
switch (factory->mode) {
case FACTORY_HALLOC:
HRelease(factory->p, factory->hp_end, factory->hp);
factory->hp = HAllocX(factory->p, need, xtra);
factory->hp_end = factory->hp + need;
return;
case FACTORY_HEAP_FRAGS:
case FACTORY_TMP:
bp = factory->heap_frags;
if (bp) {
ASSERT(factory->hp > bp->mem);
ASSERT(factory->hp <= factory->hp_end);
ASSERT(factory->hp_end == bp->mem + bp->alloc_size);
bp->used_size = factory->hp - bp->mem;
}
bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(factory->alloc_type,
ERTS_HEAP_FRAG_SIZE(need+xtra));
bp->next = factory->heap_frags;
factory->heap_frags = bp;
bp->alloc_size = need + xtra;
bp->used_size = need;
bp->off_heap.first = NULL;
bp->off_heap.overhead = 0;
factory->hp = bp->mem;
factory->hp_end = bp->mem + bp->alloc_size;
return;
case FACTORY_STATIC:
case FACTORY_CLOSED:
default:
ASSERT(!"Invalid factory mode");
}
}
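/*
* Seal a factory: return unused heap words to the process (FACTORY_HALLOC),
* record the used size of the last fragment (FACTORY_HEAP_FRAGS), or undo
* everything for a temporary factory (FACTORY_TMP).
*/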
void erts_factory_close(ErtsHeapFactory* factory)
{
ErlHeapFragment* bp;
switch (factory->mode) {
case FACTORY_HALLOC:
HRelease(factory->p, factory->hp_end, factory->hp);
break;
case FACTORY_HEAP_FRAGS:
bp = factory->heap_frags;
if (bp) {
ASSERT(factory->hp >= bp->mem);
ASSERT(factory->hp <= factory->hp_end);
ASSERT(factory->hp_end == bp->mem + bp->alloc_size);
bp->used_size = factory->hp - bp->mem;
}
break;
case FACTORY_TMP:
erts_factory_undo(factory);
break;
case FACTORY_STATIC: break;
case FACTORY_CLOSED: break;
default:
ASSERT(!"Invalid factory mode");
}
factory->mode = FACTORY_CLOSED;
}
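/*
* Like erts_factory_close(), but a single-fragment message is first shrunk
* to the size actually used; 'brefs' are term references into the fragment
* that are updated if the fragment moves.
*/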
void erts_factory_trim_and_close(ErtsHeapFactory* factory,
Eterm *brefs, Uint brefs_size)
{
if (factory->mode == FACTORY_HEAP_FRAGS) {
ErlHeapFragment* bp = factory->heap_frags;
if (bp->next == NULL) {
Uint used_sz = factory->hp - bp->mem;
ASSERT(used_sz <= bp->alloc_size);
factory->heap_frags = erts_resize_message_buffer(bp, used_sz,
brefs, brefs_size);
factory->mode = FACTORY_CLOSED;
return;
}
/* else we don't trim multi-fragmented messages for now */
}
erts_factory_close(factory);
}
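/*
* Throw away everything the factory has built so far: clean up off-heap
* terms created since the factory was initialized, free any heap fragments
* it allocated, and roll back the process heap top.
*/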
void erts_factory_undo(ErtsHeapFactory* factory)
{
ErlHeapFragment* bp;
struct erl_off_heap_header *hdr, **hdr_nextp;
switch (factory->mode) {
case FACTORY_HALLOC:
case FACTORY_STATIC:
/* Cleanup off-heap
*/
hdr_nextp = NULL;
for (hdr = factory->off_heap->first;
hdr != factory->off_heap_saved.first;
hdr = hdr->next) {
hdr_nextp = &hdr->next;
}
if (hdr_nextp != NULL) {
*hdr_nextp = NULL;
erts_cleanup_offheap(factory->off_heap);
factory->off_heap->first = factory->off_heap_saved.first;
factory->off_heap->overhead = factory->off_heap_saved.overhead;
}
if (factory->mode == FACTORY_HALLOC) {
/* Free heap frags
*/
bp = factory->p->mbuf;
if (bp != factory->heap_frags_saved) {
do {
ErlHeapFragment *next_bp = bp->next;
ASSERT(bp->off_heap.first == NULL);
ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, (void *) bp,
ERTS_HEAP_FRAG_SIZE(bp->alloc_size));
bp = next_bp;
} while (bp != factory->heap_frags_saved);
factory->p->mbuf = bp;
}
/* Roll back heap top
*/
if (factory->heap_frags_saved == NULL) { /* No heap frags when we started */
ASSERT(factory->hp_start >= HEAP_START(factory->p));
ASSERT(factory->hp_start <= HEAP_LIMIT(factory->p));
HEAP_TOP(factory->p) = factory->hp_start;
}
else {
ASSERT(factory->heap_frags_saved == factory->p->mbuf);
if (factory->hp_start == factory->heap_frags_saved->mem) {
factory->p->mbuf = factory->p->mbuf->next;
ERTS_HEAP_FREE(ERTS_ALC_T_HEAP_FRAG, factory->heap_frags_saved,
ERTS_HEAP_FRAG_SIZE(factory->heap_frags_saved->alloc_size));
}
else if (factory->hp_start != factory->hp_end) {
unsigned remains = factory->hp_start - factory->heap_frags_saved->mem;
ASSERT(remains > 0 && remains < factory->heap_frags_saved->used_size);
factory->heap_frags_saved->used_size = remains;
}
}
}
break;
case FACTORY_TMP:
case FACTORY_HEAP_FRAGS:
erts_cleanup_offheap(factory->off_heap);
factory->off_heap->first = NULL;
bp = factory->heap_frags;
while (bp != NULL) {
ErlHeapFragment* next_bp = bp->next;
ASSERT(bp->off_heap.first == NULL);
ERTS_HEAP_FREE(factory->alloc_type, (void *) bp,
ERTS_HEAP_FRAG_SIZE(bp->alloc_size));
bp = next_bp;
}
break;
case FACTORY_CLOSED: break;
default:
ASSERT(!"Invalid factory mode");
}
factory->mode = FACTORY_CLOSED;
#ifdef DEBUG
factory->p = NULL;
factory->hp = NULL;
factory->heap_frags = NULL;
#endif
}