diff options
-rw-r--r-- | erts/emulator/beam/bif.c | 15 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.c | 52 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.h | 6 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 19 |
4 files changed, 42 insertions, 50 deletions
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 6943c8852c..66f4259d20 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -2051,18 +2051,11 @@ do_send(Process *p, Eterm to, Eterm msg, int suspend) { rp_locks |= ERTS_PROC_LOCK_MAIN; #endif /* send to local process */ - erts_send_message(p, rp, &rp_locks, msg, 0); - if (!erts_use_sender_punish) + res = erts_send_message(p, rp, &rp_locks, msg, 0); + if (erts_use_sender_punish) + res *= 4; + else res = 0; - else { -#ifdef ERTS_SMP - res = rp->msg_inq.len*4; - if (ERTS_PROC_LOCK_MAIN & rp_locks) - res += rp->msg.len*4; -#else - res = rp->msg.len*4; -#endif - } erts_smp_proc_unlock(rp, p == rp ? (rp_locks & ~ERTS_PROC_LOCK_MAIN) diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index c98ca09f3f..531144bc4f 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -418,7 +418,7 @@ erts_queue_dist_message(Process *rcvr, } /* Add a message last in message queue */ -static int +static Sint queue_message(Process *c_p, Process* receiver, ErtsProcLocks *receiver_locks, @@ -431,6 +431,7 @@ queue_message(Process *c_p, #endif ) { + Sint res; ErlMessage* mp; int locked_msgq = 0; erts_aint_t state; @@ -491,7 +492,10 @@ queue_message(Process *c_p, mp->next = NULL; mp->data.heap_frag = bp; -#ifdef ERTS_SMP +#ifndef ERTS_SMP + res = receiver->msg.len; +#else + res = receiver->msg_inq.len; if (*receiver_locks & ERTS_PROC_LOCK_MAIN) { /* * We move 'in queue' to 'private queue' and place @@ -501,6 +505,7 @@ queue_message(Process *c_p, * we don't need to include the 'in queue' in * the root set when garbage collecting. */ + res += receiver->msg.len; ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); LINK_MESSAGE_PRIVQ(receiver, mp); } @@ -540,7 +545,7 @@ queue_message(Process *c_p, #ifndef ERTS_SMP ERTS_HOLE_CHECK(receiver); #endif - return 0; + return res; } void @@ -864,7 +869,7 @@ erts_move_msg_attached_data_to_heap(Eterm **hpp, ErlOffHeap *ohp, ErlMessage *ms * Send a local message when sender & receiver processes are known. */ -void +Sint erts_send_message(Process* sender, Process* receiver, ErtsProcLocks *receiver_locks, @@ -874,6 +879,7 @@ erts_send_message(Process* sender, Uint msize; ErlHeapFragment* bp = NULL; Eterm token = NIL; + Sint res = 0; #ifdef USE_VM_PROBES DTRACE_CHARBUF(sender_name, 64); DTRACE_CHARBUF(receiver_name, 64); @@ -960,15 +966,17 @@ erts_send_message(Process* sender, msize, tok_label, tok_lastcnt, tok_serial); } #endif - erts_queue_message(receiver, - receiver_locks, - bp, - message, - token + res = queue_message(NULL, + receiver, + receiver_locks, + NULL, + bp, + message, + token #ifdef USE_VM_PROBES - , utag + , utag #endif - ); + ); BM_SWAP_TIMER(send,system); } else if (sender == receiver) { /* Drop message if receiver has a pending exit ... */ @@ -1012,12 +1020,13 @@ erts_send_message(Process* sender, ERTS_SMP_MSGQ_MV_INQ2PRIVQ(receiver); LINK_MESSAGE_PRIVQ(receiver, mp); + res = receiver->msg.len; + if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { trace_receive(receiver, message); } } BM_SWAP_TIMER(send,system); - return; } else { #ifdef ERTS_SMP ErlOffHeap *ohp; @@ -1039,15 +1048,15 @@ erts_send_message(Process* sender, BM_SWAP_TIMER(copy,send); DTRACE6(message_send, sender_name, receiver_name, msize, tok_label, tok_lastcnt, tok_serial); - queue_message(sender, - receiver, - receiver_locks, - &state, - bp, - message, - token + res = queue_message(sender, + receiver, + receiver_locks, + &state, + bp, + message, + token #ifdef USE_VM_PROBES - , NIL + , NIL #endif ); BM_SWAP_TIMER(send,system); @@ -1079,7 +1088,7 @@ erts_send_message(Process* sender, mp->next = NULL; mp->data.attached = NULL; LINK_MESSAGE(receiver, mp); - + res = receiver->msg.len; erts_proc_notify_new_message(receiver); if (IS_TRACED_FL(receiver, F_TRACE_RECEIVE)) { @@ -1089,6 +1098,7 @@ erts_send_message(Process* sender, #endif /* #ifndef ERTS_SMP */ return; } + return res; } /* diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 3e9a24ee81..bb4dbf0ef3 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -90,7 +90,7 @@ typedef struct { ErlMessage* first; ErlMessage** last; /* point to the last next pointer */ ErlMessage** save; - int len; /* queue length */ + Sint len; /* queue length */ /* * The following two fields are used by the recv_mark/1 and @@ -105,7 +105,7 @@ typedef struct { typedef struct { ErlMessage* first; ErlMessage** last; /* point to the last next pointer */ - int len; /* queue length */ + Sint len; /* queue length */ } ErlMessageInQueue; #endif @@ -234,7 +234,7 @@ void erts_queue_message(Process*, ErtsProcLocks*, ErlHeapFragment*, Eterm, Eterm #endif ); void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm); -void erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned); +Sint erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned); void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp); void erts_move_msg_mbuf_to_heap(Eterm**, ErlOffHeap*, ErlMessage *); diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index afd2ddc69d..8153cdce1c 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -5001,7 +5001,9 @@ schedule_process(Process *p, erts_aint32_t state, int active_enq) while (1) { erts_aint32_t e; n = e = a; - ASSERT(!(a & ERTS_PSFLG_FREE)); + + if (a & ERTS_PSFLG_FREE) + return; /* We don't want to schedule free processes... */ n |= ERTS_PSFLG_ACTIVE; if (!(a & (ERTS_PSFLG_SUSPENDED|ERTS_PSFLG_RUNNING))) n |= ERTS_PSFLG_IN_RUNQ; @@ -5012,23 +5014,13 @@ schedule_process(Process *p, erts_aint32_t state, int active_enq) return; /* Someone else activated process ... */ } -#ifdef RRR_DEBUG - if (active_enq) - erts_fprintf(stderr, "! state=0x%x\n", n); -#endif - if (erts_system_profile_flags.runnable_procs && !(a & (ERTS_PSFLG_ACTIVE|ERTS_PSFLG_SUSPENDED))) { profile_runnable_proc(p, am_active); } - if ((n & ERTS_PSFLG_IN_RUNQ) && !(a & ERTS_PSFLG_IN_RUNQ)) { -#ifdef RRR_DEBUG - if (active_enq) - erts_fprintf(stderr, "-->\n"); -#endif + if ((n & ERTS_PSFLG_IN_RUNQ) && !(a & ERTS_PSFLG_IN_RUNQ)) add2runq(p, n); - } } void @@ -5098,9 +5090,6 @@ resume_process(Process *p) state = erts_smp_atomic32_read_band_mb(&p->state, ~ERTS_PSFLG_SUSPENDED); state &= ~ERTS_PSFLG_SUSPENDED; -#ifdef RRR_DEBUG - erts_fprintf(stderr, "%T - state=0x%x\n", p->id, state); -#endif if ((state & (ERTS_PSFLG_EXITING | ERTS_PSFLG_ACTIVE | ERTS_PSFLG_IN_RUNQ |