/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 1997-2012. All Rights Reserved.
*
* The contents of this file are subject to the Erlang Public License,
* Version 1.1, (the "License"); you may not use this file except in
* compliance with the License. You should have received a copy of the
* Erlang Public License along with this software. If not, it can be
* retrieved online at http://www.erlang.org/.
*
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
* the License for the specific language governing rights and limitations
* under the License.
*
* %CopyrightEnd%
*/
#ifndef __ERL_MESSAGE_H__
#define __ERL_MESSAGE_H__
struct proc_bin;
struct external_thing_;
/*
* This struct represents data that must be updated by structure copy,
* but is stored outside of any heap.
*
* Headers are chained through 'next'; the 'first' member of ErlOffHeap
* points at a header of this type.
*/
struct erl_off_heap_header {
/* Leading term word of the object (presumably its header tag word -- confirm against users). */
Eterm thing_word;
/* Size of the object; the unit is not visible in this header. */
Uint size;
#if HALFWORD_HEAP
/* Pointer-sized padding member (going by its name); only present in HALFWORD_HEAP builds. */
void* dummy_ptr_padding__;
#endif
/* Next header in the chain. */
struct erl_off_heap_header* next;
};
/*
 * Add 'size' to the administrative overhead counter of 'oh', a pointer
 * to an object with an 'overhead' member (e.g. ErlOffHeap).
 *
 * Both arguments are parenthesized in the expansion so that arbitrary
 * expressions can be passed safely. Each argument is evaluated once.
 */
#define OH_OVERHEAD(oh, size) do { \
    (oh)->overhead += (size); \
} while(0)
/*
* Head of a chain of off-heap object headers together with an
* accumulated overhead counter. ERTS_INIT_OFF_HEAP() resets both
* members and OH_OVERHEAD() adds to 'overhead'.
*/
typedef struct erl_off_heap {
/* First header in the chain; NULL after ERTS_INIT_OFF_HEAP(). */
struct erl_off_heap_header* first;
Uint64 overhead; /* Administrative overhead (used to force GC). */
} ErlOffHeap;
/*
 * Put the ErlOffHeap pointed to by OHP into its empty state: zero
 * accumulated overhead and no off-heap objects linked in.
 * OHP is evaluated twice.
 */
#define ERTS_INIT_OFF_HEAP(OHP)     \
    do {                            \
        (OHP)->overhead = 0;        \
        (OHP)->first = NULL;        \
    } while (0)
#include "external.h"
#include "erl_process.h"
/*
* This struct represents a heap fragment, which is used when there
* isn't sufficient room in the process heap and we can't do a GC.
*
* 'mem' is declared with a single element but is the start of the
* fragment's data area; ERTS_HEAP_FRAG_SIZE() computes the allocation
* size for a given number of data words.
*/
typedef struct erl_heap_fragment ErlHeapFragment;
struct erl_heap_fragment {
ErlHeapFragment* next; /* Next heap fragment */
ErlOffHeap off_heap; /* Off-heap data of this fragment. */
unsigned alloc_size; /* Size in (half)words of mem */
unsigned used_size; /* With terms to be moved to heap by GC */
Eterm mem[1]; /* Data */
};
/*
* A single message in a message queue.
*
* 'data' refers to whatever is attached to the message: distribution
* data on the external format (dist_ext) or a heap fragment (heap_frag).
* The 'attached' member gives type-independent access to the same
* pointer, e.g. for testing whether anything is attached at all.
*
* The term slots in 'm' are accessed through the ERL_MESSAGE_*() macros.
*/
typedef struct erl_mesg {
struct erl_mesg* next; /* Next message */
union {
ErtsDistExternal *dist_ext;
ErlHeapFragment *heap_frag;
void *attached;
} data;
#ifdef USE_VM_PROBES
Eterm m[3]; /* m[0] = message, m[1] = seq trace token, m[2] = dynamic trace user tag */
#else
Eterm m[2]; /* m[0] = message, m[1] = seq trace token */
#endif
} ErlMessage;
/*
* Accessors for the term slots of an ErlMessage; mp is a pointer to the
* message. Each expands to an lvalue.
*/
/* The message term itself. */
#define ERL_MESSAGE_TERM(mp) ((mp)->m[0])
/* The seq trace token. */
#define ERL_MESSAGE_TOKEN(mp) ((mp)->m[1])
#ifdef USE_VM_PROBES
/* The dynamic trace user tag; this slot only exists when USE_VM_PROBES is defined. */
#define ERL_MESSAGE_DT_UTAG(mp) ((mp)->m[2])
#endif
/*
* Size of default message buffer (erl_message.c).
* NOTE(review): the unit (words or bytes) is not visible in this header.
*/
#define ERL_MESSAGE_BUF_SZ 500
/*
* Message queue kept as a singly linked list of ErlMessage.
*
* The queue macros in this header (PEEK_MESSAGE, LINK_MESSAGE_PRIVQ,
* UNLINK_MESSAGE, JOIN_MESSAGE, SAVE_MESSAGE) operate on members with
* these names through (p)->msg, presumably a member of this type.
*/
typedef struct {
ErlMessage* first;
ErlMessage** last; /* point to the last next pointer */
/* Points at the link that holds the "current" message (see PEEK_MESSAGE). */
ErlMessage** save;
Sint len; /* queue length */
/*
* The following two fields are used by the recv_mark/1 and
* recv_set/1 instructions.
*/
BeamInstr* mark; /* address to rec_loop/2 instruction */
ErlMessage** saved_last; /* saved last pointer */
} ErlMessageQueue;
#ifdef ERTS_SMP
/*
* Message in queue, only declared when ERTS_SMP is defined.
*
* In that configuration LINK_MESSAGE() appends to (p)->msg_inq and
* ERTS_SMP_MSGQ_MV_INQ2PRIVQ() moves its contents to the end of the
* private queue; msg_inq is presumably a member of this type.
*/
typedef struct {
ErlMessage* first;
ErlMessage** last; /* point to the last next pointer */
Sint len; /* queue length */
} ErlMessageInQueue;
#endif
/* Get "current" message */
#define PEEK_MESSAGE(p) (*(p)->msg.save)
/*
 * Append message mp to the end of the private message queue of p:
 * store mp in the link the tail pointer refers to, make the tail
 * pointer refer to mp's own next link, and count the message.
 * Both arguments are evaluated more than once.
 */
