aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_proc_sig_queue.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2018-04-13 14:33:38 +0200
committerRickard Green <[email protected]>2018-05-16 13:28:46 +0200
commit78f639da759cd3f8b5f28bc86ec404f3c3db60f4 (patch)
tree1e449df42cf4bd3e840a10de105a4646d19136e9 /erts/emulator/beam/erl_proc_sig_queue.c
parent08873ec673ef223a4e0c9826b0370d80bb6aad24 (diff)
downloadotp-78f639da759cd3f8b5f28bc86ec404f3c3db60f4.tar.gz
otp-78f639da759cd3f8b5f28bc86ec404f3c3db60f4.tar.bz2
otp-78f639da759cd3f8b5f28bc86ec404f3c3db60f4.zip
New process suspend implementation based on async signaling
Diffstat (limited to 'erts/emulator/beam/erl_proc_sig_queue.c')
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c469
1 files changed, 447 insertions, 22 deletions
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 5165cd22a5..b4b087b082 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -39,6 +39,7 @@
#include "big.h"
#include "erl_gc.h"
#include "bif.h"
+#include "erl_bif_unique.h"
#include "erl_proc_sig_queue.h"
#include "dtrace-wrapper.h"
@@ -49,7 +50,7 @@
* Note that not all signal are handled using this functionality!
*/
-#define ERTS_SIG_Q_OP_MAX 11
+#define ERTS_SIG_Q_OP_MAX 13
#define ERTS_SIG_Q_OP_EXIT 0
#define ERTS_SIG_Q_OP_EXIT_LINKED 1
@@ -62,7 +63,9 @@
#define ERTS_SIG_Q_OP_TRACE_CHANGE_STATE 8
#define ERTS_SIG_Q_OP_PERSISTENT_MON_MSG 9
#define ERTS_SIG_Q_OP_IS_ALIVE 10
-#define ERTS_SIG_Q_OP_PROCESS_INFO ERTS_SIG_Q_OP_MAX
+#define ERTS_SIG_Q_OP_PROCESS_INFO 11
+#define ERTS_SIG_Q_OP_SYNC_SUSPEND 12
+#define ERTS_SIG_Q_OP_RPC ERTS_SIG_Q_OP_MAX
#define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 5)
@@ -154,6 +157,17 @@ typedef struct {
} ErtsIsAliveRequest;
typedef struct {
+ Eterm message;
+ Eterm requester;
+ int async;
+} ErtsSyncSuspendRequest;
+
+typedef struct {
+ ErtsMonitorSuspend *mon;
+ ErtsMessage *sync;
+} ErtsProcSigPendingSuspend;
+
+typedef struct {
ErtsSignalCommon common;
Sint refc;
Sint delayed_len;
@@ -176,6 +190,15 @@ typedef struct {
#define ERTS_PROC_SIG_PI_MSGQ_LEN_IGNORE ((Sint) -1)
#define ERTS_PROC_SIG_PI_MSGQ_LEN_SYNC ((Sint) -2)
+typedef struct {
+ ErtsSignalCommon common;
+ Eterm requester;
+ Eterm (*func)(Process *, void *, int *, ErlHeapFragment **);
+ void *arg;
+ Eterm ref;
+ ErtsORefThing oref_thing;
+} ErtsProcSigRPC;
+
static int handle_msg_tracing(Process *c_p,
ErtsSigRecvTracing *tracing,
ErtsMessage ***next_nm_sig);
@@ -1311,6 +1334,8 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason)
/* Pass signal using old monitor structure... */
ErtsSignal *sig;
+ send_using_monitor_struct:
+
mon->other.item = reason; /* Pass immed reason via other.item... */
sig = (ErtsSignal *) mon;
sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_MONITOR_DOWN,
@@ -1322,6 +1347,18 @@ erts_proc_sig_send_monitor_down(ErtsMonitor *mon, Eterm reason)
ErtsMonitorData *mdp = erts_monitor_to_data(mon);
Eterm from_tag, monitored, heap[3];
+ if (mon->type == ERTS_MON_TYPE_SUSPEND) {
+ /*
+ * Set reason to 'undefined', since exit
+ * reason is not used for suspend monitors,
+ * and send using monitor structure. This
+ * since we don't want to trigger
+ * unnecessary memory allocation etc...
+ */
+ reason = am_undefined;
+ goto send_using_monitor_struct;
+ }
+
if (!(mon->flags & ERTS_ML_FLG_NAME)) {
from_tag = monitored = mdp->origin.other.item;
if (is_external_pid(from_tag)) {
@@ -1599,7 +1636,173 @@ erts_proc_sig_send_process_info_request(Process *c_p,
else
erts_free(ERTS_ALC_T_SIG_DATA, pis);
return res;
-}
+}
+
+void
+erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, Eterm tag, Eterm reply)
+{
+ ErlHeapFragment *hfrag;
+ Uint hsz, tag_sz;
+ Eterm *hp, *start_hp, tag_cpy, msg, default_reply;
+ ErlOffHeap *ohp;
+ ErtsMessage *mp;
+ ErtsSyncSuspendRequest *ssusp;
+ int async_suspend;
+
+ tag_sz = size_object(tag);
+
+ hsz = 3 + tag_sz + sizeof(ErtsSyncSuspendRequest)/sizeof(Eterm);
+
+ mp = erts_alloc_message(hsz, &hp);
+ hfrag = &mp->hfrag;
+ mp->next = NULL;
+ ohp = &hfrag->off_heap;
+ start_hp = hp;
+
+ tag_cpy = copy_struct(tag, tag_sz, &hp, ohp);
+
+ async_suspend = is_non_value(reply);
+ default_reply = async_suspend ? am_suspended : reply;
+
+ msg = TUPLE2(hp, tag_cpy, default_reply);
+ hp += 3;
+
+ hfrag->used_size = hp - start_hp;
+
+ ssusp = (ErtsSyncSuspendRequest *) (char *) hp;
+ ssusp->message = msg;
+ ssusp->requester = c_p->common.id;
+ ssusp->async = async_suspend;
+
+ ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_SYNC_SUSPEND,
+ ERTS_SIG_Q_TYPE_UNDEFINED,
+ 0);
+ ERL_MESSAGE_TOKEN(mp) = NIL;
+ ERL_MESSAGE_FROM(mp) = am_system;
+#ifdef USE_VM_PROBES
+ ERL_MESSAGE_DT_UTAG(mp) = NIL;
+#endif
+
+ if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_SYNC_SUSPEND))
+ (void) maybe_elevate_sig_handling_prio(c_p, to);
+ else {
+ Eterm *tp;
+ /* It wasn't alive; reply to ourselves... */
+ mp->next = NULL;
+ mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
+ tp = tuple_val(msg);
+ tp[2] = async_suspend ? am_badarg : am_exited;
+ erts_queue_message(c_p, ERTS_PROC_LOCK_MAIN,
+ mp, msg, am_system);
+ }
+}
+
+Eterm
+erts_proc_sig_send_rpc_request(Process *c_p,
+ Eterm to,
+ int reply,
+ Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
+ void *arg)
+{
+ Eterm res;
+ ErtsProcSigRPC *sig = erts_alloc(ERTS_ALC_T_SIG_DATA,
+ sizeof(ErtsProcSigRPC));
+ sig->common.tag = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_RPC,
+ ERTS_SIG_Q_TYPE_UNDEFINED,
+ 0);
+ sig->requester = reply ? c_p->common.id : NIL;
+ sig->func = func;
+ sig->arg = arg;
+
+ if (!reply) {
+ res = am_ok;
+ sig->ref = am_ok;
+ }
+ else {
+ res = erts_make_ref(c_p);
+
+ sys_memcpy((void *) &sig->oref_thing,
+ (void *) internal_ref_val(res),
+ sizeof(ErtsORefThing));
+
+ sig->ref = make_internal_ref(&sig->oref_thing);
+
+ ERTS_RECV_MARK_SAVE(c_p);
+ ERTS_RECV_MARK_SET(c_p);
+ }
+
+ if (proc_queue_signal(c_p, to, (ErtsSignal *) sig, ERTS_SIG_Q_OP_RPC))
+ (void) maybe_elevate_sig_handling_prio(c_p, to);
+ else {
+ erts_free(ERTS_ALC_T_SIG_DATA, sig);
+ res = THE_NON_VALUE;
+ if (reply)
+ JOIN_MESSAGE(c_p);
+ }
+
+ return res;
+}
+
+static int
+handle_rpc(Process *c_p, ErtsProcSigRPC *rpc, int cnt, int limit, int *yieldp)
+{
+ Process *rp;
+ ErlHeapFragment *bp = NULL;
+ Eterm res;
+ Uint hsz;
+ int reds, out_cnt;
+
+ /*
+ * reds in:
+ * Reductions left.
+ *
+ * reds out:
+ * Absolute value of reds out equals consumed
+ * amount of reds. If a negative value, force
+ * a yield.
+ */
+
+ reds = (limit - cnt) / ERTS_SIG_REDS_CNT_FACTOR;
+ if (reds <= 0)
+ reds = 1;
+
+ res = (*rpc->func)(c_p, rpc->arg, &reds, &bp);
+
+ if (reds < 0) {
+ /* Force yield... */
+ *yieldp = !0;
+ reds *= -1;
+ }
+
+ out_cnt = reds*ERTS_SIG_REDS_CNT_FACTOR;
+
+ hsz = 3 + sizeof(ErtsORefThing)/sizeof(Eterm);
+
+ rp = erts_proc_lookup(rpc->requester);
+ if (!rp) {
+ if (bp)
+ free_message_buffer(bp);
+ }
+ else {
+ Eterm *hp, msg, ref;
+ ErtsMessage *mp = erts_alloc_message(hsz, &hp);
+
+ sys_memcpy((void *) hp, (void *) &rpc->oref_thing,
+ sizeof(rpc->oref_thing));
+
+ ref = make_internal_ref(hp);
+ hp += sizeof(rpc->oref_thing)/sizeof(Eterm);
+ msg = TUPLE2(hp, ref, res);
+
+ mp->hfrag.next = bp;
+
+ erts_queue_proc_message(c_p, rp, 0, mp, msg);
+ }
+
+ erts_free(ERTS_ALC_T_SIG_DATA, rpc);
+
+ return out_cnt;
+}
static void
is_alive_response(Process *c_p, ErtsMessage *mp, int is_alive)
@@ -2643,6 +2846,155 @@ handle_process_info(Process *c_p, ErtsSigRecvTracing *tracing,
return ((int) reds)*4 + 8;
}
+static void
+handle_suspend(Process *c_p, ErtsMonitor *mon, int *yieldp)
+{
+ erts_aint32_t state = erts_atomic32_read_nob(&c_p->state);
+
+ ASSERT(mon->type == ERTS_MON_TYPE_SUSPEND);
+
+ if (!(state & ERTS_PSFLG_DIRTY_RUNNING)) {
+ ErtsMonitorSuspend *msp;
+ erts_aint_t mstate;
+
+ msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);
+ mstate = erts_atomic_read_bor_acqb(&msp->state,
+ ERTS_MSUSPEND_STATE_FLG_ACTIVE);
+ ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate;
+ erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
+ *yieldp = !0;
+ }
+ else {
+ /* Executing dirty; delay suspend... */
+ ErtsProcSigPendingSuspend *psusp;
+ ErtsMonitorSuspend *msp;
+
+ psusp = ERTS_PROC_GET_PENDING_SUSPEND(c_p);
+ if (!psusp) {
+ psusp = erts_alloc(ERTS_ALC_T_SIG_DATA,
+ sizeof(ErtsProcSigPendingSuspend));
+ psusp->mon = NULL;
+ psusp->sync = NULL;
+ ERTS_PROC_SET_PENDING_SUSPEND(c_p, (void *) psusp);
+ }
+
+ msp = (ErtsMonitorSuspend *) erts_monitor_to_data(mon);
+
+ msp->next = psusp->mon;
+ psusp->mon = msp;
+
+ erts_atomic32_inc_nob(&msp->md.refc);
+ }
+}
+
+static void
+sync_suspend_reply(Process *c_p, ErtsMessage *mp, erts_aint32_t state)
+{
+ /*
+ * Sender prepared the message for us. Just patch
+ * the result if necessary. The default prepared
+ * result is 'false'.
+ */
+ Process *rp;
+ ErtsSyncSuspendRequest *ssusp;
+
+ ssusp = (ErtsSyncSuspendRequest *) (char *) (&mp->hfrag.mem[0]
+ + mp->hfrag.used_size);
+
+ ASSERT(ERTS_SIG_IS_NON_MSG(mp));
+ ASSERT(ERTS_PROC_SIG_OP(((ErtsSignal *) mp)->common.tag)
+ == ERTS_SIG_Q_OP_SYNC_SUSPEND);
+ ASSERT(mp->hfrag.alloc_size > mp->hfrag.used_size);
+ ASSERT((mp->hfrag.alloc_size - mp->hfrag.used_size)*sizeof(UWord)
+ >= sizeof(ErtsSyncSuspendRequest));
+ ASSERT(is_internal_pid(ssusp->requester));
+ ASSERT(ssusp->requester != c_p->common.id);
+ ASSERT(is_tuple_arity(ssusp->message, 2));
+ ASSERT(is_immed(tuple_val(ssusp->message)[2]));
+
+ ERL_MESSAGE_TERM(mp) = ssusp->message;
+ mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
+ mp->next = NULL;
+
+ rp = erts_proc_lookup(ssusp->requester);
+ if (!rp)
+ erts_cleanup_messages(mp);
+ else {
+ if ((state & (ERTS_PSFLG_EXITING
+ | ERTS_PSFLG_SUSPENDED)) != ERTS_PSFLG_SUSPENDED) {
+ /* Not suspended -> patch result... */
+ if (state & ERTS_PSFLG_EXITING) {
+ Eterm *tp = tuple_val(ssusp->message);
+ tp[2] = ssusp->async ? am_exited : am_badarg;
+ }
+ else {
+ Eterm *tp = tuple_val(ssusp->message);
+ ASSERT(!(state & ERTS_PSFLG_SUSPENDED));
+ tp[2] = ssusp->async ? am_not_suspended : am_internal_error;
+ }
+ }
+ erts_queue_proc_message(c_p, rp, 0, mp, ssusp->message);
+ }
+}
+
+static void
+handle_sync_suspend(Process *c_p, ErtsMessage *mp)
+{
+ ErtsProcSigPendingSuspend *psusp;
+
+ psusp = (ErtsProcSigPendingSuspend *) ERTS_PROC_GET_PENDING_SUSPEND(c_p);
+ if (!psusp)
+ sync_suspend_reply(c_p, mp, erts_atomic32_read_nob(&c_p->state));
+ else {
+ mp->next = psusp->sync;
+ psusp->sync = mp;
+ }
+}
+
+void
+erts_proc_sig_handle_pending_suspend(Process *c_p)
+{
+ ErtsMonitorSuspend *msp;
+ ErtsMessage *sync;
+ ErtsProcSigPendingSuspend *psusp;
+ erts_aint32_t state = erts_atomic32_read_nob(&c_p->state);
+
+ psusp = (ErtsProcSigPendingSuspend *) ERTS_PROC_GET_PENDING_SUSPEND(c_p);
+
+ msp = psusp->mon;
+
+ while (msp) {
+ ErtsMonitorSuspend *next_msp = msp->next;
+ msp->next = NULL;
+ if (!(state & ERTS_PSFLG_EXITING)
+ && erts_monitor_is_in_table(&msp->md.target)) {
+ erts_aint_t mstate;
+
+ mstate = erts_atomic_read_bor_acqb(&msp->state,
+ ERTS_MSUSPEND_STATE_FLG_ACTIVE);
+ ASSERT(!(mstate & ERTS_MSUSPEND_STATE_FLG_ACTIVE)); (void) mstate;
+ erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
+ }
+
+ erts_monitor_release(&msp->md.target);
+
+ msp = next_msp;
+ }
+
+ sync = psusp->sync;
+
+ while (sync) {
+ ErtsMessage *next_sync = sync->next;
+ sync->next = NULL;
+ sync_suspend_reply(c_p, sync, state);
+ sync = next_sync;
+ }
+
+ erts_free(ERTS_ALC_T_SIG_DATA, psusp);
+
+ ERTS_PROC_SET_PENDING_SUSPEND(c_p, NULL);
+}
+
/*
* Called in order to handle incoming signals.
*/
@@ -2653,7 +3005,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
{
Eterm tag;
erts_aint32_t state;
- int cnt, limit, abs_lim, msg_tracing;
+ int yield, cnt, limit, abs_lim, msg_tracing;
ErtsMessage *sig, ***next_nm_sig;
ErtsSigRecvTracing tracing;
@@ -2673,6 +3025,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
limit = *redsp;
*redsp = 0;
+ yield = 0;
if (!c_p->sig_qs.cont) {
if (state == -1)
@@ -2786,6 +3139,18 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
cnt += handle_nodedown(c_p, sig, mdp, next_nm_sig);
}
break;
+ case ERTS_MON_TYPE_SUSPEND:
+ tmon = (ErtsMonitor *) sig;
+ ASSERT(erts_monitor_is_target(tmon));
+ ASSERT(!erts_monitor_is_in_table(tmon));
+ mdp = erts_monitor_to_data(tmon);
+ if (erts_monitor_is_in_table(&mdp->origin)) {
+ erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p),
+ &mdp->origin);
+ omon = &mdp->origin;
+ }
+ remove_nm_sig(c_p, sig, next_nm_sig);
+ break;
default:
ERTS_INTERNAL_ERROR("invalid monitor type");
break;
@@ -2849,9 +3214,13 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
if (mon->type == ERTS_MON_TYPE_DIST_PROC)
erts_monitor_tree_insert(&ERTS_P_MONITORS(c_p), mon);
- else
+ else {
erts_monitor_list_insert(&ERTS_P_LT_MONITORS(c_p), mon);
+ if (mon->type == ERTS_MON_TYPE_SUSPEND)
+ handle_suspend(c_p, mon, &yield);
+ }
ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ cnt += 2;
break;
}
@@ -2895,9 +3264,16 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
erts_monitor_tree_delete(&ERTS_P_MONITORS(c_p), tmon);
else {
erts_monitor_list_delete(&ERTS_P_LT_MONITORS(c_p), tmon);
- if (type == ERTS_MON_TYPE_RESOURCE) {
+ switch (type) {
+ case ERTS_MON_TYPE_RESOURCE:
erts_nif_demonitored((ErtsResource *) tmon->other.ptr);
cnt++;
+ break;
+ case ERTS_MON_TYPE_SUSPEND:
+ erts_resume(c_p, ERTS_PROC_LOCK_MAIN);
+ break;
+ default:
+ break;
}
}
erts_monitor_release_both(mdp);
@@ -3012,6 +3388,21 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
break;
+ case ERTS_SIG_Q_OP_SYNC_SUSPEND:
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ remove_nm_sig(c_p, sig, next_nm_sig);
+ handle_sync_suspend(c_p, sig);
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ break;
+
+ case ERTS_SIG_Q_OP_RPC:
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ remove_nm_sig(c_p, sig, next_nm_sig);
+ cnt += handle_rpc(c_p, (ErtsProcSigRPC *) sig, cnt,
+ limit, &yield);
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ break;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: {
Uint16 type = ERTS_PROC_SIG_TYPE(tag);
@@ -3169,6 +3560,15 @@ stop: {
*redsp = cnt/4 + 1;
+ if (yield) {
+ int vreds = max_reds - *redsp;
+ if (vreds > 0) {
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ esdp->virtual_reds += vreds;
+ }
+ *redsp = max_reds;
+ }
+
return res;
}
}
@@ -3277,6 +3677,8 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp)
case ERTS_MON_TYPE_PROC:
case ERTS_MON_TYPE_DIST_PROC:
case ERTS_MON_TYPE_NODE:
+ case ERTS_MON_TYPE_NODES:
+ case ERTS_MON_TYPE_SUSPEND:
erts_monitor_release((ErtsMonitor *) sig);
break;
default:
@@ -3332,6 +3734,17 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp)
handle_process_info(c_p, NULL, sig, next_nm_sig, 0);
break;
+ case ERTS_SIG_Q_OP_SYNC_SUSPEND:
+ handle_sync_suspend(c_p, sig);
+ break;
+
+ case ERTS_SIG_Q_OP_RPC: {
+ int yield = 0;
+ handle_rpc(c_p, (ErtsProcSigRPC *) sig,
+ cnt, limit, &yield);
+ break;
+ }
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
destroy_trace_info((ErtsSigTraceInfo *) sig);
break;
@@ -3467,6 +3880,7 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
}
break;
+ case ERTS_SIG_Q_OP_SYNC_SUSPEND:
case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
case ERTS_SIG_Q_OP_IS_ALIVE:
size = ((ErtsMessage *) sig)->hfrag.alloc_size;
@@ -3522,6 +3936,10 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
break;
}
+ case ERTS_SIG_Q_OP_RPC:
+ size = sizeof(ErtsProcSigRPC);
+ break;
+
default:
ERTS_INTERNAL_ERROR("Unknown signal");
break;
@@ -3598,17 +4016,13 @@ erts_proc_sig_receive_helper(Process *c_p,
*/
*get_outp = 0;
*msgpp = NULL;
+
return consumed_reds;
}
erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
- if (left_reds <= 0) {
- *get_outp = -1; /* yield */
- *msgpp = NULL;
-
- ASSERT(consumed_reds >= (fcalls - neg_o_reds));
- return consumed_reds;
- }
+ if (left_reds <= 0)
+ break; /* Yield */
/* handle newly arrived signals... */
}
@@ -3629,19 +4043,27 @@ erts_proc_sig_receive_helper(Process *c_p,
max_reds, !0);
consumed_reds += reds;
left_reds -= reds;
- /* we may have exited by an incoming signal... */
- if (state & ERTS_PSFLG_EXITING) {
+
+ /* we may have exited or suspended by an incoming signal... */
+
+ if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_SUSPENDED)) {
+ if (state & ERTS_PSFLG_SUSPENDED)
+ break; /* Yield */
+
/*
* Process need to schedule out in order
* to terminate. Prepare this a bit...
*/
+ ASSERT(state & ERTS_PSFLG_EXITING);
ASSERT(c_p->flags & F_DELAY_GC);
c_p->flags &= ~F_DELAY_GC;
c_p->arity = 0;
c_p->current = NULL;
+
*get_outp = 1;
*msgpp = NULL;
+
return consumed_reds;
}
@@ -3652,17 +4074,20 @@ erts_proc_sig_receive_helper(Process *c_p,
return consumed_reds;
}
- if (left_reds <= 0) {
- *get_outp = -1; /* yield */
- *msgpp = NULL;
-
- ASSERT(consumed_reds >= (fcalls - neg_o_reds));
- return consumed_reds;
- }
+ if (left_reds <= 0)
+ break; /* yield */
ASSERT(!c_p->sig_qs.cont);
/* Go fetch again... */
}
+
+ /* Yield... */
+
+ *get_outp = -1;
+ *msgpp = NULL;
+
+ ASSERT(consumed_reds >= (fcalls - neg_o_reds));
+ return consumed_reds;
}
static int