aboutsummaryrefslogblamecommitdiffstats
path: root/erts/emulator/beam/msg_instrs.tab
blob: 509143268b47d0814559949c2a837724c0415807 (plain) (tree)





























































































































































































































































































































































































                                                                                   
// -*- c -*-
//
// %CopyrightBegin%
//
// Copyright Ericsson AB 2017. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// %CopyrightEnd%
//

// /*
//  * Skeleton for receive statement:
//  *
//  *             recv_mark L1                     Optional
//  *             call make_ref/monitor            Optional
//  *             ...
//  *             recv_set L1                      Optional
//  *      L1:          <-------------------+
//  *                   <-----------+       |
//  *                               |       |
//  *             loop_rec L2 ------+---+   |
//  *             ...               |   |   |
//  *             remove_message    |   |   |
//  *             jump L3           |   |   |
//  *             ...               |   |   |
//  *             loop_rec_end L1 --+   |   |
//  *      L2:          <---------------+   |
//  *             wait L1  -----------------+      or wait_timeout
//  *             timeout
//  *
//  *      L3:    Code after receive...
//  *
//  */

recv_mark(Dest) {
    /*
     * Remember where the message queue currently ends, together with
     * the label of the loop_rec/2 instruction that belongs to this
     * receive statement.
     */
    c_p->msg.saved_last = c_p->msg.last;
    c_p->msg.mark = (BeamInstr *) $Dest;
}

i_recv_set() {
    /*
     * The mark is valid only when it refers to the loop_rec/2
     * instruction that immediately follows this one. In that case
     * the position saved together with the mark is the first message
     * that could possibly be matched out, so the save pointer is
     * moved there.
     *
     * With an invalid mark nothing is changed, which means that all
     * messages in the message queue will be looked through.
     */
    if ((BeamInstr *) ($NEXT_INSTRUCTION) == c_p->msg.mark) {
        c_p->msg.save = c_p->msg.saved_last;
    }
    /* Execution continues with the loop_rec/2 instruction that follows. */
}

i_loop_rec(Dest) {
    //| -no_prefetch

    /*
     * Pick up the next message and place it in x(0).
     * If no message, jump to a wait or wait_timeout instruction.
     *
     * Dest is the label of that wait/wait_timeout instruction.
     */

    ErtsMessage* msgp;

    /*
     * We need to disable GC while matching messages
     * in the queue. This since messages with data outside
     * the heap will be corrupted by a GC.
     */
    ASSERT(!(c_p->flags & F_DELAY_GC));
    c_p->flags |= F_DELAY_GC;

    /*
     * Re-entry point: jumped to from loop_rec_end (after the save
     * pointer has been advanced) and from below after an undecodable
     * message has been dropped. F_DELAY_GC is already set here.
     */
loop_rec__:

    PROCESS_MAIN_CHK_LOCKS(c_p);

    msgp = PEEK_MESSAGE(c_p);

    if (!msgp) {
        /*
         * Nothing to look at yet: take the receive lock and retry
         * after ERTS_MSGQ_MV_INQ2PRIVQ() (presumably moves newly
         * arrived messages into the queue examined by PEEK_MESSAGE;
         * confirm against the macro definition).
         */
        erts_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
        /* Make sure messages won't pass exit signals... */
        if (ERTS_PROC_PENDING_EXIT(c_p)) {
            erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
            SWAPOUT;
            c_p->flags &= ~F_DELAY_GC;
            c_p->arity = 0;
            goto do_schedule; /* Will be rescheduled for exit */
        }
        ERTS_MSGQ_MV_INQ2PRIVQ(c_p);
        msgp = PEEK_MESSAGE(c_p);
        if (msgp) {
            erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
        } else {
            /*
             * Still no message. Note that the receive lock is NOT
             * released on this path; it is still held when control
             * reaches the instruction at Dest.
             */
            c_p->flags &= ~F_DELAY_GC;
            SET_I((BeamInstr *) $Dest);
            Goto(*I);		/* Jump to a wait or wait_timeout instruction */
        }
    }
    /*
     * A message whose term is a non-value has to be decoded with
     * erts_decode_dist_message() before it can be matched.
     */
    if (is_non_value(ERL_MESSAGE_TERM(msgp))) {
        SWAPOUT; /* erts_decode_dist_message() may write to heap... */
        if (!erts_decode_dist_message(c_p, ERTS_PROC_LOCK_MAIN, msgp, 0)) {
            /*
             * A corrupt distribution message that we weren't able to decode;
             * remove it...
             */
            /* No swapin should be needed */
            ASSERT(HTOP == c_p->htop && E == c_p->stop);
            /* TODO: Add DTrace probe for this bad message situation? */
            UNLINK_MESSAGE(c_p, msgp);
            msgp->next = NULL;
            erts_cleanup_messages(msgp);
            /* Try again with whatever message is now at the save position. */
            goto loop_rec__;
        }
        SWAPIN;
    }
    /* Hand the message term to the matching code that follows. */
    r(0) = ERL_MESSAGE_TERM(msgp);
}

