From f3ae60838a5e8c83cfdd3000e25562dfcbbd438d Mon Sep 17 00:00:00 2001 From: "Nikolaos S. Papaspyrou" Date: Sat, 9 Jun 2012 01:12:50 +0300 Subject: Enable shcopy for sending messages --- erts/emulator/beam/erl_db.c | 4 +- erts/emulator/beam/erl_message.c | 114 +++++++++++++++++++++++++-------------- erts/emulator/beam/erl_message.h | 5 ++ erts/emulator/beam/global.h | 7 +++ 4 files changed, 87 insertions(+), 43 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c index 9ec14ab5ae..6119a9225f 100644 --- a/erts/emulator/beam/erl_db.c +++ b/erts/emulator/beam/erl_db.c @@ -1832,7 +1832,7 @@ BIF_RETTYPE ets_give_away_3(BIF_ALIST_3) tb->common.id, from_pid, BIF_ARG_3), - 0); + ERTS_SND_FLG_SHCOPY_TMP_BUF); erts_smp_proc_unlock(to_proc, to_locks); UnUseTmpHeap(5,BIF_P); BIF_RET(am_true); @@ -3211,7 +3211,7 @@ retry: tb->common.id, p->common.id, heir_data), - 0); + ERTS_SND_FLG_SHCOPY_TMP_BUF); erts_smp_proc_unlock(to_proc, to_locks); return !0; } diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 79739501a8..b1fca1df0c 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -68,6 +68,9 @@ new_message_buffer(Uint size) bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG, ERTS_HEAP_FRAG_SIZE(size)); ERTS_INIT_HEAP_FRAG(bp, size, size); +#ifdef SHCOPY_DEBUG + VERBOSE_DEBUG("[pid=%T] new message buffer %p\n", erts_get_current_pid(), bp->mem); +#endif return bp; } @@ -248,36 +251,6 @@ erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_s return nmp; } -void -erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp) -{ - if (first_bp) { - ErlHeapFragment *bp = first_bp; - - while (1) { - /* Move any off_heap's into the process */ - if (bp->off_heap.first != NULL) { - struct erl_off_heap_header** next_p = &bp->off_heap.first; - while (*next_p != NULL) { - next_p = &((*next_p)->next); - } - *next_p = MSO(proc).first; - MSO(proc).first = bp->off_heap.first; - bp->off_heap.first = NULL; - OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead); - } - MBUF_SIZE(proc) += bp->used_size; - if (!bp->next) - break; - bp = bp->next; - } - - /* Link the message buffer */ - bp->next = MBUF(proc); - MBUF(proc) = first_bp; - } -} - void erts_queue_dist_message(Process *rcvr, ErtsProcLocks *rcvr_locks, @@ -540,6 +513,36 @@ erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks, ); } +void +erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp) +{ + if (first_bp) { + ErlHeapFragment *bp = first_bp; + + while (1) { + /* Move any off_heap's into the process */ + if (bp->off_heap.first != NULL) { + struct erl_off_heap_header** next_p = &bp->off_heap.first; + while (*next_p != NULL) { + next_p = &((*next_p)->next); + } + *next_p = MSO(proc).first; + MSO(proc).first = bp->off_heap.first; + bp->off_heap.first = NULL; + OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead); + } + MBUF_SIZE(proc) += bp->used_size; + if (!bp->next) + break; + bp = bp->next; + } + + /* Link the message buffer */ + bp->next = MBUF(proc); + MBUF(proc) = first_bp; + } +} + Uint erts_msg_attached_data_size_aux(ErtsMessage *msg) { @@ -663,11 +666,15 @@ erts_send_message(Process* sender, Eterm utag = NIL; #endif erts_aint32_t receiver_state; +#ifdef SHCOPY_SEND + unsigned shflags = (flags & ERTS_SND_FLG_SHCOPY_MASK) >> ERTS_SND_FLG_SHCOPY_SHIFT; + erts_shcopy_t info; +#endif BM_STOP_TIMER(system); BM_MESSAGE(message,sender,receiver); BM_START_TIMER(send); - #ifdef USE_VM_PROBES +#ifdef USE_VM_PROBES *sender_name = *receiver_name = '\0'; if (DTRACE_ENABLED(message_send)) { erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)), @@ -686,15 +693,23 @@ erts_send_message(Process* sender, #ifdef USE_VM_PROBES Uint dt_utag_size = 0; #endif - +#ifdef SHCOPY_SEND + unsigned shflags = (flags & ERTS_SND_FLG_SHCOPY_MASK) >> ERTS_SND_FLG_SHCOPY_SHIFT; + erts_shcopy_t info; + INITIALIZE_SHCOPY(info); +#endif BM_SWAP_TIMER(send,size); - msize = size_object(message); +#ifdef SHCOPY_SEND + INITIALIZE_SHCOPY(info); + msize = copy_shared_calculate(message, &info, shflags); +#else + msize = size_object(message); +#endif BM_SWAP_TIMER(size,send); #ifdef USE_VM_PROBES if (stoken != am_have_dt_utag) { #endif - seq_trace_update_send(sender); seq_trace_output(stoken, message, SEQ_TRACE_SEND, receiver->common.id, sender); @@ -720,14 +735,20 @@ erts_send_message(Process* sender, &ohp); BM_SWAP_TIMER(send,copy); + +#ifdef SHCOPY_SEND + if (is_not_immed(message)) + message = copy_shared_perform(message, msize, &info, &hp, ohp, shflags); + DESTROY_SHCOPY(info); +#else + if (is_not_immed(message)) + message = copy_struct(message, msize, &hp, ohp); +#endif if (is_immed(stoken)) token = stoken; else token = copy_struct(stoken, seq_trace_size, &hp, ohp); - if (is_not_immed(message)) - message = copy_struct(message, msize, &hp, ohp); - #ifdef USE_VM_PROBES if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) { if (is_immed(DT_UTAG(sender))) @@ -764,7 +785,12 @@ erts_send_message(Process* sender, } else { BM_SWAP_TIMER(send,size); - msize = size_object(message); +#ifdef SHCOPY_SEND + INITIALIZE_SHCOPY(info); + msize = copy_shared_calculate(message, &info, shflags); +#else + msize = size_object(message); +#endif BM_SWAP_TIMER(size,send); mp = erts_alloc_message_heap_state(receiver, @@ -774,8 +800,14 @@ erts_send_message(Process* sender, &hp, &ohp); BM_SWAP_TIMER(send,copy); - if (is_not_immed(message)) - message = copy_struct(message, msize, &hp, ohp); +#ifdef SHCOPY_SEND + if (is_not_immed(message)) + message = copy_shared_perform(message, msize, &info, &hp, ohp, shflags); + DESTROY_SHCOPY(info); +#else + if (is_not_immed(message)) + message = copy_struct(message, msize, &hp, ohp); +#endif BM_MESSAGE_COPIED(msz); BM_SWAP_TIMER(copy,send); } @@ -800,6 +832,7 @@ erts_send_message(Process* sender, return res; } + /* * This function delivers an EXIT message to a process * which is trapping EXITs. @@ -1725,4 +1758,3 @@ void erts_factory_undo(ErtsHeapFactory* factory) factory->heap_frags = NULL; #endif } - diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 740ae46a0f..0de36c3199 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -238,6 +238,11 @@ do { \ #define ERTS_SND_FLG_NO_SEQ_TRACE (((unsigned) 1) << 0) +#define ERTS_SND_FLG_SHCOPY_SHIFT 1 +#define ERTS_SND_FLG_SHCOPY_MASK (ERTS_SHCOPY_FLG_MASK << ERTS_SND_FLG_SHCOPY_SHIFT) +#define ERTS_SND_FLG_SHCOPY_NONE (ERTS_SHCOPY_FLG_NONE << ERTS_SND_FLG_SHCOPY_SHIFT) +#define ERTS_SND_FLG_SHCOPY_TMP_BUF (ERTS_SHCOPY_FLG_TMP_BUF << ERTS_SND_FLG_SHCOPY_SHIFT) + #define ERTS_HEAP_FRAG_SIZE(DATA_WORDS) \ (sizeof(ErlHeapFragment) - sizeof(Eterm) + (DATA_WORDS)*sizeof(Eterm)) diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index c2056dafaa..a5e6ff6f1c 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -1075,6 +1075,13 @@ typedef struct shcopy_info { ErtsAlcType_t shtable_alloc_type; } shcopy_info; +#define INITIALIZE_INFO(info) \ +do { \ + info.queue_start = info.queue_default; \ + info.bitstore_start = info.bitstore_default; \ + info.shtable_start = info.shtable_default; \ +} while(0) + #define DESTROY_INFO(info) \ do { \ if (info.queue_start != info.queue_default) { \ -- cgit v1.2.3