#define LINK_MESSAGE_PRIVQ(p, mp)           \
    do {                                    \
        *(p)->msg.last = (mp);              \
        (p)->msg.last = &(mp)->next;        \
        (p)->msg.len += 1;                  \
    } while (0)
#ifdef ERTS_SMP
/*
* Move in message queue to end of private message queue.
*
* When the in queue of P is non-empty, its whole chain is spliced onto
* the tail of the private queue: the private tail link receives the
* first in-queue message, the private tail pointer takes over the
* in-queue tail pointer and the lengths are added. The in queue is then
* reset to empty (first == NULL, last pointing at its own 'first').
* Nothing happens when the in queue is empty.
*
* The macro itself does no locking. P is evaluated several times.
*/
#define ERTS_SMP_MSGQ_MV_INQ2PRIVQ(P) \
do { \
if ((P)->msg_inq.first) { \
*(P)->msg.last = (P)->msg_inq.first; \
(P)->msg.last = (P)->msg_inq.last; \
(P)->msg.len += (P)->msg_inq.len; \
(P)->msg_inq.first = NULL; \
(P)->msg_inq.last = &(P)->msg_inq.first; \
(P)->msg_inq.len = 0; \
} \
} while (0)
/*
 * Add message last in message queue. In this (ERTS_SMP) variant the
 * message mp is appended to the in queue (msg_inq) of p and the in
 * queue length is incremented.
 * Both arguments are evaluated more than once.
 */
#define LINK_MESSAGE(p, mp)                     \
    do {                                        \
        *(p)->msg_inq.last = (mp);              \
        (p)->msg_inq.last = &(mp)->next;        \
        (p)->msg_inq.len += 1;                  \
    } while (0)
#else
/* Without ERTS_SMP this expands to nothing, so there is nothing to move. */
#define ERTS_SMP_MSGQ_MV_INQ2PRIVQ(P)
/* Add message last in message queue (here: directly in the private queue) */
#define LINK_MESSAGE(p, mp) LINK_MESSAGE_PRIVQ((p), (mp))
#endif
/*
 * Unlink current message.
 *
 * msgp must be the message that the save pointer of p currently refers
 * to. Its successor is stored in that link, the queue length is
 * decremented, and if msgp was the last message the tail pointer is
 * moved back to the link the save pointer refers to. The receive mark
 * is always cleared.
 * p is evaluated several times.
 */
#define UNLINK_MESSAGE(p,msgp)                          \
    do {                                                \
        ErlMessage *unlink_next__ = (msgp)->next;       \
        *(p)->msg.save = unlink_next__;                 \
        (p)->msg.len -= 1;                              \
        if (!unlink_next__) {                           \
            (p)->msg.last = (p)->msg.save;              \
        }                                               \
        (p)->msg.mark = NULL;                           \
    } while (0)