remove_message() {
    //| -no_prefetch

    /*
     * Remove a (matched) message from the message queue.
     *
     * Besides unlinking the message this also updates the process'
     * sequential trace token from the token carried by the message,
     * fires the message_receive probe when VM probes are compiled in,
     * cancels the timer and re-enables GC (clears F_DELAY_GC).
     */

    ErtsMessage* msgp;
    PROCESS_MAIN_CHK_LOCKS(c_p);

    ERTS_CHK_MBUF_SZ(c_p);

    /* The matched message is the one currently returned by PEEK_MESSAGE. */
    msgp = PEEK_MESSAGE(c_p);

    if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) {
        save_calls(c_p, &exp_receive);
    }
    if (ERL_MESSAGE_TOKEN(msgp) == NIL) {
        /*
         * The message carries no token: clear the process' token.
         * With VM probes, a permanent dt utag keeps the token at
         * am_have_dt_utag instead; a non-permanent utag is cleared too.
         */
#ifdef USE_VM_PROBES
        if (DT_UTAG(c_p) != NIL) {
            if (DT_UTAG_FLAGS(c_p) & DT_UTAG_PERMANENT) {
                SEQ_TRACE_TOKEN(c_p) = am_have_dt_utag;
            } else {
                DT_UTAG(c_p) = NIL;
                SEQ_TRACE_TOKEN(c_p) = NIL;
            }
        } else {
#endif
            SEQ_TRACE_TOKEN(c_p) = NIL;
#ifdef USE_VM_PROBES
        }
        DT_UTAG_FLAGS(c_p) &= ~DT_UTAG_SPREADING;
#endif
    } else if (ERL_MESSAGE_TOKEN(msgp) != am_undefined) {
        /*
         * The message carries a token (a token of am_undefined leaves
         * the process' token untouched): adopt it.
         */
        Eterm msg;
        SEQ_TRACE_TOKEN(c_p) = ERL_MESSAGE_TOKEN(msgp);
#ifdef USE_VM_PROBES
        if (ERL_MESSAGE_TOKEN(msgp) == am_have_dt_utag) {
            /* Only a dt utag was sent along; take it over if we have none. */
            if (DT_UTAG(c_p) == NIL) {
                DT_UTAG(c_p) = ERL_MESSAGE_DT_UTAG(msgp);
            }
            DT_UTAG_FLAGS(c_p) |= DT_UTAG_SPREADING;
        } else {
#endif
            /* A real seq_trace token: sanity check its shape (debug builds). */
            ASSERT(is_tuple(SEQ_TRACE_TOKEN(c_p)));
            ASSERT(SEQ_TRACE_TOKEN_ARITY(c_p) == 5);
            ASSERT(is_small(SEQ_TRACE_TOKEN_SERIAL(c_p)));
            ASSERT(is_small(SEQ_TRACE_TOKEN_LASTCNT(c_p)));
            ASSERT(is_small(SEQ_TRACE_TOKEN_FLAGS(c_p)));
            ASSERT(is_pid(SEQ_TRACE_TOKEN_SENDER(c_p)));
            /*
             * Record the token's serial as our last count and make sure
             * our clock is not behind that serial.
             */
            c_p->seq_trace_lastcnt = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
            if (c_p->seq_trace_clock < unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p))) {
                c_p->seq_trace_clock = unsigned_val(SEQ_TRACE_TOKEN_SERIAL(c_p));
            }
            msg = ERL_MESSAGE_TERM(msgp);
            /* Emit the seq_trace 'receive' event for this message. */
            seq_trace_output(SEQ_TRACE_TOKEN(c_p), msg, SEQ_TRACE_RECEIVE, 
                             c_p->common.id, c_p);
#ifdef USE_VM_PROBES
        }
