// -*- c -*-
//
// %CopyrightBegin%
//
// Copyright Ericsson AB 2017. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// %CopyrightEnd%
//
// /*
// * Skeleton for receive statement:
// *
// *         recv_mark L1                   Optional
// *         call make_ref/monitor          Optional
// *         ...
// *         recv_set L1                    Optional
// *  L1:          <-------------------+
// *               <-----------+       |
// *                           |       |
// *         loop_rec L2 ------+---+   |
// *         ...               |   |   |
// *         remove_message    |   |   |
// *         jump L3           |   |   |
// *         ...               |   |   |
// *         loop_rec_end L1 --+   |   |
// *  L2:          <---------------+   |
// *         wait L1 ------------------+    or wait_timeout
// *         timeout
// *
// *  L3:    Code after receive...
// *
// */
recv_mark(Dest) {
    /*
     * Remember where the message queue currently ends, together with
     * the address of the loop_rec/2 instruction of the receive
     * statement this mark belongs to. i_recv_set compares the stored
     * address against the instruction that follows it before it uses
     * the saved queue position.
     */
    c_p->msg.saved_last = c_p->msg.last;
    c_p->msg.mark = (BeamInstr *) $Dest;
}
i_recv_set() {
    /*
     * The mark is valid when it holds the address of the instruction
     * that follows this one (the loop_rec/2 instruction). In that
     * case the saved position is the first message that could
     * possibly be matched out, so scanning starts from there.
     *
     * With an invalid mark the save pointer is left untouched and
     * all messages in the message queue will be examined.
     */
    if ((BeamInstr *) ($NEXT_INSTRUCTION) == c_p->msg.mark) {
        c_p->msg.save = c_p->msg.saved_last;
    }

    /* Execution continues with the loop_rec/2 instruction. */
}
i_loop_rec(Dest) {
//| -no_prefetch
/*
* Pick up the next message and place it in x(0).
* If no message, jump to a wait or wait_timeout instruction.
*
* Dest is the address that is jumped to when no message is available.
*/
ErtsMessage* msgp;
/*
* We need to disable GC while matching messages
* in the queue. This since messages with data outside
* the heap will be corrupted by a GC.
*
* The flag is cleared again on the two paths below that leave the
* receive loop without a message. When a message is found the flag
* stays set; remove_message and loop_rec_end clear it later.
*/
ASSERT(!(c_p->flags & F_DELAY_GC));
c_p->flags |= F_DELAY_GC;
/* Re-entry point; also the target of the goto in loop_rec_end. */
loop_rec__:
PROCESS_MAIN_CHK_LOCKS(c_p);
msgp = PEEK_MESSAGE(c_p);
if (!msgp) {
/*
* Nothing to examine at the current position. Take the
* message-receive locks before looking for newly arrived messages.
*/
erts_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
/* Make sure messages won't pass exit signals... */
if (ERTS_PROC_PENDING_EXIT(c_p)) {
erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
SWAPOUT;
c_p->flags &= ~F_DELAY_GC;
c_p->arity = 0;
goto do_schedule; /* Will be rescheduled for exit */
}
/*
* NOTE(review): judging by its name, this macro moves messages from
* the in-queue to the private queue -- confirm against its definition.
*/
ERTS_MSGQ_MV_INQ2PRIVQ(c_p);
msgp = PEEK_MESSAGE(c_p);
if (msgp) {
erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
} else {
/*
* Still no message. Note that the message-receive locks are
* NOT released on this path; they are still held when control
* reaches the instruction at Dest.
*/
c_p->flags &= ~F_DELAY_GC;
SET_I((BeamInstr *) $Dest);
Goto(*I); /* Jump to a wait or wait_timeout instruction */
}
}
/*
* A message whose term is a non-value has not been decoded yet;
* it is decoded here as a distribution message.
*/
if (is_non_value(ERL_MESSAGE_TERM(msgp))) {
SWAPOUT; /* erts_decode_dist_message() may write to heap... */
if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) {
/*
* A corrupt distribution message that we weren't able to decode;
* remove it...
*/
/* No swapin should be needed */
ASSERT(HTOP == c_p->htop && E == c_p->stop);
/* TODO: Add DTrace probe for this bad message situation? */
UNLINK_MESSAGE(c_p, msgp);
msgp->next = NULL;
erts_cleanup_messages(msgp);
/* Try again with whatever message is now at the current position. */
goto loop_rec__;
}
SWAPIN;
}
/* Hand the message term to the matching code that follows. */
r(0) = ERL_MESSAGE_TERM(msgp);
}
remove_message() {
//| -no_prefetch
/*
* Remove a (matched) message from the message queue.
*
* Before the message is unlinked, the sequential-trace token of the
* process is updated from the token carried by the message. Afterwards
* the timer is cancelled, the message is handed to
* erts_save_message_in_proc() and F_DELAY_GC is cleared.
*/
ErtsMessage* msgp;
PROCESS_MAIN_CHK_LOCKS(c_p);
ERTS_CHK_MBUF_SZ(c_p);
/* The matched message is the one at the current position. */
msgp = PEEK_MESSAGE(c_p);
if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) {
save_calls(c_p, &exp_receive);
}
/*
* Token handling, three cases:
*   NIL           - clear the token of the process (a permanent
*                   dtrace user tag is kept when VM probes are enabled);
*   am_undefined  - neither branch is taken; the token of the
*                   process is left unchanged;
*   anything else - adopt the token of the message.
*/
if (ERL_MESSAGE_TOKEN(msgp) == NIL) {
#ifdef USE_VM_PROBES
if (DT_UTAG(c_p) != NIL) {
if (DT_UTAG_FLAGS(c_p) & DT_UTAG_PERMANENT) {
SEQ_TRACE_TOKEN(c_p) = am_have_dt_utag;
} else {
DT_UTAG(c_p) = NIL;
SEQ_TRACE_TOKEN(c_p) = NIL;
}
} else {
#endif
SEQ_TRACE_TOKEN(c_p) = NIL;
#ifdef USE_VM_PROBES
}
DT_UTAG_FLAGS(c_p) &= ~DT_UTAG_SPREADING;
#endif
} else if (ERL_MESSAGE_TOKEN(msgp) != am_undefined) {
Eterm msg;
SEQ_TRACE_TOKEN(c_p) = ERL_MESSAGE_TOKEN(msgp);
#ifdef USE_VM_PROBES
if (ERL_MESSAGE_TOKEN(msgp) == am_have_dt_utag) {
/* Only a dtrace user tag was sent; pick it up if we have none. */
if (DT_UTAG(c_p) == NIL) {
DT_UTAG(c_p) = ERL_MESSAGE_DT_UTAG(msgp);
}
DT_UTAG_FLAGS(c_p) |= DT_UTAG_SPREADING;
} else {
#endif
/* A real seq_trace token: sanity check its layout (debug builds). */
ASSERT(is_tuple(SEQ_TRACE_TOKEN(c_p)));
ASSERT(SEQ_TRACE_TOKEN_ARITY(c_p) == 5);
ASSERT(is_small(SEQ_TRACE_TOKEN_SERIAL(c_p)));
ASSERT(is_small(SEQ_TRACE_TOKEN_LASTCNT(c_p)));
ASSERT(is_small(SEQ_TRACE_TOKEN_FLAGS(c_p)));
ASSERT(is_pid(SEQ_TRACE_TOKEN_SENDER(c_p)));
/* Note: seq_trace_lastcnt is taken from the serial field of the token. */
c_p->seq_trace_lastcnt = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
/* The clock of the process never moves backwards. */
if (c_p->seq_trace_clock < unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p))) {
c_p->seq_trace_clock = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
}
msg = ERL_MESSAGE_TERM(msgp);
seq_trace_output(SEQ_TRACE_TOKEN(c_p), msg, SEQ_TRACE_RECEIVE,
c_p->common.id, c_p);
#ifdef USE_VM_PROBES
}
#endif
}
#ifdef USE_VM_PROBES
/* Fire the message_receive probe, including seq_trace data if present. */
if (DTRACE_ENABLED(message_receive)) {
Eterm token2 = NIL;
DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
Sint tok_label = 0;
Sint tok_lastcnt = 0;
Sint tok_serial = 0;
dtrace_proc_str(c_p, receiver_name);
token2 = SEQ_TRACE_TOKEN(c_p);
if (have_seqtrace(token2)) {
tok_label = signed_val(SEQ_TRACE_T_LABEL(token2));
tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token2));
tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token2));
}
/*
* NOTE(review): msg.len - 1 is presumably the queue length once this
* message has been removed (it is still linked here) -- confirm.
*/
DTRACE6(message_receive,
receiver_name, size_object(ERL_MESSAGE_TERM(msgp)),
c_p->msg.len - 1, tok_label, tok_lastcnt, tok_serial);
}
#endif
/*
* Take the message out of the queue, then stop any receive timer
* that is still pending.
*/
UNLINK_MESSAGE(c_p, msgp);
JOIN_MESSAGE(c_p);
CANCEL_TIMER(c_p);
erts_save_message_in_proc(c_p, msgp);
/* Matching is done; GC may run again. */
c_p->flags &= ~F_DELAY_GC;
if (ERTS_IS_GC_DESIRED_INTERNAL(c_p, HTOP, E)) {
/*
* We want to GC soon but we leave a few
* reductions giving the message some time
* to turn into garbage.
*/
ERTS_VBUMP_LEAVE_REDS_INTERNAL(c_p, 5, FCALLS);
}
ERTS_DBG_CHK_REDS(c_p, FCALLS);
ERTS_CHK_MBUF_SZ(c_p);
ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
PROCESS_MAIN_CHK_LOCKS(c_p);
}
loop_rec_end(Dest) {
    //| -no_next
    /*
     * The current message did not match. Step the save pointer to
     * the next message and go back to the loop_rec instruction,
     * unless the process has run out of reductions, in which case
     * it is scheduled out and will resume at Dest.
     */

    ASSERT(c_p->flags & F_DELAY_GC);

    SET_I((BeamInstr *) $Dest);
    SAVE_MESSAGE(c_p);

    if (FCALLS <= 0 && FCALLS <= neg_o_reds) {
        /* Out of reductions: allow GC again and yield. */
        c_p->i = I;
        c_p->current = NULL;
        c_p->arity = 0;
        c_p->flags &= ~F_DELAY_GC;
        SWAPOUT;
        goto do_schedule;
    }

    /* Examining one more message costs one reduction. */
    FCALLS--;
    goto loop_rec__;
}
timeout_locked() {
/*
* A timeout has occurred while the message-receive locks are held.
* Release the locks, then run the common timeout code. That code
* resets the save pointer so that the next receive statement will
* examine the first message first.
*/
erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
$timeout();
}
timeout() {
/*
* Common timeout handling (also expanded inside timeout_locked).
*/
/* Report the timeout to receive tracing, if enabled for this process. */
if (IS_TRACED_FL(c_p, F_TRACE_RECEIVE)) {
trace_receive(c_p, am_clock_service, am_timeout, NULL);
}
/* Record the timeout in the saved-calls buffer, if there is one. */
if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) {
save_calls(c_p, &exp_timeout);
}
/* The timeout has been consumed. */
c_p->flags &= ~F_TIMO;
/*
* NOTE(review): presumably this is what resets the save pointer so
* that the next receive starts from the first message -- confirm
* against the definition of JOIN_MESSAGE.
*/
JOIN_MESSAGE(c_p);
}
TIMEOUT_VALUE() {
/*
* Raise the exception for a bad timeout value: set the failure
* reason and continue at the find_func_info label. Used by
* i_wait_error and i_wait_error_locked.
*/
c_p->freason = EXC_TIMEOUT_VALUE;
goto find_func_info;
}
i_wait_error_locked() {
/*
* Bad timeout value while the message-receive locks are held:
* release the locks first, then raise the error.
*/
erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
$TIMEOUT_VALUE();
}
i_wait_error() {
/*
* Bad timeout value; no lock is released here (compare
* i_wait_error_locked).
*/
$TIMEOUT_VALUE();
}
//
// Instructions assembled from the wait.* fragments defined below.
// The *_unlocked variants begin with wait.lock, which takes the
// message-receive locks. The *_locked variants do not take them;
// wait_locked uses the empty wait.unlocked fragment in that position.
// NOTE(review): fragments are presumably executed in the order
// listed -- see the beam_makeops documentation.
//
wait_timeout_unlocked_int := wait.lock.int.execute;
wait_timeout_locked_int := wait.int.execute;
wait_timeout_unlocked := wait.lock.src.execute;
wait_timeout_locked := wait.src.execute;
wait_unlocked := wait.lock.execute;
wait_locked := wait.unlocked.execute;
wait.lock() {
/* Take the message-receive locks before the rest of the wait sequence. */
erts_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
}
wait.unlocked() {
/* Intentionally empty: used by wait_locked, which takes no lock here. */
}
wait.int(Int) {
    /*
     * The timer must NOT be set a second time if it has already
     * been set, which is why both F_INSLPQUEUE and F_TIMO are
     * tested here.
     */
    if (!(c_p->flags & (F_INSLPQUEUE | F_TIMO))) {
        /*
         * Store the address of the following instruction in
         * c_p->def_arg_reg[0] before starting the timer.
         */
        *((BeamInstr **) c_p->def_arg_reg) = $NEXT_INSTRUCTION;
        erts_set_proc_timer_uword(c_p, $Int);
    }
}
wait.src(Src) {
    /*
     * The timer must NOT be set a second time if it has already
     * been set, which is why both F_INSLPQUEUE and F_TIMO are
     * tested here.
     */
    if ((c_p->flags & (F_INSLPQUEUE | F_TIMO)) == 0) {
        Eterm tmo = $Src;

        if (tmo == make_small(0)) {
            /* Zero timeout: do not wait at all. */
            erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
            $NEXT0();
        } else if (tmo == am_infinity) {
            /* No timer is started; only the flag is set. */
            c_p->flags |= F_TIMO;
        } else {
            int status = erts_set_proc_timer_term(c_p, tmo);

            if (status != 0) {
                /* Wrong time */
                erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
                c_p->freason = EXC_TIMEOUT_VALUE;
                goto find_func_info;
            } else {
                /*
                 * The timer routine will set c_p->i to the value in
                 * c_p->def_arg_reg[0]. Using that location is safe
                 * because there are no living x registers in a
                 * receive statement.
                 */
                BeamInstr** resume_slot = (BeamInstr**) c_p->def_arg_reg;
                *resume_slot = $NEXT_INSTRUCTION;
            }
        }
    }
}
//
// Prepare to wait indefinitely for a new message to arrive
// (or for the time set above if falling through from above).
// When a new message arrives, control will be transferred to
// the loop_rec instruction (at label L1). In case of a
// timeout, control will be transferred to the timeout
// instruction following the wait_timeout instruction.
//
wait.execute(JumpTarget) {
/*
* Final fragment of every wait instruction: record where to resume,
* mark the process as no longer active, release the message-receive
* locks and schedule the process out.
*
* NOTE(review): the message-receive locks are expected to be held on
* entry (taken by wait.lock or left held by the preceding
* instruction) -- confirm against the callers.
*/
/* Resume at the loop_rec instruction when rescheduled. */
c_p->i = (BeamInstr *) $JumpTarget; /* L1 */
SWAPOUT;
c_p->arity = 0;
/*
* Clear the ACTIVE state flag unless the process timer has already
* timed out.
*/
if (!ERTS_PTMR_IS_TIMED_OUT(c_p)) {
erts_atomic32_read_band_relb(&c_p->state,
~ERTS_PSFLG_ACTIVE);
}
ASSERT(!ERTS_PROC_IS_EXITING(c_p));
erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
c_p->current = NULL;
goto do_schedule;
//| -no_next
}