/*
 * Reset message save point (after receive match): make the save
 * pointer of p refer to the head link of its message queue again.
 *
 * The expansion is fully parenthesized so that it behaves as a single
 * expression wherever it is used; its value is the new save pointer.
 * p is evaluated twice.
 */
#define JOIN_MESSAGE(p) \
    ((p)->msg.save = &(p)->msg.first)
/*
 * Save current message: advance the save pointer of p so that it
 * refers to the next link of the message it currently points at, i.e.
 * the current message is skipped but stays in the queue.
 *
 * The current message must exist (the link the save pointer refers to
 * must not hold NULL), since it is dereferenced.
 *
 * The expansion is fully parenthesized so that it behaves as a single
 * expression wherever it is used; its value is the new save pointer.
 * p is evaluated twice.
 */
#define SAVE_MESSAGE(p) \
    ((p)->msg.save = &(*(p)->msg.save)->next)
/*
* ErtsMoveMsgAttachmentIntoProc() moves data attached to a message
* onto the heap of a process. The attached data is the content of
* the message either on the internal format or on the external
* format, and also possibly a seq trace token on the internal format.
* If the message content is on the external format, the decode might
* fail. If the decoding fails, ERL_MESSAGE_TERM(M) will contain
* THE_NON_VALUE. That is, ERL_MESSAGE_TERM(M) *has* to be checked
* afterwards and taken care of appropriately.
*
* ErtsMoveMsgAttachmentIntoProc() will shallow copy to heap if
* possible; otherwise, move to heap via garbage collection.
*
* ErtsMoveMsgAttachmentIntoProc() is used when receiving messages
* in process_main() and in hipe_check_get_msg().
*
* Arguments:
*   M     - the message; nothing is done unless (M)->data.attached is set.
*   P     - the process; passed to MSO() and erts_garbage_collect().
*   ST,HT - pointers bounding the free area; the shallow copy is chosen
*           when (ST) - (HT) is at least the size reported by
*           erts_msg_attached_data_size(). HT must be an lvalue, it is
*           advanced past the copied data. (Presumably stack top and
*           heap top -- confirm against callers.)
*   FC    - lvalue that is decreased by the return value of
*           erts_garbage_collect() (presumably the remaining reduction
*           count -- confirm against callers).
*   SWPO,
*   SWPI  - statements executed immediately before and after the
*           garbage collection (presumably swapping cached process
*           state out and back in -- confirm against callers).
*
* In both branches the attachment is expected to be gone afterwards;
* this is checked with ASSERT().
*/
#define ErtsMoveMsgAttachmentIntoProc(M, P, ST, HT, FC, SWPO, SWPI) \
do { \
if ((M)->data.attached) { \
Uint need__ = erts_msg_attached_data_size((M)); \
if ((ST) - (HT) >= need__) { \
Uint *htop__ = (HT); \
erts_move_msg_attached_data_to_heap(&htop__, &MSO((P)), (M));\
ASSERT(htop__ - (HT) <= need__); \
(HT) = htop__; \
} \
else { \
{ SWPO ; } \
(FC) -= erts_garbage_collect((P), 0, NULL, 0); \
{ SWPI ; } \
} \
ASSERT(!(M)->data.attached); \
} \
} while (0)
/*
* Send flag occupying bit 0 of an unsigned flag word. Going by its name
* it disables seq trace handling for the send; presumably passed as the
* 'unsigned' argument of erts_send_message() -- confirm against callers.
*/
#define ERTS_SND_FLG_NO_SEQ_TRACE (((unsigned) 1) << 0)
/*
* Number of bytes needed for an ErlHeapFragment whose data area holds
* DATA_WORDS words of sizeof(Eterm) bytes each: the size of the struct,
* minus the one 'mem' element already declared in it, plus the
* requested data words.
*/
#define ERTS_HEAP_FRAG_SIZE(DATA_WORDS) \
(sizeof(ErlHeapFragment) - sizeof(Eterm) + (DATA_WORDS)*sizeof(Eterm))
/*
 * Initialize the header of the heap fragment HEAP_FRAG_P for a data
 * area of DATA_WORDS words: empty off-heap list with zero overhead,
 * allocated and used size both set to DATA_WORDS, and no successor
 * fragment. The data area itself is not touched.
 * HEAP_FRAG_P is evaluated five times and DATA_WORDS twice.
 */
