diff options
Diffstat (limited to 'erts/emulator/beam/erl_message.h')
-rw-r--r-- | erts/emulator/beam/erl_message.h | 277 |
1 files changed, 192 insertions, 85 deletions
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index f37b430d27..740ae46a0f 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -24,6 +24,8 @@ struct proc_bin; struct external_thing_; +typedef struct erl_mesg ErtsMessage; + /* * This struct represents data that must be updated by structure copy, * but is stored outside of any heap. @@ -54,6 +56,7 @@ typedef struct { enum { FACTORY_CLOSED = 0, FACTORY_HALLOC, + FACTORY_MESSAGE, FACTORY_HEAP_FRAGS, FACTORY_STATIC } mode; @@ -61,8 +64,10 @@ typedef struct { Eterm* hp_start; Eterm* hp; Eterm* hp_end; + ErtsMessage *message; struct erl_heap_fragment* heap_frags; struct erl_heap_fragment* heap_frags_saved; + Uint heap_frags_saved_used; ErlOffHeap* off_heap; ErlOffHeap off_heap_saved; Uint32 alloc_type; @@ -70,7 +75,10 @@ typedef struct { void erts_factory_proc_init(ErtsHeapFactory*, Process*); void erts_factory_proc_prealloc_init(ErtsHeapFactory*, Process*, Sint size); -void erts_factory_message_init(ErtsHeapFactory*, Process*, Eterm* hp, struct erl_heap_fragment*); +void erts_factory_heap_frag_init(ErtsHeapFactory*, struct erl_heap_fragment*); +ErtsMessage *erts_factory_message_create(ErtsHeapFactory *, Process *, + ErtsProcLocks *, Uint sz); +void erts_factory_selfcontained_message_init(ErtsHeapFactory*, ErtsMessage *, Eterm *); void erts_factory_static_init(ErtsHeapFactory*, Eterm* hp, Uint size, ErlOffHeap*); void erts_factory_dummy_init(ErtsHeapFactory*); @@ -91,6 +99,8 @@ void erts_factory_undo(ErtsHeapFactory*); #include "external.h" #include "erl_process.h" +#define ERTS_INVALID_HFRAG_PTR ((ErlHeapFragment *) ~((UWord) 7)) + /* * This struct represents a heap fragment, which is used when there * isn't sufficient room in the process heap and we can't do a GC. @@ -105,33 +115,46 @@ struct erl_heap_fragment { Eterm mem[1]; /* Data */ }; -typedef struct erl_mesg { - struct erl_mesg* next; /* Next message */ - union { - ErtsDistExternal *dist_ext; - ErlHeapFragment *heap_frag; - void *attached; - } data; -#ifdef USE_VM_PROBES - Eterm m[3]; /* m[0] = message, m[1] = seq trace token, m[3] = dynamic trace user tag */ -#else - Eterm m[2]; /* m[0] = message, m[1] = seq trace token */ -#endif -} ErlMessage; - +/* m[0] = message, m[1] = seq trace token */ +#define ERL_MESSAGE_REF_ARRAY_SZ 2 #define ERL_MESSAGE_TERM(mp) ((mp)->m[0]) #define ERL_MESSAGE_TOKEN(mp) ((mp)->m[1]) + #ifdef USE_VM_PROBES +/* m[2] = dynamic trace user tag */ +#undef ERL_MESSAGE_REF_ARRAY_SZ +#define ERL_MESSAGE_REF_ARRAY_SZ 3 #define ERL_MESSAGE_DT_UTAG(mp) ((mp)->m[2]) +#else #endif +#define ERL_MESSAGE_REF_FIELDS__ \ + ErtsMessage *next; /* Next message */ \ + union { \ + ErtsDistExternal *dist_ext; \ + ErlHeapFragment *heap_frag; \ + void *attached; \ + } data; \ + Eterm m[ERL_MESSAGE_REF_ARRAY_SZ] + + +typedef struct erl_msg_ref__ { + ERL_MESSAGE_REF_FIELDS__; +} ErtsMessageRef; + +struct erl_mesg { + ERL_MESSAGE_REF_FIELDS__; + + ErlHeapFragment hfrag; +}; + /* Size of default message buffer (erl_message.c) */ #define ERL_MESSAGE_BUF_SZ 500 typedef struct { - ErlMessage* first; - ErlMessage** last; /* point to the last next pointer */ - ErlMessage** save; + ErtsMessage* first; + ErtsMessage** last; /* point to the last next pointer */ + ErtsMessage** save; Sint len; /* queue length */ /* @@ -139,14 +162,14 @@ typedef struct { * recv_set/1 instructions. */ BeamInstr* mark; /* address to rec_loop/2 instruction */ - ErlMessage** saved_last; /* saved last pointer */ + ErtsMessage** saved_last; /* saved last pointer */ } ErlMessageQueue; #ifdef ERTS_SMP typedef struct { - ErlMessage* first; - ErlMessage** last; /* point to the last next pointer */ + ErtsMessage* first; + ErtsMessage** last; /* point to the last next pointer */ Sint len; /* queue length */ } ErlMessageInQueue; @@ -197,7 +220,7 @@ do { \ /* Unlink current message */ #define UNLINK_MESSAGE(p,msgp) do { \ - ErlMessage* __mp = (msgp)->next; \ + ErtsMessage* __mp = (msgp)->next; \ *(p)->msg.save = __mp; \ (p)->msg.len--; \ if (__mp == NULL) \ @@ -213,76 +236,33 @@ do { \ #define SAVE_MESSAGE(p) \ (p)->msg.save = &(*(p)->msg.save)->next -/* - * ErtsMoveMsgAttachmentIntoProc() moves data attached to a message - * onto the heap of a process. The attached data is the content of - * the the message either on the internal format or on the external - * format, and also possibly a seq trace token on the internal format. - * If the message content is on the external format, the decode might - * fail. If the decoding fails, ERL_MESSAGE_TERM(M) will contain - * THE_NON_VALUE. That is, ERL_MESSAGE_TERM(M) *has* to be checked - * afterwards and taken care of appropriately. - * - * ErtsMoveMsgAttachmentIntoProc() will shallow copy to heap if - * possible; otherwise, move to heap via garbage collection. - * - * ErtsMoveMsgAttachmentIntoProc() is used when receiveing messages - * in process_main() and in hipe_check_get_msg(). - */ - -#define ErtsMoveMsgAttachmentIntoProc(M, P, ST, HT, FC, SWPO, SWPI) \ -do { \ - if ((M)->data.attached) { \ - Uint need__ = erts_msg_attached_data_size((M)); \ - { SWPO ; } \ - if ((ST) - (HT) >= need__) { \ - ErtsHeapFactory factory__; \ - erts_factory_proc_prealloc_init(&factory__, (P), need__); \ - erts_move_msg_attached_data_to_heap(&factory__, (M)); \ - erts_factory_close(&factory__); \ - if ((P)->mbuf != NULL) { \ - /* Heap was exhausted by messages. This is a rare case */ \ - /* that can currently (OTP 18) only happen if hamts are */ \ - /* far exceeding the estimated heap size. Do GC. */ \ - (FC) -= erts_garbage_collect((P), 0, NULL, 0); \ - } \ - } \ - else { \ - (FC) -= erts_garbage_collect((P), 0, NULL, 0); \ - } \ - { SWPI ; } \ - ASSERT(!(M)->data.attached); \ - } \ -} while (0) - #define ERTS_SND_FLG_NO_SEQ_TRACE (((unsigned) 1) << 0) #define ERTS_HEAP_FRAG_SIZE(DATA_WORDS) \ (sizeof(ErlHeapFragment) - sizeof(Eterm) + (DATA_WORDS)*sizeof(Eterm)) -#define ERTS_INIT_HEAP_FRAG(HEAP_FRAG_P, DATA_WORDS) \ -do { \ - (HEAP_FRAG_P)->next = NULL; \ - (HEAP_FRAG_P)->alloc_size = (DATA_WORDS); \ - (HEAP_FRAG_P)->used_size = (DATA_WORDS); \ - (HEAP_FRAG_P)->off_heap.first = NULL; \ - (HEAP_FRAG_P)->off_heap.overhead = 0; \ -} while (0) +#define ERTS_INIT_HEAP_FRAG(HEAP_FRAG_P, USED_WORDS, DATA_WORDS) \ + do { \ + (HEAP_FRAG_P)->next = NULL; \ + (HEAP_FRAG_P)->alloc_size = (DATA_WORDS); \ + (HEAP_FRAG_P)->used_size = (USED_WORDS); \ + (HEAP_FRAG_P)->off_heap.first = NULL; \ + (HEAP_FRAG_P)->off_heap.overhead = 0; \ + } while (0) void init_message(void); -void free_message(ErlMessage *); ErlHeapFragment* new_message_buffer(Uint); ErlHeapFragment* erts_resize_message_buffer(ErlHeapFragment *, Uint, Eterm *, Uint); void free_message_buffer(ErlHeapFragment *); void erts_queue_dist_message(Process*, ErtsProcLocks*, ErtsDistExternal *, Eterm); #ifdef USE_VM_PROBES -void erts_queue_message_probe(Process*, ErtsProcLocks*, ErlHeapFragment*, +void erts_queue_message_probe(Process*, ErtsProcLocks*, ErtsMessage*, Eterm message, Eterm seq_trace_token, Eterm dt_utag); #define erts_queue_message(RP,RL,BP,Msg,SEQ) \ erts_queue_message_probe((RP),(RL),(BP),(Msg),(SEQ),NIL) #else -void erts_queue_message(Process*, ErtsProcLocks*, ErlHeapFragment*, +void erts_queue_message(Process*, ErtsProcLocks*, ErtsMessage*, Eterm message, Eterm seq_trace_token); #define erts_queue_message_probe(RP,RL,BP,Msg,SEQ,TAG) \ erts_queue_message((RP),(RL),(BP),(Msg),(SEQ)) @@ -291,20 +271,141 @@ void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm); Sint erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned); void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp); -void erts_move_msg_mbuf_to_heap(Eterm**, ErlOffHeap*, ErlMessage *); - -Uint erts_msg_attached_data_size_aux(ErlMessage *msg); -void erts_move_msg_attached_data_to_heap(ErtsHeapFactory*, ErlMessage *); -Eterm erts_msg_distext2heap(Process *, ErtsProcLocks *, ErlHeapFragment **, - Eterm *, ErtsDistExternal *); +Uint erts_msg_attached_data_size_aux(ErtsMessage *msg); void erts_cleanup_offheap(ErlOffHeap *offheap); +void erts_save_message_in_proc(Process *p, ErtsMessage *msg); +Sint erts_move_messages_off_heap(Process *c_p); +Sint erts_complete_off_heap_message_queue_change(Process *c_p); +Eterm erts_change_off_heap_message_queue_state(Process *c_p, int enable); + +int erts_decode_dist_message(Process *, ErtsProcLocks, ErtsMessage *, int); + +void erts_cleanup_messages(ErtsMessage *mp); + +typedef struct { + Uint size; + ErtsMessage *msgp; +} ErtsMessageInfo; + +Uint erts_prep_msgq_for_inspection(Process *c_p, + Process *rp, + ErtsProcLocks rp_locks, + ErtsMessageInfo *mip); +void *erts_alloc_message_ref(void); +void erts_free_message_ref(void *); +#define ERTS_SMALL_FIX_MSG_SZ 10 +#define ERTS_MEDIUM_FIX_MSG_SZ 20 +#define ERTS_LARGE_FIX_MSG_SZ 30 + +void *erts_alloc_small_message(void); +void erts_free_small_message(void *mp); + +typedef struct { + ErtsMessage m; + Eterm data[ERTS_SMALL_FIX_MSG_SZ-1]; +} ErtsSmallFixSzMessage; + +typedef struct { + ErtsMessage m; + Eterm data[ERTS_MEDIUM_FIX_MSG_SZ-1]; +} ErtsMediumFixSzMessage; + +typedef struct { + ErtsMessage m; + Eterm data[ERTS_LARGE_FIX_MSG_SZ-1]; +} ErtsLargeFixSzMessage; + +ErtsMessage *erts_try_alloc_message_on_heap(Process *pp, + erts_aint32_t *psp, + ErtsProcLocks *plp, + Uint sz, + Eterm **hpp, + ErlOffHeap **ohpp, + int *on_heap_p); +ErtsMessage *erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, + Eterm *brefs, Uint brefs_size); + +ERTS_GLB_FORCE_INLINE ErtsMessage *erts_alloc_message(Uint sz, Eterm **hpp); +ERTS_GLB_FORCE_INLINE ErtsMessage *erts_shrink_message(ErtsMessage *mp, Uint sz, + Eterm *brefs, Uint brefs_size); +ERTS_GLB_FORCE_INLINE void erts_free_message(ErtsMessage *mp); ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment*); -ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErlMessage *msg); +ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErtsMessage *msg); + +#define ERTS_MSG_COMBINED_HFRAG ((void *) 0x1) #if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_FORCE_INLINE ErtsMessage *erts_alloc_message(Uint sz, Eterm **hpp) +{ + ErtsMessage *mp; + + if (sz == 0) { + mp = erts_alloc_message_ref(); + mp->next = NULL; + ERL_MESSAGE_TERM(mp) = NIL; + mp->data.attached = NULL; + if (hpp) + *hpp = NULL; + return mp; + } + + mp = erts_alloc(ERTS_ALC_T_MSG, + sizeof(ErtsMessage) + (sz - 1)*sizeof(Eterm)); + + mp->next = NULL; + ERL_MESSAGE_TERM(mp) = NIL; + mp->data.attached = ERTS_MSG_COMBINED_HFRAG; + ERTS_INIT_HEAP_FRAG(&mp->hfrag, sz, sz); + + if (hpp) + *hpp = &mp->hfrag.mem[0]; + + return mp; +} + +ERTS_GLB_FORCE_INLINE ErtsMessage * +erts_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_size) +{ + if (sz == 0) { + ErtsMessage *nmp; + if (!mp->data.attached) + return mp; + ASSERT(mp->data.attached == ERTS_MSG_COMBINED_HFRAG); + nmp = erts_alloc_message_ref(); +#ifdef DEBUG + if (brefs && brefs_size) { + int i; + for (i = 0; i < brefs_size; i++) + ASSERT(is_non_value(brefs[i]) || is_immed(brefs[i])); + } +#endif + erts_free(ERTS_ALC_T_MSG, mp); + return nmp; + } + + ASSERT(mp->data.attached == ERTS_MSG_COMBINED_HFRAG); + ASSERT(mp->hfrag.used_size >= sz); + + if (sz >= (mp->hfrag.alloc_size - mp->hfrag.alloc_size / 16)) { + mp->hfrag.used_size = sz; + return mp; + } + + return erts_realloc_shrink_message(mp, sz, brefs, brefs_size); +} + +ERTS_GLB_FORCE_INLINE void erts_free_message(ErtsMessage *mp) +{ + if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) + erts_free_message_ref(mp); + else + erts_free(ERTS_ALC_T_MSG, mp); +} + ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment* bp) { Uint sz = 0; @@ -314,11 +415,17 @@ ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment* bp) return sz; } -ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErlMessage *msg) +ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErtsMessage *msg) { ASSERT(msg->data.attached); - if (is_value(ERL_MESSAGE_TERM(msg))) - return erts_used_frag_sz(msg->data.heap_frag); + if (is_value(ERL_MESSAGE_TERM(msg))) { + ErlHeapFragment *bp; + if (msg->data.attached == ERTS_MSG_COMBINED_HFRAG) + bp = &msg->hfrag; + else + bp = msg->data.heap_frag; + return erts_used_frag_sz(bp); + } else if (msg->data.dist_ext->heap_size < 0) return erts_msg_attached_data_size_aux(msg); else { |