aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_proc_sig_queue.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2018-03-21 16:33:35 +0100
committerRickard Green <[email protected]>2018-03-22 16:18:31 +0100
commitf78ed9b50effd3007aafef2979ae5921ffedf75b (patch)
treef2a7ac598f4217be8789f0890a3c41fc96ae6385 /erts/emulator/beam/erl_proc_sig_queue.c
parent2dc14df8ce5d56df3974aa57a61b3c1a0bfd3149 (diff)
downloadotp-f78ed9b50effd3007aafef2979ae5921ffedf75b.tar.gz
otp-f78ed9b50effd3007aafef2979ae5921ffedf75b.tar.bz2
otp-f78ed9b50effd3007aafef2979ae5921ffedf75b.zip
Fix signal order for is_process_alive
Diffstat (limited to 'erts/emulator/beam/erl_proc_sig_queue.c')
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c180
1 files changed, 154 insertions, 26 deletions
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 1b5cbb1919..489df08991 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -48,7 +48,7 @@
* Note that not all signal are handled using this functionality!
*/
-#define ERTS_SIG_Q_OP_MAX 9
+#define ERTS_SIG_Q_OP_MAX 10
#define ERTS_SIG_Q_OP_EXIT 0
#define ERTS_SIG_Q_OP_EXIT_LINKED 1
@@ -59,7 +59,8 @@
#define ERTS_SIG_Q_OP_UNLINK 6
#define ERTS_SIG_Q_OP_GROUP_LEADER 7
#define ERTS_SIG_Q_OP_TRACE_CHANGE_STATE 8
-#define ERTS_SIG_Q_OP_PERSISTENT_MON_MSG ERTS_SIG_Q_OP_MAX
+#define ERTS_SIG_Q_OP_PERSISTENT_MON_MSG 9
+#define ERTS_SIG_Q_OP_IS_ALIVE ERTS_SIG_Q_OP_MAX
#define ERTS_SIG_Q_TYPE_MAX (ERTS_MON_LNK_TYPE_MAX + 5)
@@ -184,6 +185,11 @@ typedef struct {
Eterm heap[1];
} ErtsSigGroupLeader;
+typedef struct {
+ Eterm message;
+ Eterm requester;
+} ErtsIsAliveRequest;
+
static int handle_msg_tracing(Process *c_p,
ErtsSigRecvTracing *tracing,
ErtsMessage ***next_nm_sig);
@@ -548,6 +554,43 @@ proc_queue_signal(Process *c_p, Eterm pid, ErtsSignal *sig, int op)
return res;
}
+static int
+maybe_elevate_sig_handling_prio(Process *c_p, Eterm other)
+{
+ /*
+ * returns:
+ * > 0 -> elevated prio; process alive or exiting
+ * < 0 -> no elevation needed; process alive or exiting
+ * 0 -> process terminated (free)
+ */
+ int res;
+ Process *rp;
+ erts_aint32_t state, my_prio, other_prio;
+
+ rp = erts_proc_lookup_raw(other);
+ if (!rp)
+ res = 0;
+ else {
+ res = -1;
+ state = erts_atomic32_read_nob(&c_p->state);
+ my_prio = ERTS_PSFLGS_GET_USR_PRIO(state);
+
+ state = erts_atomic32_read_nob(&rp->state);
+ other_prio = ERTS_PSFLGS_GET_USR_PRIO(state);
+
+ if (other_prio > my_prio) {
+ /* Others prio is lower than mine; elevate it... */
+ res = !!erts_sig_prio(other, my_prio);
+ if (res) {
+ /* ensure handled if dirty executing... */
+ state = erts_atomic32_read_nob(&rp->state);
+ ensure_dirty_proc_handled(other, state, my_prio);
+ }
+ }
+ }
+ return res;
+}
+
void
erts_proc_sig_fetch(Process *proc)
{
@@ -1215,33 +1258,10 @@ erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl, Eterm ref)
if (!res)
destroy_sig_group_leader(sgl);
else if (c_p) {
- int prio_res = !0;
erts_aint_t flags, rm_flags = ERTS_SIG_GL_FLG_SENDER;
- Process *rp;
- erts_aint32_t state, my_prio, other_prio;
-
- state = erts_atomic32_read_nob(&c_p->state);
- my_prio = ERTS_PSFLGS_GET_USR_PRIO(state);
-
- rp = erts_proc_lookup_raw(to);
- if (!rp)
- prio_res = 0;
- else {
- state = erts_atomic32_read_nob(&rp->state);
- other_prio = ERTS_PSFLGS_GET_USR_PRIO(state);
-
- if (other_prio > my_prio) {
- /* Others prio is lower than mine; elevate it... */
- prio_res = erts_sig_prio(to, my_prio);
- if (prio_res) {
- state = erts_atomic32_read_nob(&rp->state);
- ensure_dirty_proc_handled(to, state, my_prio);
- }
- }
- }
+ int prio_res = maybe_elevate_sig_handling_prio(c_p, to);
if (!prio_res)
rm_flags |= ERTS_SIG_GL_FLG_ACTIVE;
-
flags = erts_atomic_read_band_nob(&sgl->flags, ~rm_flags);
if (!prio_res && (flags & ERTS_SIG_GL_FLG_ACTIVE))
res = 0; /* We deactivated signal... */
@@ -1253,6 +1273,99 @@ erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl, Eterm ref)
group_leader_reply(c_p, c_p->common.id, ref, 0);
}
+void
+erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref)
+{
+ ErlHeapFragment *hfrag;
+ Uint hsz;
+ Eterm *hp, *start_hp, ref_cpy, msg;
+ ErlOffHeap *ohp;
+ ErtsMessage *mp;
+ ErtsIsAliveRequest *alive_req;
+
+ ASSERT(is_internal_ordinary_ref(ref));
+
+ hsz = ERTS_REF_THING_SIZE + 3 + sizeof(ErtsIsAliveRequest)/sizeof(Eterm);
+
+ mp = erts_alloc_message(hsz, &hp);
+ hfrag = &mp->hfrag;
+ mp->next = NULL;
+ ohp = &hfrag->off_heap;
+ start_hp = hp;
+
+ ref_cpy = STORE_NC(&hp, ohp, ref);
+ msg = TUPLE2(hp, ref_cpy, am_false); /* default res 'false' */
+ hp += 3;
+
+ hfrag->used_size = hp - start_hp;
+
+ alive_req = (ErtsIsAliveRequest *) (char *) hp;
+ alive_req->message = msg;
+ alive_req->requester = c_p->common.id;
+
+ ERL_MESSAGE_TERM(mp) = ERTS_PROC_SIG_MAKE_TAG(ERTS_SIG_Q_OP_IS_ALIVE,
+ ERTS_SIG_Q_TYPE_UNDEFINED,
+ 0);
+ ERL_MESSAGE_TOKEN(mp) = NIL;
+ ERL_MESSAGE_FROM(mp) = am_system;
+#ifdef USE_VM_PROBES
+ ERL_MESSAGE_DT_UTAG(mp) = NIL;
+#endif
+
+ if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_IS_ALIVE))
+ (void) maybe_elevate_sig_handling_prio(c_p, to);
+ else {
+ /* It wasn't alive; reply to ourselves... */
+ mp->next = NULL;
+ mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
+ erts_queue_message(c_p, ERTS_PROC_LOCK_MAIN,
+ mp, msg, am_system);
+ }
+}
+
+static void
+is_alive_response(Process *c_p, ErtsMessage *mp, int is_alive)
+{
+ /*
+ * Sender prepared the message for us. Just patch
+ * the result if necessary. The default prepared
+ * result is 'false'.
+ */
+ Process *rp;
+ ErtsIsAliveRequest *alive_req;
+
+ alive_req = (ErtsIsAliveRequest *) (char *) (&mp->hfrag.mem[0]
+ + mp->hfrag.used_size);
+
+
+ ASSERT(ERTS_SIG_IS_NON_MSG(mp));
+ ASSERT(ERTS_PROC_SIG_OP(((ErtsSignal *) mp)->common.tag)
+ == ERTS_SIG_Q_OP_IS_ALIVE);
+ ASSERT(mp->hfrag.alloc_size > mp->hfrag.used_size);
+ ASSERT((mp->hfrag.alloc_size - mp->hfrag.used_size)*sizeof(UWord)
+ >= sizeof(ErtsIsAliveRequest));
+ ASSERT(is_internal_pid(alive_req->requester));
+ ASSERT(alive_req->requester != c_p->common.id);
+ ASSERT(is_tuple_arity(alive_req->message, 2));
+ ASSERT(is_internal_ordinary_ref(tuple_val(alive_req->message)[1]));
+ ASSERT(tuple_val(alive_req->message)[2] == am_false);
+
+ ERL_MESSAGE_TERM(mp) = alive_req->message;
+ mp->data.attached = ERTS_MSG_COMBINED_HFRAG;
+ mp->next = NULL;
+
+ rp = erts_proc_lookup(alive_req->requester);
+ if (!rp)
+ erts_cleanup_messages(mp);
+ else {
+ if (is_alive) { /* patch result... */
+ Eterm *tp = tuple_val(alive_req->message);
+ tp[2] = am_true;
+ }
+ erts_queue_message(rp, 0, mp, alive_req->message, am_system);
+ }
+}
+
static ERTS_INLINE void
adjust_tracing_state(Process *c_p, ErtsSigRecvTracing *tracing, int setup)
{
@@ -2355,6 +2468,13 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
break;
}
+ case ERTS_SIG_Q_OP_IS_ALIVE:
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ remove_nm_sig(c_p, sig, next_nm_sig);
+ is_alive_response(c_p, sig, !0);
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ break;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE: {
Uint16 type = ERTS_PROC_SIG_TYPE(tag);
@@ -2667,6 +2787,12 @@ erts_proc_sig_handle_exit(Process *c_p, int *redsp)
break;
}
+ case ERTS_SIG_Q_OP_IS_ALIVE:
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ is_alive_response(c_p, sig, 0);
+ ERTS_PROC_SIG_HDBG_PRIV_CHKQ(c_p, &tracing, next_nm_sig);
+ break;
+
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
destroy_trace_info((ErtsSigTraceInfo *) sig);
break;
@@ -2803,6 +2929,7 @@ erts_proc_sig_signal_size(ErtsSignal *sig)
break;
case ERTS_SIG_Q_OP_PERSISTENT_MON_MSG:
+ case ERTS_SIG_Q_OP_IS_ALIVE:
size = ((ErtsMessage *) sig)->hfrag.alloc_size;
size *= sizeof(Eterm);
size += sizeof(ErtsMessage) - sizeof(Eterm);
@@ -3514,6 +3641,7 @@ erts_proc_sig_debug_foreach_sig(Process *c_p,
break;
}
+ case ERTS_SIG_Q_OP_IS_ALIVE:
case ERTS_SIG_Q_OP_TRACE_CHANGE_STATE:
break;