#define ERTS_INIT_HEAP_FRAG(HEAP_FRAG_P, DATA_WORDS)        \
    do {                                                    \
        (HEAP_FRAG_P)->off_heap.first = NULL;               \
        (HEAP_FRAG_P)->off_heap.overhead = 0;               \
        (HEAP_FRAG_P)->alloc_size = (DATA_WORDS);           \
        (HEAP_FRAG_P)->used_size = (DATA_WORDS);            \
        (HEAP_FRAG_P)->next = NULL;                         \
    } while (0)
/*
* Functions declared here are defined outside this header, except for
* the two ERTS_GLB_INLINE functions at the end, whose definitions follow
* below when ERTS_GLB_INLINE_INCL_FUNC_DEF is set.
*/
/* Initialization, and allocation/release of messages and message buffers (heap fragments). */
void init_message(void);
void free_message(ErlMessage *);
ErlHeapFragment* new_message_buffer(Uint);
ErlHeapFragment* erts_resize_message_buffer(ErlHeapFragment *, Uint,
Eterm *, Uint);
void free_message_buffer(ErlHeapFragment *);
/* Queueing and sending of messages. */
void erts_queue_dist_message(Process*, ErtsProcLocks*, ErtsDistExternal *, Eterm);
/* Takes an extra trailing dt_utag argument when USE_VM_PROBES is defined. */
void erts_queue_message(Process*, ErtsProcLocks*, ErlHeapFragment*, Eterm, Eterm
#ifdef USE_VM_PROBES
, Eterm dt_utag
#endif
);
void erts_deliver_exit_message(Eterm, Process*, ErtsProcLocks *, Eterm, Eterm);
/* The trailing unsigned argument presumably carries ERTS_SND_FLG_* flags -- confirm. */
Sint erts_send_message(Process*, Process*, ErtsProcLocks*, Eterm, unsigned);
/* Handling of data attached to messages. */
void erts_link_mbuf_to_proc(Process *proc, ErlHeapFragment *bp);
void erts_move_msg_mbuf_to_heap(Eterm**, ErlOffHeap*, ErlMessage *);
/* Called by erts_msg_attached_data_size() when the dist_ext heap_size is negative. */
Uint erts_msg_attached_data_size_aux(ErlMessage *msg);
/* Used by ErtsMoveMsgAttachmentIntoProc() for the shallow-copy case. */
void erts_move_msg_attached_data_to_heap(Eterm **, ErlOffHeap *, ErlMessage *);
Eterm erts_msg_distext2heap(Process *, ErtsProcLocks *, ErlHeapFragment **,
Eterm *, ErtsDistExternal *);
void erts_cleanup_offheap(ErlOffHeap *offheap);
/* Inline helpers for computing the size of data attached to a message. */
ERTS_GLB_INLINE Uint erts_msg_used_frag_sz(const ErlMessage *msg);
ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErlMessage *msg);
#if ERTS_GLB_INLINE_INCL_FUNC_DEF
/*
 * Sum the used_size of every heap fragment in the chain attached to
 * msg (msg->data.heap_frag followed through 'next').
 *
 * Returns 0 when no heap fragment is attached.
 */
ERTS_GLB_INLINE Uint erts_msg_used_frag_sz(const ErlMessage *msg)
{
    const ErlHeapFragment *frag = msg->data.heap_frag;
    Uint total = 0;

    while (frag != NULL) {
        total += frag->used_size;
        frag = frag->next;
    }
    return total;
}
/*
 * Return the size of the data attached to msg. The message must have
 * something attached (checked with ASSERT()).
 *
 * Three cases are distinguished:
 *  - the message term is a value: the attachment is a chain of heap
 *    fragments and the sum of their used sizes is returned;
 *  - otherwise the attachment is distribution data; if its heap_size
 *    is negative, erts_msg_attached_data_size_aux() computes the size;
 *  - otherwise heap_size is used, plus the used size of the trailing
 *    heap fragment when the seq trace token is not NIL.
 */
ERTS_GLB_INLINE Uint erts_msg_attached_data_size(ErlMessage *msg)
{
    Uint total;

    ASSERT(msg->data.attached);

    if (is_value(ERL_MESSAGE_TERM(msg))) {
        return erts_msg_used_frag_sz(msg);
    }

    if (msg->data.dist_ext->heap_size < 0) {
        return erts_msg_attached_data_size_aux(msg);
    }

    total = msg->data.dist_ext->heap_size;
    if (is_not_nil(ERL_MESSAGE_TOKEN(msg))) {
        /* The token lives in a heap fragment trailing the dist data. */
        ErlHeapFragment *trailer;
        trailer = erts_dist_ext_trailer(msg->data.dist_ext);
        total += trailer->used_size;
    }
    return total;
}
#endif
#endif