// -*- c -*- // // %CopyrightBegin% // // Copyright Ericsson AB 2017-2018. All Rights Reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. // // %CopyrightEnd% // // /* // * Skeleton for receive statement: // * // * recv_mark L1 Optional // * call make_ref/monitor Optional // * ... // * recv_set L1 Optional // * L1: <-------------------+ // * <-----------+ | // * | | // * loop_rec L2 ------+---+ | // * ... | | | // * remove_message | | | // * jump L3 | | | // * ... | | | // * loop_rec_end L1 --+ | | // * L2: <---------------+ | // * wait L1 -------------------+ or wait_timeout // * timeout // * // * L3: Code after receive... // * // */ i_recv_mark() { /* * Save the current end of message queue */ ERTS_RECV_MARK_SAVE(c_p); } i_recv_set() { /* * If previously saved recv mark, set peek position to it */ ERTS_RECV_MARK_SET(c_p); SET_I($NEXT_INSTRUCTION); goto loop_rec_top__; //| -no_next } i_loop_rec(Dest) { //| -no_prefetch /* * Pick up the next message and place it in x(0). * If no message, jump to a wait or wait_timeout instruction. */ ErtsMessage* msgp; /* Entry point from recv_set */ loop_rec_top__: /* * We need to disable GC while matching messages * in the queue. This since messages with data outside * the heap will be corrupted by a GC. */ ASSERT(!(c_p->flags & F_DELAY_GC)); c_p->flags |= F_DELAY_GC; /* Entry point from loop_rec_end (and locally) */ loop_rec__: if (FCALLS <= 0 && FCALLS <= neg_o_reds) { $SET_CP_I_ABS(I); c_p->flags &= ~F_DELAY_GC; SWAPOUT; c_p->arity = 0; c_p->current = NULL; goto do_schedule; } ASSERT(!ERTS_PROC_IS_EXITING(c_p)); PROCESS_MAIN_CHK_LOCKS(c_p); msgp = PEEK_MESSAGE(c_p); if (ERTS_UNLIKELY(msgp == NULL)) { int get_out; SWAPOUT; $SET_CP_I_ABS(I); c_p->arity = 0; c_p->current = NULL; c_p->fcalls = FCALLS; FCALLS -= erts_proc_sig_receive_helper(c_p, FCALLS, neg_o_reds, &msgp, &get_out); SWAPIN; if (ERTS_UNLIKELY(msgp == NULL)) { if (get_out) { if (get_out < 0) { ASSERT(FCALLS <= 0 && FCALLS <= neg_o_reds); goto loop_rec__; /* yield */ } else { ASSERT(ERTS_PROC_IS_EXITING(c_p)); goto do_schedule; /* exit */ } } /* * If there are no more messages in queue * (and we are not yielding or exiting) * erts_proc_sig_receive_helper() * returns with message queue lock locked... */ c_p->flags &= ~F_DELAY_GC; $SET_I_REL($Dest); Goto(*I); /* Jump to a wait or wait_timeout instruction */ } } ASSERT(msgp == PEEK_MESSAGE(c_p)); ASSERT(msgp && ERTS_SIG_IS_MSG(msgp)); if (ERTS_UNLIKELY(ERTS_SIG_IS_EXTERNAL_MSG(msgp))) { FCALLS -= 10; /* FIXME: bump appropriate amount... */ SWAPOUT; /* erts_proc_sig_decode_dist() may write to heap... */ if (!erts_proc_sig_decode_dist(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) { /* * A corrupt distribution message that we weren't able to decode; * remove it... */ /* No swapin should be needed */ ASSERT(HTOP == c_p->htop && E == c_p->stop); /* TODO: Add DTrace probe for this bad message situation? */ UNLINK_MESSAGE(c_p, msgp); msgp->next = NULL; erts_cleanup_messages(msgp); goto loop_rec__; } SWAPIN; } ASSERT(msgp == PEEK_MESSAGE(c_p)); ASSERT(ERTS_SIG_IS_INTERNAL_MSG(msgp)); r(0) = ERL_MESSAGE_TERM(msgp); } remove_message() { //| -no_prefetch /* * Remove a (matched) message from the message queue. */ ErtsMessage* msgp; PROCESS_MAIN_CHK_LOCKS(c_p); ERTS_CHK_MBUF_SZ(c_p); msgp = PEEK_MESSAGE(c_p); if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) { save_calls(c_p, &exp_receive); } if (ERL_MESSAGE_TOKEN(msgp) == NIL) { #ifdef USE_VM_PROBES if (DT_UTAG(c_p) != NIL) { if (DT_UTAG_FLAGS(c_p) & DT_UTAG_PERMANENT) { SEQ_TRACE_TOKEN(c_p) = am_have_dt_utag; } else { DT_UTAG(c_p) = NIL; SEQ_TRACE_TOKEN(c_p) = NIL; } } else { #endif SEQ_TRACE_TOKEN(c_p) = NIL; #ifdef USE_VM_PROBES } DT_UTAG_FLAGS(c_p) &= ~DT_UTAG_SPREADING; #endif } else if (ERL_MESSAGE_TOKEN(msgp) != am_undefined) { Eterm msg; SEQ_TRACE_TOKEN(c_p) = ERL_MESSAGE_TOKEN(msgp); #ifdef USE_VM_PROBES if (ERL_MESSAGE_TOKEN(msgp) == am_have_dt_utag) { if (DT_UTAG(c_p) == NIL) { DT_UTAG(c_p) = ERL_MESSAGE_DT_UTAG(msgp); } DT_UTAG_FLAGS(c_p) |= DT_UTAG_SPREADING; } else { #endif ASSERT(is_tuple(SEQ_TRACE_TOKEN(c_p))); ASSERT(SEQ_TRACE_TOKEN_ARITY(c_p) == 5); ASSERT(is_small(SEQ_TRACE_TOKEN_SERIAL(c_p))); ASSERT(is_small(SEQ_TRACE_TOKEN_LASTCNT(c_p))); ASSERT(is_small(SEQ_TRACE_TOKEN_FLAGS(c_p))); ASSERT(is_pid(SEQ_TRACE_TOKEN_SENDER(c_p))); c_p->seq_trace_lastcnt = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p)); if (c_p->seq_trace_clock < unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p))) { c_p->seq_trace_clock = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p)); } msg = ERL_MESSAGE_TERM(msgp); seq_trace_output(SEQ_TRACE_TOKEN(c_p), msg, SEQ_TRACE_RECEIVE, c_p->common.id, c_p); #ifdef USE_VM_PROBES } #endif } #ifdef USE_VM_PROBES if (DTRACE_ENABLED(message_receive)) { Eterm token2 = NIL; DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE); Sint tok_label = 0; Sint tok_lastcnt = 0; Sint tok_serial = 0; Sint len = erts_proc_sig_privqs_len(c_p); dtrace_proc_str(c_p, receiver_name); token2 = SEQ_TRACE_TOKEN(c_p); if (have_seqtrace(token2)) { tok_label = SEQ_TRACE_T_DTRACE_LABEL(token2); tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token2)); tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token2)); } DTRACE6(message_receive, receiver_name, size_object(ERL_MESSAGE_TERM(msgp)), len, /* This is NOT message queue len, but its something... */ tok_label, tok_lastcnt, tok_serial); } #endif UNLINK_MESSAGE(c_p, msgp); JOIN_MESSAGE(c_p); CANCEL_TIMER(c_p); erts_save_message_in_proc(c_p, msgp); c_p->flags &= ~F_DELAY_GC; if (ERTS_IS_GC_DESIRED_INTERNAL(c_p, HTOP, E)) { /* * We want to GC soon but we leave a few * reductions giving the message some time * to turn into garbage. */ ERTS_VBUMP_LEAVE_REDS_INTERNAL(c_p, 5, FCALLS); } ERTS_DBG_CHK_REDS(c_p, FCALLS); ERTS_CHK_MBUF_SZ(c_p); ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p); PROCESS_MAIN_CHK_LOCKS(c_p); } loop_rec_end(Dest) { //| -no_next /* * Advance the save pointer to the next message (the current * message didn't match), then jump to the loop_rec instruction. */ ASSERT(c_p->flags & F_DELAY_GC); $SET_I_REL($Dest); SAVE_MESSAGE(c_p); FCALLS--; goto loop_rec__; } timeout_locked() { /* * A timeout has occurred. Reset the save pointer so that the next * receive statement will examine the first message first. */ erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); $timeout(); } timeout() { if (IS_TRACED_FL(c_p, F_TRACE_RECEIVE)) { trace_receive(c_p, am_clock_service, am_timeout, NULL); } if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) { save_calls(c_p, &exp_timeout); } c_p->flags &= ~F_TIMO; JOIN_MESSAGE(c_p); } TIMEOUT_VALUE() { c_p->freason = EXC_TIMEOUT_VALUE; goto find_func_info; //| -no_next } i_wait_error_locked() { erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); $TIMEOUT_VALUE(); } i_wait_error() { $TIMEOUT_VALUE(); } wait_timeout_unlocked_int := wait.lock.int.execute; wait_timeout_locked_int := wait.int.execute; wait_timeout_unlocked := wait.lock.src.execute; wait_timeout_locked := wait.src.execute; wait_unlocked := wait.lock.execute; wait_locked := wait.unlocked.execute; wait.lock() { erts_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); } wait.unlocked() { } wait.int(Int) { /* * If we have already set the timer, we must NOT set it again. Therefore, * we must test the F_INSLPQUEUE flag as well as the F_TIMO flag. */ if ((c_p->flags & (F_INSLPQUEUE | F_TIMO)) == 0) { BeamInstr** pi = (BeamInstr **) c_p->def_arg_reg; *pi = $NEXT_INSTRUCTION; erts_set_proc_timer_uword(c_p, $Int); } } wait.src(Src) { /* * If we have already set the timer, we must NOT set it again. Therefore, * we must test the F_INSLPQUEUE flag as well as the F_TIMO flag. */ if ((c_p->flags & (F_INSLPQUEUE | F_TIMO)) == 0) { Eterm timeout_value = $Src; if (timeout_value == make_small(0)) { erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); $NEXT0(); } else if (timeout_value == am_infinity) { c_p->flags |= F_TIMO; } else { int tres = erts_set_proc_timer_term(c_p, timeout_value); if (tres == 0) { /* * The timer routiner will set c_p->i to the value in * c_p->def_arg_reg[0]. Note that it is safe to use this * location because there are no living x registers in * a receive statement. */ BeamInstr** pi = (BeamInstr**) c_p->def_arg_reg; *pi = $NEXT_INSTRUCTION; } else { /* Wrong time */ erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); c_p->freason = EXC_TIMEOUT_VALUE; goto find_func_info; } } } } // // Prepare to wait indefinitely for a new message to arrive // (or the time set above if falling through from above). // When a new message arrives, control will be transferred // the loop_rec instruction (at label L1). In case of // of timeout, control will be transferred to the timeout // instruction following the wait_timeout instruction. // wait.execute(JumpTarget) { $SET_REL_I(c_p->i, $JumpTarget); /* L1 */ SWAPOUT; c_p->arity = 0; if (!ERTS_PTMR_IS_TIMED_OUT(c_p)) { erts_atomic32_read_band_relb(&c_p->state, ~ERTS_PSFLG_ACTIVE); } ASSERT(!ERTS_PROC_IS_EXITING(c_p)); erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE); c_p->current = NULL; goto do_schedule; //| -no_next }