diff options
author | Rickard Green <[email protected]> | 2018-03-21 11:54:28 +0100 |
---|---|---|
committer | GitHub <[email protected]> | 2018-03-21 11:54:28 +0100 |
commit | cf3cbf0871832cb0808293842e5ae726edfc12e1 (patch) | |
tree | cd67d38d40c3414275ec257a6685b86c949c621a /erts/emulator/beam/erl_message.h | |
parent | 2c5711efcdd48ab8a9b7cd9ae27c97b9c1f8c37e (diff) | |
parent | 4bc282d812cc2c49aa3e2d073e96c720f16aa270 (diff) | |
download | otp-cf3cbf0871832cb0808293842e5ae726edfc12e1.tar.gz otp-cf3cbf0871832cb0808293842e5ae726edfc12e1.tar.bz2 otp-cf3cbf0871832cb0808293842e5ae726edfc12e1.zip |
Merge pull request #1740 from rickard-green/rickard/signals/OTP-14589
Implementation of true asynchronous signaling between processes
Diffstat (limited to 'erts/emulator/beam/erl_message.h')
-rw-r--r-- | erts/emulator/beam/erl_message.h | 259 |
1 files changed, 170 insertions, 89 deletions
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index a14f4f51d8..f56a252aef 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -1,7 +1,7 @@ /* * %CopyrightBegin% * - * Copyright Ericsson AB 1997-2016. All Rights Reserved. + * Copyright Ericsson AB 1997-2018. All Rights Reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -21,6 +21,11 @@ #ifndef __ERL_MESSAGE_H__ #define __ERL_MESSAGE_H__ +#include "sys.h" +#define ERTS_PROC_SIG_QUEUE_TYPE_ONLY +#include "erl_proc_sig_queue.h" +#undef ERTS_PROC_SIG_QUEUE_TYPE_ONLY + struct proc_bin; struct external_thing_; @@ -118,15 +123,16 @@ struct erl_heap_fragment { }; /* m[0] = message, m[1] = seq trace token */ -#define ERL_MESSAGE_REF_ARRAY_SZ 2 +#define ERL_MESSAGE_REF_ARRAY_SZ 3 #define ERL_MESSAGE_TERM(mp) ((mp)->m[0]) #define ERL_MESSAGE_TOKEN(mp) ((mp)->m[1]) +#define ERL_MESSAGE_FROM(mp) ((mp)->m[2]) #ifdef USE_VM_PROBES /* m[2] = dynamic trace user tag */ #undef ERL_MESSAGE_REF_ARRAY_SZ -#define ERL_MESSAGE_REF_ARRAY_SZ 3 -#define ERL_MESSAGE_DT_UTAG(mp) ((mp)->m[2]) +#define ERL_MESSAGE_REF_ARRAY_SZ 4 +#define ERL_MESSAGE_DT_UTAG(mp) ((mp)->m[3]) #else #endif @@ -157,28 +163,79 @@ struct erl_mesg { ErlHeapFragment hfrag; }; +/* + * The ErtsMessage struct is only one special type + * of signal. All signal structs have a common + * begining and can be differentiated by looking + * at the ErtsSignal 'common.tag' field. + * + * - An ordinary message will have a value + * - A distribution message that has not been + * decoded yet will have the non-value. + * - Other signals will have an external pid + * header tag. In order to differentiate + * between those signals one needs to look + * at the arity part of the header (see + * erts_proc_sig_queue.h). + */ + +#define ERTS_SIG_IS_NON_MSG_TAG(Tag) \ + (is_external_pid_header((Tag))) + +#define ERTS_SIG_IS_NON_MSG(Sig) \ + ERTS_SIG_IS_NON_MSG_TAG(((ErtsSignal *) (Sig))->common.tag) + +#define ERTS_SIG_IS_INTERNAL_MSG_TAG(Tag) \ + (!is_header((Tag))) +#define ERTS_SIG_IS_INTERNAL_MSG(Sig) \ + ERTS_SIG_IS_INTERNAL_MSG_TAG(((ErtsSignal *) (Sig))->common.tag) + +#define ERTS_SIG_IS_EXTERNAL_MSG_TAG(Tag) \ + ((Tag) == THE_NON_VALUE) +#define ERTS_SIG_IS_EXTERNAL_MSG(Sig) \ + ERTS_SIG_IS_EXTERNAL_MSG_TAG(((ErtsSignal *) (Sig))->common.tag) + +#define ERTS_SIG_IS_MSG_TAG(Tag) \ + (!ERTS_SIG_IS_NON_MSG_TAG(Tag)) +#define ERTS_SIG_IS_MSG(Sig) \ + ERTS_SIG_IS_MSG_TAG(((ErtsSignal *) (Sig))->common.tag) + +typedef union { + ErtsSignalCommon common; + ErtsMessageRef msg; +} ErtsSignal; + +typedef struct { + /* pointers to next pointers pointing to... */ + ErtsMessage **next; /* ... next (non-message) signal */ + ErtsMessage **last; /* ... next (non-message) signal */ +} ErtsMsgQNMSigs; + /* Size of default message buffer (erl_message.c) */ #define ERL_MESSAGE_BUF_SZ 500 typedef struct { - ErtsMessage* first; - ErtsMessage** last; /* point to the last next pointer */ - ErtsMessage** save; - Sint len; /* queue length */ + /* inner queue */ + ErtsMessage *first; + ErtsMessage **last; /* point to the last next pointer */ + ErtsMessage **save; - /* - * The following field is used by the recv_mark/1 and - * recv_set/1 instructions. - */ - ErtsMessage** saved_last; /* saved last pointer */ -} ErlMessageQueue; + /* middle queue */ + ErtsMessage *cont; + ErtsMessage **cont_last; + ErtsMsgQNMSigs nmsigs; + /* Common for inner and middle queue */ + ErtsMessage **saved_last; /* saved last pointer */ + Sint len; /* message queue length (inner+middle) */ +} ErtsSignalPrivQueues; typedef struct { ErtsMessage* first; ErtsMessage** last; /* point to the last next pointer */ Sint len; /* queue length */ -} ErlMessageInQueue; + ErtsMsgQNMSigs nmsigs; +} ErtsSignalInQueue; typedef struct erl_trace_message_queue__ { struct erl_trace_message_queue__ *next; /* point to the next receiver */ @@ -188,9 +245,46 @@ typedef struct erl_trace_message_queue__ { Sint len; /* queue length */ } ErlTraceMessageQueue; +#define ERTS_RECV_MARK_SAVE(P) \ + do { \ + erts_proc_lock((P), ERTS_PROC_LOCK_MSGQ); \ + if ((P)->sig_inq.first) \ + erts_proc_sig_fetch((P)); \ + erts_proc_unlock((P), ERTS_PROC_LOCK_MSGQ); \ + if ((P)->sig_qs.cont) { \ + (P)->sig_qs.saved_last = (P)->sig_qs.cont_last; \ + (P)->flags |= F_DEFERRED_SAVED_LAST; \ + } \ + else { \ + (P)->sig_qs.saved_last = (P)->sig_qs.last; \ + (P)->flags &= ~F_DEFERRED_SAVED_LAST; \ + } \ + } while (0) + +#define ERTS_RECV_MARK_SET(P) \ + do { \ + if ((P)->sig_qs.saved_last) { \ + if ((P)->flags & F_DEFERRED_SAVED_LAST) { \ + /* Points to middle queue; use end of inner */ \ + (P)->sig_qs.save = (P)->sig_qs.last; \ + ASSERT(!PEEK_MESSAGE((P))); \ + } \ + else { \ + /* Points to inner queue; safe to use */ \ + (P)->sig_qs.save = (P)->sig_qs.saved_last; \ + } \ + } \ + } while (0) + +#define ERTS_RECV_MARK_CLEAR(P) \ + do { \ + (P)->sig_qs.saved_last = NULL; \ + (P)->flags &= ~F_DEFERRED_SAVED_LAST; \ + } while (0) + /* Get "current" message */ -#define PEEK_MESSAGE(p) (*(p)->msg.save) +#define PEEK_MESSAGE(p) (*(p)->sig_qs.save) #ifdef USE_VM_PROBES #define LINK_MESSAGE_DTAG(mp, dt) ERL_MESSAGE_DT_UTAG(mp) = dt @@ -198,58 +292,69 @@ typedef struct erl_trace_message_queue__ { #define LINK_MESSAGE_DTAG(mp, dt) #endif -#define LINK_MESSAGE_IMPL(p, first_msg, last_msg, num_msgs, where) do { \ - *(p)->where.last = (first_msg); \ - (p)->where.last = (last_msg); \ - (p)->where.len += (num_msgs); \ - } while(0) +#ifdef USE_VM_PROBES +# define ERTS_MSG_RECV_TRACED(P) \ + ((ERTS_TRACE_FLAGS((P)) & F_TRACE_RECEIVE) \ + || DTRACE_ENABLED(message_queued)) +#else +# define ERTS_MSG_RECV_TRACED(P) \ + (ERTS_TRACE_FLAGS((P)) & F_TRACE_RECEIVE) + +#endif /* Add message last in private message queue */ -#define LINK_MESSAGE_PRIVQ(p, first_msg, last_msg, len) \ +#define LINK_MESSAGE_PRIVQ(p, first_msg, last_msg, num_msgs) \ do { \ - LINK_MESSAGE_IMPL(p, first_msg, last_msg, len, msg); \ + ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__((p), "before"); \ + if ((p)->sig_qs.cont || ERTS_MSG_RECV_TRACED((p))) { \ + *(p)->sig_qs.cont_last = (first_msg); \ + (p)->sig_qs.cont_last = (last_msg); \ + } \ + else { \ + *(p)->sig_qs.last = (first_msg); \ + (p)->sig_qs.last = (last_msg); \ + } \ + (p)->sig_qs.len += (num_msgs); \ + ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__((p), "after"); \ } while (0) /* Add message last_msg in message queue */ -#define LINK_MESSAGE(p, first_msg, last_msg, len) \ - LINK_MESSAGE_IMPL(p, first_msg, last_msg, len, msg_inq) - -#define ERTS_MSGQ_MV_INQ2PRIVQ(p) \ - do { \ - if (p->msg_inq.first) { \ - *p->msg.last = p->msg_inq.first; \ - p->msg.last = p->msg_inq.last; \ - p->msg.len += p->msg_inq.len; \ - p->msg_inq.first = NULL; \ - p->msg_inq.last = &p->msg_inq.first; \ - p->msg_inq.len = 0; \ - } \ - } while (0) - +#define LINK_MESSAGE(p, first_msg, last_msg, num_msgs) \ + do { \ + ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE__((p), "before"); \ + *(p)->sig_inq.last = (first_msg); \ + (p)->sig_inq.last = (last_msg); \ + (p)->sig_inq.len += (num_msgs); \ + ERTS_HDBG_CHECK_SIGNAL_IN_QUEUE__((p), "before"); \ + } while(0) /* Unlink current message */ -#define UNLINK_MESSAGE(p,msgp) do { \ - ErtsMessage* __mp = (msgp)->next; \ - *(p)->msg.save = __mp; \ - (p)->msg.len--; \ - if (__mp == NULL) \ - (p)->msg.last = (p)->msg.save; \ -} while(0) +#define UNLINK_MESSAGE(p,msgp) \ + do { \ + ErtsMessage *mp__ = (msgp)->next; \ + ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__((p), "before"); \ + *(p)->sig_qs.save = mp__; \ + (p)->sig_qs.len--; \ + if (mp__ == NULL) \ + (p)->sig_qs.last = (p)->sig_qs.save; \ + ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE__((p), "after"); \ + } while(0) /* * Reset message save point (after receive match). * Also invalidate the saved position since it may no * longer be safe to use. */ -#define JOIN_MESSAGE(p) do { \ - (p)->msg.save = &(p)->msg.first; \ - (p)->msg.saved_last = 0; \ -} while(0) +#define JOIN_MESSAGE(p) \ + do { \ + (p)->sig_qs.save = &(p)->sig_qs.first; \ + ERTS_RECV_MARK_CLEAR((p)); \ + } while(0) /* Save current message */ #define SAVE_MESSAGE(p) \ - (p)->msg.save = &(*(p)->msg.save)->next + (p)->sig_qs.save = &(*(p)->sig_qs.save)->next #define ERTS_SND_FLG_NO_SEQ_TRACE (((unsigned) 1) << 0) @@ -276,6 +381,7 @@ typedef struct erl_trace_message_queue__ { (MP)->next = NULL; \ ERL_MESSAGE_TERM(MP) = THE_NON_VALUE; \ ERL_MESSAGE_TOKEN(MP) = NIL; \ + ERL_MESSAGE_FROM(MP) = NIL; \ ERL_MESSAGE_DT_UTAG_INIT(MP); \ MP->data.attached = NULL; \ } while (0) @@ -288,7 +394,7 @@ void free_message_buffer(ErlHeapFragment *); void erts_queue_dist_message(Process*, ErtsProcLocks, ErtsDistExternal *, Eterm, Eterm); Sint erts_queue_message(Process*, ErtsProcLocks,ErtsMessage*, Eterm, Eterm); Sint erts_queue_messages(Process*, ErtsProcLocks, - ErtsMessage*, ErtsMessage**, Uint, Eterm); + ErtsMessage*, ErtsMessage**, Uint); void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm); Sint erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned); void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp); @@ -305,16 +411,6 @@ int erts_decode_dist_message(Process *, ErtsProcLocks, ErtsMessage *, int); void erts_cleanup_messages(ErtsMessage *mp); -typedef struct { - Uint size; - ErtsMessage *msgp; -} ErtsMessageInfo; - -Uint erts_prep_msgq_for_inspection(Process *c_p, - Process *rp, - ErtsProcLocks rp_locks, - ErtsMessageInfo *mip); - void *erts_alloc_message_ref(void); void erts_free_message_ref(void *); @@ -356,12 +452,6 @@ ERTS_GLB_FORCE_INLINE ErtsMessage *erts_shrink_message(ErtsMessage *mp, Uint sz, ERTS_GLB_FORCE_INLINE void erts_free_message(ErtsMessage *mp); ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment*); ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErtsMessage *msg); -ERTS_GLB_INLINE void erts_msgq_update_internal_pointers(ErlMessageQueue *msgq, - ErtsMessage **newpp, - ErtsMessage **oldpp); -ERTS_GLB_INLINE void erts_msgq_replace_msg_ref(ErlMessageQueue *msgq, - ErtsMessage *newp, - ErtsMessage **oldpp); #define ERTS_MSG_COMBINED_HFRAG ((void *) 0x1) @@ -465,28 +555,6 @@ ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErtsMessage *msg) } } -ERTS_GLB_INLINE void -erts_msgq_update_internal_pointers(ErlMessageQueue *msgq, - ErtsMessage **newpp, - ErtsMessage **oldpp) -{ - if (msgq->save == oldpp) - msgq->save = newpp; - if (msgq->last == oldpp) - msgq->last = newpp; - if (msgq->saved_last == oldpp) - msgq->saved_last = newpp; -} - -ERTS_GLB_INLINE void -erts_msgq_replace_msg_ref(ErlMessageQueue *msgq, ErtsMessage *newp, ErtsMessage **oldpp) -{ - ErtsMessage *oldp = *oldpp; - newp->next = oldp->next; - erts_msgq_update_internal_pointers(msgq, &newp->next, &oldp->next); - *oldpp = newp; -} - #endif Uint erts_mbuf_size(Process *p); @@ -500,4 +568,17 @@ Uint erts_mbuf_size(Process *p); # define ERTS_CHK_MBUF_SZ(P) ((void) 1) #endif +#define ERTS_FOREACH_SIG_PRIVQS(PROC, MVAR, CODE) \ + do { \ + int i__; \ + ErtsMessage *msgs__[] = {(PROC)->sig_qs.first, \ + (PROC)->sig_qs.cont}; \ + for (i__ = 0; i__ < sizeof(msgs__)/sizeof(msgs__[0]); i__++) { \ + ErtsMessage *MVAR; \ + for (MVAR = msgs__[i__]; MVAR; MVAR = MVAR->next) { \ + CODE; \ + } \ + } \ + } while (0) + #endif |