#endif
    }
#ifdef USE_VM_PROBES
    if (DTRACE_ENABLED(message_receive)) {
        /*
         * Fire the message_receive probe with the receiver name, the
         * message size, c_p->msg.len - 1 (presumably the queue length
         * once this message is gone) and the token's label, last count
         * and serial (all 0 when there is no seq_trace token).
         */
        Eterm token2 = NIL;
        DTRACE_CHARBUF(receiver_name, DTRACE_TERM_BUF_SIZE);
        Sint tok_label = 0;
        Sint tok_lastcnt = 0;
        Sint tok_serial = 0;

        dtrace_proc_str(c_p, receiver_name);
        token2 = SEQ_TRACE_TOKEN(c_p);
        if (have_seqtrace(token2)) {
            tok_label = signed_val(SEQ_TRACE_T_LABEL(token2));
            tok_lastcnt = signed_val(SEQ_TRACE_T_LASTCNT(token2));
            tok_serial = signed_val(SEQ_TRACE_T_SERIAL(token2));
        }
        DTRACE6(message_receive,
                receiver_name, size_object(ERL_MESSAGE_TERM(msgp)),
                c_p->msg.len - 1, tok_label, tok_lastcnt, tok_serial);
    }
#endif
    /* Take the message out of the queue and stop any receive timer. */
    UNLINK_MESSAGE(c_p, msgp);
    JOIN_MESSAGE(c_p);
    CANCEL_TIMER(c_p);

    erts_save_message_in_proc(c_p, msgp);
    /* Matching is over, so GC no longer has to be delayed. */
    c_p->flags &= ~F_DELAY_GC;

    if (ERTS_IS_GC_DESIRED_INTERNAL(c_p, HTOP, E)) {
        /*
         * We want to GC soon but we leave a few
         * reductions giving the message some time
         * to turn into garbage.
         */
        ERTS_VBUMP_LEAVE_REDS_INTERNAL(c_p, 5, FCALLS);
    }

    ERTS_DBG_CHK_REDS(c_p, FCALLS);
    ERTS_CHK_MBUF_SZ(c_p);

    ERTS_VERIFY_UNUSED_TEMP_ALLOC(c_p);
    PROCESS_MAIN_CHK_LOCKS(c_p);
}

loop_rec_end(Dest) {
    //| -no_next
    /*
     * The current message didn't match: move the save pointer on to
     * the next message and go back to the loop_rec instruction. When
     * the FCALLS check fails, the process is scheduled out instead and
     * resumes at the loop_rec instruction later.
     */

    ASSERT(c_p->flags & F_DELAY_GC);

    SET_I((BeamInstr *) $Dest);
    SAVE_MESSAGE(c_p);

    if (FCALLS <= 0 && FCALLS <= neg_o_reds) {
        /* Yield: GC is allowed again while we are scheduled out. */
        c_p->flags &= ~F_DELAY_GC;
        c_p->i = I;
        SWAPOUT;
        c_p->arity = 0;
        c_p->current = NULL;
        goto do_schedule;
    }

    /* Still allowed to run: charge one unit and look at the next message. */
    FCALLS--;
    goto loop_rec__;
}

timeout_locked() {
    /*
     * A timeout has occurred.  Reset the save pointer so that the next
     * receive statement will examine the first message first.
     *
     * Same as the timeout instruction, except that the
     * ERTS_PROC_LOCKS_MSG_RECEIVE lock is released first (presumably
     * this variant is reached with that lock held).
     */

    erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
    $timeout();
}

timeout() {
    /*
     * Common handling of an expired receive timeout: report it to
     * receive tracing and to the saved-calls buffer when those are
     * active, then clear F_TIMO and call JOIN_MESSAGE (presumably
     * what resets the save pointer to the start of the queue).
     */
    if (IS_TRACED_FL(c_p, F_TRACE_RECEIVE)) {
        trace_receive(c_p, am_clock_service, am_timeout, NULL);
    }
    if (ERTS_PROC_GET_SAVED_CALLS_BUF(c_p)) {
        save_calls(c_p, &exp_timeout);
    }
    c_p->flags &= ~F_TIMO;
    JOIN_MESSAGE(c_p);
}

TIMEOUT_VALUE() {
    /*
     * Helper macro: flag an invalid timeout value by setting the
     * failure reason to EXC_TIMEOUT_VALUE and jumping to the
     * find_func_info label (defined outside this file).
     */
    c_p->freason = EXC_TIMEOUT_VALUE;
    goto find_func_info;
}

