aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator
diff options
context:
space:
mode:
authorNikolaos S. Papaspyrou <[email protected]>2012-06-09 01:12:50 +0300
committerBjörn-Egil Dahlberg <[email protected]>2015-11-17 14:45:44 +0100
commitf3ae60838a5e8c83cfdd3000e25562dfcbbd438d (patch)
tree9694785b1135194cb22de60e8ce64965fe5ded59 /erts/emulator
parentcfe998988c942bed0fbf026c00a2531fdf5aba7c (diff)
downloadotp-f3ae60838a5e8c83cfdd3000e25562dfcbbd438d.tar.gz
otp-f3ae60838a5e8c83cfdd3000e25562dfcbbd438d.tar.bz2
otp-f3ae60838a5e8c83cfdd3000e25562dfcbbd438d.zip
Enable shcopy for sending messages
Diffstat (limited to 'erts/emulator')
-rw-r--r--erts/emulator/beam/erl_db.c4
-rw-r--r--erts/emulator/beam/erl_message.c114
-rw-r--r--erts/emulator/beam/erl_message.h5
-rw-r--r--erts/emulator/beam/global.h7
4 files changed, 87 insertions, 43 deletions
diff --git a/erts/emulator/beam/erl_db.c b/erts/emulator/beam/erl_db.c
index 9ec14ab5ae..6119a9225f 100644
--- a/erts/emulator/beam/erl_db.c
+++ b/erts/emulator/beam/erl_db.c
@@ -1832,7 +1832,7 @@ BIF_RETTYPE ets_give_away_3(BIF_ALIST_3)
tb->common.id,
from_pid,
BIF_ARG_3),
- 0);
+ ERTS_SND_FLG_SHCOPY_TMP_BUF);
erts_smp_proc_unlock(to_proc, to_locks);
UnUseTmpHeap(5,BIF_P);
BIF_RET(am_true);
@@ -3211,7 +3211,7 @@ retry:
tb->common.id,
p->common.id,
heir_data),
- 0);
+ ERTS_SND_FLG_SHCOPY_TMP_BUF);
erts_smp_proc_unlock(to_proc, to_locks);
return !0;
}
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 79739501a8..b1fca1df0c 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -68,6 +68,9 @@ new_message_buffer(Uint size)
bp = (ErlHeapFragment*) ERTS_HEAP_ALLOC(ERTS_ALC_T_HEAP_FRAG,
ERTS_HEAP_FRAG_SIZE(size));
ERTS_INIT_HEAP_FRAG(bp, size, size);
+#ifdef SHCOPY_DEBUG
+ VERBOSE_DEBUG("[pid=%T] new message buffer %p\n", erts_get_current_pid(), bp->mem);
+#endif
return bp;
}
@@ -249,36 +252,6 @@ erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_s
}
void
-erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp)
-{
- if (first_bp) {
- ErlHeapFragment *bp = first_bp;
-
- while (1) {
- /* Move any off_heap's into the process */
- if (bp->off_heap.first != NULL) {
- struct erl_off_heap_header** next_p = &bp->off_heap.first;
- while (*next_p != NULL) {
- next_p = &((*next_p)->next);
- }
- *next_p = MSO(proc).first;
- MSO(proc).first = bp->off_heap.first;
- bp->off_heap.first = NULL;
- OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead);
- }
- MBUF_SIZE(proc) += bp->used_size;
- if (!bp->next)
- break;
- bp = bp->next;
- }
-
- /* Link the message buffer */
- bp->next = MBUF(proc);
- MBUF(proc) = first_bp;
- }
-}
-
-void
erts_queue_dist_message(Process *rcvr,
ErtsProcLocks *rcvr_locks,
ErtsDistExternal *dist_ext,
@@ -540,6 +513,36 @@ erts_queue_message(Process* receiver, ErtsProcLocks *receiver_locks,
);
}
+void
+erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *first_bp)
+{
+ if (first_bp) {
+ ErlHeapFragment *bp = first_bp;
+
+ while (1) {
+ /* Move any off_heap's into the process */
+ if (bp->off_heap.first != NULL) {
+ struct erl_off_heap_header** next_p = &bp->off_heap.first;
+ while (*next_p != NULL) {
+ next_p = &((*next_p)->next);
+ }
+ *next_p = MSO(proc).first;
+ MSO(proc).first = bp->off_heap.first;
+ bp->off_heap.first = NULL;
+ OH_OVERHEAD(&(MSO(proc)), bp->off_heap.overhead);
+ }
+ MBUF_SIZE(proc) += bp->used_size;
+ if (!bp->next)
+ break;
+ bp = bp->next;
+ }
+
+ /* Link the message buffer */
+ bp->next = MBUF(proc);
+ MBUF(proc) = first_bp;
+ }
+}
+
Uint
erts_msg_attached_data_size_aux(ErtsMessage *msg)
{
@@ -663,11 +666,15 @@ erts_send_message(Process* sender,
Eterm utag = NIL;
#endif
erts_aint32_t receiver_state;
+#ifdef SHCOPY_SEND
+ unsigned shflags = (flags & ERTS_SND_FLG_SHCOPY_MASK) >> ERTS_SND_FLG_SHCOPY_SHIFT;
+ erts_shcopy_t info;
+#endif
BM_STOP_TIMER(system);
BM_MESSAGE(message,sender,receiver);
BM_START_TIMER(send);
- #ifdef USE_VM_PROBES
+#ifdef USE_VM_PROBES
*sender_name = *receiver_name = '\0';
if (DTRACE_ENABLED(message_send)) {
erts_snprintf(sender_name, sizeof(DTRACE_CHARBUF_NAME(sender_name)),
@@ -686,15 +693,23 @@ erts_send_message(Process* sender,
#ifdef USE_VM_PROBES
Uint dt_utag_size = 0;
#endif
-
+#ifdef SHCOPY_SEND
+ unsigned shflags = (flags & ERTS_SND_FLG_SHCOPY_MASK) >> ERTS_SND_FLG_SHCOPY_SHIFT;
+ erts_shcopy_t info;
+ INITIALIZE_SHCOPY(info);
+#endif
BM_SWAP_TIMER(send,size);
- msize = size_object(message);
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ msize = copy_shared_calculate(message, &info, shflags);
+#else
+ msize = size_object(message);
+#endif
BM_SWAP_TIMER(size,send);
#ifdef USE_VM_PROBES
if (stoken != am_have_dt_utag) {
#endif
-
seq_trace_update_send(sender);
seq_trace_output(stoken, message, SEQ_TRACE_SEND,
receiver->common.id, sender);
@@ -720,14 +735,20 @@ erts_send_message(Process* sender,
&ohp);
BM_SWAP_TIMER(send,copy);
+
+#ifdef SHCOPY_SEND
+ if (is_not_immed(message))
+ message = copy_shared_perform(message, msize, &info, &hp, ohp, shflags);
+ DESTROY_SHCOPY(info);
+#else
+ if (is_not_immed(message))
+ message = copy_struct(message, msize, &hp, ohp);
+#endif
if (is_immed(stoken))
token = stoken;
else
token = copy_struct(stoken, seq_trace_size, &hp, ohp);
- if (is_not_immed(message))
- message = copy_struct(message, msize, &hp, ohp);
-
#ifdef USE_VM_PROBES
if (DT_UTAG_FLAGS(sender) & DT_UTAG_SPREADING) {
if (is_immed(DT_UTAG(sender)))
@@ -764,7 +785,12 @@ erts_send_message(Process* sender,
}
else {
BM_SWAP_TIMER(send,size);
- msize = size_object(message);
+#ifdef SHCOPY_SEND
+ INITIALIZE_SHCOPY(info);
+ msize = copy_shared_calculate(message, &info, shflags);
+#else
+ msize = size_object(message);
+#endif
BM_SWAP_TIMER(size,send);
mp = erts_alloc_message_heap_state(receiver,
@@ -774,8 +800,14 @@ erts_send_message(Process* sender,
&hp,
&ohp);
BM_SWAP_TIMER(send,copy);
- if (is_not_immed(message))
- message = copy_struct(message, msize, &hp, ohp);
+#ifdef SHCOPY_SEND
+ if (is_not_immed(message))
+ message = copy_shared_perform(message, msize, &info, &hp, ohp, shflags);
+ DESTROY_SHCOPY(info);
+#else
+ if (is_not_immed(message))
+ message = copy_struct(message, msize, &hp, ohp);
+#endif
BM_MESSAGE_COPIED(msz);
BM_SWAP_TIMER(copy,send);
}
@@ -800,6 +832,7 @@ erts_send_message(Process* sender,
return res;
}
+
/*
* This function delivers an EXIT message to a process
* which is trapping EXITs.
@@ -1725,4 +1758,3 @@ void erts_factory_undo(ErtsHeapFactory* factory)
factory->heap_frags = NULL;
#endif
}
-
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h
index 740ae46a0f..0de36c3199 100644
--- a/erts/emulator/beam/erl_message.h
+++ b/erts/emulator/beam/erl_message.h
@@ -238,6 +238,11 @@ do { \
#define ERTS_SND_FLG_NO_SEQ_TRACE (((unsigned) 1) << 0)
+#define ERTS_SND_FLG_SHCOPY_SHIFT 1
+#define ERTS_SND_FLG_SHCOPY_MASK (ERTS_SHCOPY_FLG_MASK << ERTS_SND_FLG_SHCOPY_SHIFT)
+#define ERTS_SND_FLG_SHCOPY_NONE (ERTS_SHCOPY_FLG_NONE << ERTS_SND_FLG_SHCOPY_SHIFT)
+#define ERTS_SND_FLG_SHCOPY_TMP_BUF (ERTS_SHCOPY_FLG_TMP_BUF << ERTS_SND_FLG_SHCOPY_SHIFT)
+
#define ERTS_HEAP_FRAG_SIZE(DATA_WORDS) \
(sizeof(ErlHeapFragment) - sizeof(Eterm) + (DATA_WORDS)*sizeof(Eterm))
diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h
index c2056dafaa..a5e6ff6f1c 100644
--- a/erts/emulator/beam/global.h
+++ b/erts/emulator/beam/global.h
@@ -1075,6 +1075,13 @@ typedef struct shcopy_info {
ErtsAlcType_t shtable_alloc_type;
} shcopy_info;
+#define INITIALIZE_INFO(info) \
+do { \
+ info.queue_start = info.queue_default; \
+ info.bitstore_start = info.bitstore_default; \
+ info.shtable_start = info.shtable_default; \
+} while(0)
+
#define DESTROY_INFO(info) \
do { \
if (info.queue_start != info.queue_default) { \