From 3ac08f9b668613a4292436979eacc61863c2ab94 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Wed, 16 Sep 2015 15:02:50 +0200 Subject: Fragmented young heap generation and off_heap_message_queue option * The youngest generation of the heap can now consist of multiple blocks. Heap fragments and message fragments are added to the youngest generation when needed without triggering a GC. After a GC the youngest generation is contained in one single block. * The off_heap_message_queue process flag has been added. When enabled all message data in the queue is kept off heap. When a message is selected from the queue, the message fragment (or heap fragment) containing the actual message is attached to the youngest generation. Messages stored off heap is not part of GC. --- erts/emulator/beam/erl_message.h | 277 +++++++++++++++++++++++++++------------ 1 file changed, 192 insertions(+), 85 deletions(-) (limited to 'erts/emulator/beam/erl_message.h') diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index f37b430d27..740ae46a0f 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -24,6 +24,8 @@ struct proc_bin; struct external_thing_; +typedef struct erl_mesg ErtsMessage; + /* * This struct represents data that must be updated by structure copy, * but is stored outside of any heap. @@ -54,6 +56,7 @@ typedef struct { enum { FACTORY_CLOSED = 0, FACTORY_HALLOC, + FACTORY_MESSAGE, FACTORY_HEAP_FRAGS, FACTORY_STATIC } mode; @@ -61,8 +64,10 @@ typedef struct { Eterm* hp_start; Eterm* hp; Eterm* hp_end; + ErtsMessage *message; struct erl_heap_fragment* heap_frags; struct erl_heap_fragment* heap_frags_saved; + Uint heap_frags_saved_used; ErlOffHeap* off_heap; ErlOffHeap off_heap_saved; Uint32 alloc_type; @@ -70,7 +75,10 @@ typedef struct { void erts_factory_proc_init(ErtsHeapFactory*, Process*); void erts_factory_proc_prealloc_init(ErtsHeapFactory*, Process*, Sint size); -void erts_factory_message_init(ErtsHeapFactory*, Process*, Eterm* hp, struct erl_heap_fragment*); +void erts_factory_heap_frag_init(ErtsHeapFactory*, struct erl_heap_fragment*); +ErtsMessage *erts_factory_message_create(ErtsHeapFactory *, Process *, + ErtsProcLocks *, Uint sz); +void erts_factory_selfcontained_message_init(ErtsHeapFactory*, ErtsMessage *, Eterm *); void erts_factory_static_init(ErtsHeapFactory*, Eterm* hp, Uint size, ErlOffHeap*); void erts_factory_dummy_init(ErtsHeapFactory*); @@ -91,6 +99,8 @@ void erts_factory_undo(ErtsHeapFactory*); #include "external.h" #include "erl_process.h" +#define ERTS_INVALID_HFRAG_PTR ((ErlHeapFragment *) ~((UWord) 7)) + /* * This struct represents a heap fragment, which is used when there * isn't sufficient room in the process heap and we can't do a GC. @@ -105,33 +115,46 @@ struct erl_heap_fragment { Eterm mem[1]; /* Data */ }; -typedef struct erl_mesg { - struct erl_mesg* next; /* Next message */ - union { - ErtsDistExternal *dist_ext; - ErlHeapFragment *heap_frag; - void *attached; - } data; -#ifdef USE_VM_PROBES - Eterm m[3]; /* m[0] = message, m[1] = seq trace token, m[3] = dynamic trace user tag */ -#else - Eterm m[2]; /* m[0] = message, m[1] = seq trace token */ -#endif -} ErlMessage; - +/* m[0] = message, m[1] = seq trace token */ +#define ERL_MESSAGE_REF_ARRAY_SZ 2 #define ERL_MESSAGE_TERM(mp) ((mp)->m[0]) #define ERL_MESSAGE_TOKEN(mp) ((mp)->m[1]) + #ifdef USE_VM_PROBES +/* m[2] = dynamic trace user tag */ +#undef ERL_MESSAGE_REF_ARRAY_SZ +#define ERL_MESSAGE_REF_ARRAY_SZ 3 #define ERL_MESSAGE_DT_UTAG(mp) ((mp)->m[2]) +#else #endif +#define ERL_MESSAGE_REF_FIELDS__ \ + ErtsMessage *next; /* Next message */ \ + union { \ + ErtsDistExternal *dist_ext; \ + ErlHeapFragment *heap_frag; \ + void *attached; \ + } data; \ + Eterm m[ERL_MESSAGE_REF_ARRAY_SZ] + + +typedef struct erl_msg_ref__ { + ERL_MESSAGE_REF_FIELDS__; +} ErtsMessageRef; + +struct erl_mesg { + ERL_MESSAGE_REF_FIELDS__; + + ErlHeapFragment hfrag; +}; + /* Size of default message buffer (erl_message.c) */ #define ERL_MESSAGE_BUF_SZ 500 typedef struct { - ErlMessage* first; - ErlMessage** last; /* point to the last next pointer */ - ErlMessage** save; + ErtsMessage* first; + ErtsMessage** last; /* point to the last next pointer */ + ErtsMessage** save; Sint len; /* queue length */ /* @@ -139,14 +162,14 @@ typedef struct { * recv_set/1 instructions. */ BeamInstr* mark; /* address to rec_loop/2 instruction */ - ErlMessage** saved_last; /* saved last pointer */ + ErtsMessage** saved_last; /* saved last pointer */ } ErlMessageQueue; #ifdef ERTS_SMP typedef struct { - ErlMessage* first; - ErlMessage** last; /* point to the last next pointer */ + ErtsMessage* first; + ErtsMessage** last; /* point to the last next pointer */ Sint len; /* queue length */ } ErlMessageInQueue; @@ -197,7 +220,7 @@ do { \ /* Unlink current message */ #define UNLINK_MESSAGE(p,msgp) do { \ - ErlMessage* __mp = (msgp)->next; \ + ErtsMessage* __mp = (msgp)->next; \ *(p)->msg.save = __mp; \ (p)->msg.len--; \ if (__mp == NULL) \ @@ -213,76 +236,33 @@ do { \ #define SAVE_MESSAGE(p) \ (p)->msg.save = &(*(p)->msg.save)->next -/* - * ErtsMoveMsgAttachmentIntoProc() moves data attached to a message - * onto the heap of a process. The attached data is the content of - * the the message either on the internal format or on the external - * format, and also possibly a seq trace token on the internal format. - * If the message content is on the external format, the decode might - * fail. If the decoding fails, ERL_MESSAGE_TERM(M) will contain - * THE_NON_VALUE. That is, ERL_MESSAGE_TERM(M) *has* to be checked - * afterwards and taken care of appropriately. - * - * ErtsMoveMsgAttachmentIntoProc() will shallow copy to heap if - * possible; otherwise, move to heap via garbage collection. - * - * ErtsMoveMsgAttachmentIntoProc() is used when receiveing messages - * in process_main() and in hipe_check_get_msg(). - */ - -#define ErtsMoveMsgAttachmentIntoProc(M, P, ST, HT, FC, SWPO, SWPI) \ -do { \ - if ((M)->data.attached) { \ - Uint need__ = erts_msg_attached_data_size((M)); \ - { SWPO ; } \ - if ((ST) - (HT) >= need__) { \ - ErtsHeapFactory factory__; \ - erts_factory_proc_prealloc_init(&factory__, (P), need__); \ - erts_move_msg_attached_data_to_heap(&factory__, (M)); \ - erts_factory_close(&factory__); \ - if ((P)->mbuf != NULL) { \ - /* Heap was exhausted by messages. This is a rare case */ \ - /* that can currently (OTP 18) only happen if hamts are */ \ - /* far exceeding the estimated heap size. Do GC. */ \ - (FC) -= erts_garbage_collect((P), 0, NULL, 0); \ - } \ - } \ - else { \ - (FC) -= erts_garbage_collect((P), 0, NULL, 0); \ - } \ - { SWPI ; } \ - ASSERT(!(M)->data.attached); \ - } \ -} while (0) - #define ERTS_SND_FLG_NO_SEQ_TRACE (((unsigned) 1) << 0) #define ERTS_HEAP_FRAG_SIZE(DATA_WORDS) \ (sizeof(ErlHeapFragment) - sizeof(Eterm) + (DATA_WORDS)*sizeof(Eterm)) -#define ERTS_INIT_HEAP_FRAG(HEAP_FRAG_P, DATA_WORDS) \ -do { \ - (HEAP_FRAG_P)->next = NULL; \ - (HEAP_FRAG_P)->alloc_size = (DATA_WORDS); \ - (HEAP_FRAG_P)->used_size = (DATA_WORDS); \ - (HEAP_FRAG_P)->off_heap.first = NULL; \ - (HEAP_FRAG_P)->off_heap.overhead = 0; \ -} while (0) +#define ERTS_INIT_HEAP_FRAG(HEAP_FRAG_P, USED_WORDS, DATA_WORDS) \ + do { \ + (HEAP_FRAG_P)->next = NULL; \ + (HEAP_FRAG_P)->alloc_size = (DATA_WORDS); \ + (HEAP_FRAG_P)->used_size = (USED_WORDS); \ + (HEAP_FRAG_P)->off_heap.first = NULL; \ + (HEAP_FRAG_P)->off_heap.overhead = 0; \ + } while (0) void init_message(void); -void free_message(ErlMessage *); ErlHeapFragment* new_message_buffer(Uint); ErlHeapFragment* erts_resize_message_buffer(ErlHeapFragment *, Uint, Eterm *, Uint); void free_message_buffer(ErlHeapFragment *); void erts_queue_dist_message(Process*, ErtsProcLocks*, ErtsDistExternal *, Eterm); #ifdef USE_VM_PROBES -void erts_queue_message_probe(Process*, ErtsProcLocks*, ErlHeapFragment*, +void erts_queue_message_probe(Process*, ErtsProcLocks*, ErtsMessage*, Eterm message, Eterm seq_trace_token, Eterm dt_utag); #define erts_queue_message(RP,RL,BP,Msg,SEQ) \ erts_queue_message_probe((RP),(RL),(BP),(Msg),(SEQ),NIL) #else -void erts_queue_message(Process*, ErtsProcLocks*, ErlHeapFragment*, +void erts_queue_message(Process*, ErtsProcLocks*, ErtsMessage*, Eterm message, Eterm seq_trace_token); #define erts_queue_message_probe(RP,RL,BP,Msg,SEQ,TAG) \ erts_queue_message((RP),(RL),(BP),(Msg),(SEQ)) @@ -291,20 +271,141 @@ void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm); Sint erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned); void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp); -void erts_move_msg_mbuf_to_heap(Eterm**, ErlOffHeap*, ErlMessage *); - -Uint erts_msg_attached_data_size_aux(ErlMessage *msg); -void erts_move_msg_attached_data_to_heap(ErtsHeapFactory*, ErlMessage *); -Eterm erts_msg_distext2heap(Process *, ErtsProcLocks *, ErlHeapFragment **, - Eterm *, ErtsDistExternal *); +Uint erts_msg_attached_data_size_aux(ErtsMessage *msg); void erts_cleanup_offheap(ErlOffHeap *offheap); +void erts_save_message_in_proc(Process *p, ErtsMessage *msg); +Sint erts_move_messages_off_heap(Process *c_p); +Sint erts_complete_off_heap_message_queue_change(Process *c_p); +Eterm erts_change_off_heap_message_queue_state(Process *c_p, int enable); + +int erts_decode_dist_message(Process *, ErtsProcLocks, ErtsMessage *, int); + +void erts_cleanup_messages(ErtsMessage *mp); + +typedef struct { + Uint size; + ErtsMessage *msgp; +} ErtsMessageInfo; + +Uint erts_prep_msgq_for_inspection(Process *c_p, + Process *rp, + ErtsProcLocks rp_locks, + ErtsMessageInfo *mip); +void *erts_alloc_message_ref(void); +void erts_free_message_ref(void *); +#define ERTS_SMALL_FIX_MSG_SZ 10 +#define ERTS_MEDIUM_FIX_MSG_SZ 20 +#define ERTS_LARGE_FIX_MSG_SZ 30 + +void *erts_alloc_small_message(void); +void erts_free_small_message(void *mp); + +typedef struct { + ErtsMessage m; + Eterm data[ERTS_SMALL_FIX_MSG_SZ-1]; +} ErtsSmallFixSzMessage; + +typedef struct { + ErtsMessage m; + Eterm data[ERTS_MEDIUM_FIX_MSG_SZ-1]; +} ErtsMediumFixSzMessage; + +typedef struct { + ErtsMessage m; + Eterm data[ERTS_LARGE_FIX_MSG_SZ-1]; +} ErtsLargeFixSzMessage; + +ErtsMessage *erts_try_alloc_message_on_heap(Process *pp, + erts_aint32_t *psp, + ErtsProcLocks *plp, + Uint sz, + Eterm **hpp, + ErlOffHeap **ohpp, + int *on_heap_p); +ErtsMessage *erts_realloc_shrink_message(ErtsMessage *mp, Uint sz, + Eterm *brefs, Uint brefs_size); + +ERTS_GLB_FORCE_INLINE ErtsMessage *erts_alloc_message(Uint sz, Eterm **hpp); +ERTS_GLB_FORCE_INLINE ErtsMessage *erts_shrink_message(ErtsMessage *mp, Uint sz, + Eterm *brefs, Uint brefs_size); +ERTS_GLB_FORCE_INLINE void erts_free_message(ErtsMessage *mp); ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment*); -ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErlMessage *msg); +ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErtsMessage *msg); + +#define ERTS_MSG_COMBINED_HFRAG ((void *) 0x1) #if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_FORCE_INLINE ErtsMessage *erts_alloc_message(Uint sz, Eterm **hpp) +{ + ErtsMessage *mp; + + if (sz == 0) { + mp = erts_alloc_message_ref(); + mp->next = NULL; + ERL_MESSAGE_TERM(mp) = NIL; + mp->data.attached = NULL; + if (hpp) + *hpp = NULL; + return mp; + } + + mp = erts_alloc(ERTS_ALC_T_MSG, + sizeof(ErtsMessage) + (sz - 1)*sizeof(Eterm)); + + mp->next = NULL; + ERL_MESSAGE_TERM(mp) = NIL; + mp->data.attached = ERTS_MSG_COMBINED_HFRAG; + ERTS_INIT_HEAP_FRAG(&mp->hfrag, sz, sz); + + if (hpp) + *hpp = &mp->hfrag.mem[0]; + + return mp; +} + +ERTS_GLB_FORCE_INLINE ErtsMessage * +erts_shrink_message(ErtsMessage *mp, Uint sz, Eterm *brefs, Uint brefs_size) +{ + if (sz == 0) { + ErtsMessage *nmp; + if (!mp->data.attached) + return mp; + ASSERT(mp->data.attached == ERTS_MSG_COMBINED_HFRAG); + nmp = erts_alloc_message_ref(); +#ifdef DEBUG + if (brefs && brefs_size) { + int i; + for (i = 0; i < brefs_size; i++) + ASSERT(is_non_value(brefs[i]) || is_immed(brefs[i])); + } +#endif + erts_free(ERTS_ALC_T_MSG, mp); + return nmp; + } + + ASSERT(mp->data.attached == ERTS_MSG_COMBINED_HFRAG); + ASSERT(mp->hfrag.used_size >= sz); + + if (sz >= (mp->hfrag.alloc_size - mp->hfrag.alloc_size / 16)) { + mp->hfrag.used_size = sz; + return mp; + } + + return erts_realloc_shrink_message(mp, sz, brefs, brefs_size); +} + +ERTS_GLB_FORCE_INLINE void erts_free_message(ErtsMessage *mp) +{ + if (mp->data.attached != ERTS_MSG_COMBINED_HFRAG) + erts_free_message_ref(mp); + else + erts_free(ERTS_ALC_T_MSG, mp); +} + ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment* bp) { Uint sz = 0; @@ -314,11 +415,17 @@ ERTS_GLB_INLINE Uint erts_used_frag_sz(const ErlHeapFragment* bp) return sz; } -ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErlMessage *msg) +ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErtsMessage *msg) { ASSERT(msg->data.attached); - if (is_value(ERL_MESSAGE_TERM(msg))) - return erts_used_frag_sz(msg->data.heap_frag); + if (is_value(ERL_MESSAGE_TERM(msg))) { + ErlHeapFragment *bp; + if (msg->data.attached == ERTS_MSG_COMBINED_HFRAG) + bp = &msg->hfrag; + else + bp = msg->data.heap_frag; + return erts_used_frag_sz(bp); + } else if (msg->data.dist_ext->heap_size < 0) return erts_msg_attached_data_size_aux(msg); else { -- cgit v1.2.3