i_wait_error_locked() {
    /*
     * Release the ERTS_PROC_LOCKS_MSG_RECEIVE lock, then raise the
     * timeout-value error.
     */
    erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
    $TIMEOUT_VALUE();
}

i_wait_error() {
    /* Raise the timeout-value error; no lock is released here. */
    $TIMEOUT_VALUE();
}

//
// Instruction variants assembled from the wait.* fragments defined
// below; the right-hand side lists the fragments in order.
//
// The *_unlocked variants start with wait.lock, which takes the
// ERTS_PROC_LOCKS_MSG_RECEIVE lock.  The *_locked variants leave that
// fragment out (wait_locked uses the empty wait.unlocked fragment
// instead); presumably they are reached with the lock already held.
// Every variant ends with wait.execute, which releases the lock.
//

// Timeout given as an integer operand.
wait_timeout_unlocked_int := wait.lock.int.execute;
wait_timeout_locked_int := wait.int.execute;

// Timeout given as a source operand (checked at run time).
wait_timeout_unlocked := wait.lock.src.execute;
wait_timeout_locked := wait.src.execute;

// No timeout.
wait_unlocked := wait.lock.execute;
wait_locked := wait.unlocked.execute;

wait.lock() {
    /* Take the receive lock; the wait.execute fragment releases it. */
    erts_proc_lock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
}

wait.unlocked() {
    /* Intentionally empty: used by wait_locked, which takes no lock. */
}

wait.int(Int) {
    /*
     * A timer that has already been set must NOT be set again, which
     * is why both the F_INSLPQUEUE and the F_TIMO flags are tested.
     */
    if (!(c_p->flags & (F_INSLPQUEUE | F_TIMO))) {
        /* Store the address of the following instruction in def_arg_reg[0]. */
        *((BeamInstr **) c_p->def_arg_reg) = $NEXT_INSTRUCTION;
        erts_set_proc_timer_uword(c_p, $Int);
    }
}

wait.src(Src) {
    /*
     * A timer that has already been set must NOT be set again, which
     * is why both the F_INSLPQUEUE and the F_TIMO flags are tested.
     */
    if (!(c_p->flags & (F_INSLPQUEUE | F_TIMO))) {
        Eterm tmo = $Src;

        if (tmo == am_infinity) {
            /* Infinite timeout: no timer is started, only F_TIMO is set. */
            c_p->flags |= F_TIMO;
        } else if (tmo == make_small(0)) {
            /* Zero timeout: release the lock and carry on immediately. */
            erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
            $NEXT0();
        } else if (erts_set_proc_timer_term(c_p, tmo) != 0) {
            /* Wrong time */
            erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
            c_p->freason = EXC_TIMEOUT_VALUE;
            goto find_func_info;
        } else {
            /*
             * The timer routine will set c_p->i to the value in
             * c_p->def_arg_reg[0].  Note that it is safe to use this
             * location because there are no living x registers in
             * a receive statement.
             */
            *((BeamInstr **) c_p->def_arg_reg) = $NEXT_INSTRUCTION;
        }
    }
}

//
// Prepare to wait indefinitely for a new message to arrive
// (or the time set above if falling through from above).
// When a new message arrives, control will be transferred to
// the loop_rec instruction (at label L1).  In case of
// timeout, control will be transferred to the timeout
// instruction following the wait_timeout instruction.
//

wait.execute(JumpTarget) {
    /*
     * Final fragment of every wait instruction: set the resume address
     * to JumpTarget, save the process state, release the receive lock
     * and hand control to the scheduler.
     */
    c_p->i = (BeamInstr *) $JumpTarget; /* L1 */
    SWAPOUT;
    c_p->arity = 0;

    /*
     * Clear the ACTIVE state flag unless the timer has already timed
     * out. This is done before the receive lock is released below.
     */
    if (!ERTS_PTMR_IS_TIMED_OUT(c_p)) {
        erts_atomic32_read_band_relb(&c_p->state,
                                         ~ERTS_PSFLG_ACTIVE);
    }
    ASSERT(!ERTS_PROC_IS_EXITING(c_p));
    erts_proc_unlock(c_p, ERTS_PROC_LOCKS_MSG_RECEIVE);
    c_p->current = NULL;
    goto do_schedule;
    //| -no_next
}