diff options
Diffstat (limited to 'erts/emulator/beam/erl_thr_queue.h')
-rw-r--r-- | erts/emulator/beam/erl_thr_queue.h | 209 |
1 files changed, 209 insertions, 0 deletions
diff --git a/erts/emulator/beam/erl_thr_queue.h b/erts/emulator/beam/erl_thr_queue.h new file mode 100644 index 0000000000..edcf2c3823 --- /dev/null +++ b/erts/emulator/beam/erl_thr_queue.h @@ -0,0 +1,209 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2011. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +/* + * Description: Lock-free queue for communication between threads. + * + * Currently only a many-to-one version has been, + * implemented, i.e., many threads can enqueue but + * only one thread can dequeue at a time. It doesn't + * have to be the same thread dequeuing every time, but + * synchronization so that only one thread dequeues + * at a time has to be provided by other means. + * + * When/If the need for a many-to-many queue arises, + * this implementation can relatively easy be extended + * to support that too. + * + * Usage instructions can be found in erts_thr_queue.c + * + * Author: Rickard Green + */ + +#ifndef ERL_THR_QUEUE_H__ +#define ERL_THR_QUEUE_H__ + +#include "sys.h" +#include "erl_threads.h" +#include "erl_alloc.h" +#include "erl_thr_progress.h" + +typedef enum { + ERTS_THR_Q_LIVE_UNDEF, + ERTS_THR_Q_LIVE_SHORT, + ERTS_THR_Q_LIVE_LONG +} ErtsThrQLive_t; + +#define ERTS_THR_Q_INIT_DEFAULT \ +{ \ + { \ + ERTS_THR_Q_LIVE_UNDEF, \ + ERTS_THR_Q_LIVE_SHORT \ + }, \ + NULL, \ + NULL, \ + 1 \ +} + +typedef struct ErtsThrQ_t_ ErtsThrQ_t; + +typedef struct { + struct { + ErtsThrQLive_t queue; + ErtsThrQLive_t objects; + } live; + void *arg; + void (*notify)(void *); + int auto_finalize_dequeue; +} ErtsThrQInit_t; + +typedef struct ErtsThrQElement_t_ ErtsThrQElement_t; +typedef struct ErtsThrQElement_t ErtsThrQPrepEnQ_t; + +typedef union { + erts_atomic_t atmc; + ErtsThrQElement_t *ptr; +} ErtsThrQPtr_t; + +struct ErtsThrQElement_t_ { + ErtsThrQPtr_t next; + union { + erts_atomic_t atmc; + void *ptr; + } data; +}; + +typedef struct { + ErtsThrQElement_t *start; + ErtsThrQElement_t *end; +} ErtsThrQFinDeQ_t; + +typedef enum { + ERTS_THR_Q_CLEAN, + ERTS_THR_Q_NEED_THR_PRGR, + ERTS_THR_Q_DIRTY, +} ErtsThrQCleanState_t; + +#ifdef USE_THREADS + +typedef struct { + ErtsThrQElement_t marker; + erts_atomic_t last; + erts_atomic_t um_refc[2]; + erts_atomic32_t um_refc_ix; + ErtsThrQLive_t live; +#ifdef ERTS_SMP + erts_atomic32_t thr_prgr_clean_scheduled; +#endif + void *arg; + void (*notify)(void *); +} ErtsThrQTail_t; + +struct ErtsThrQ_t_ { + /* + * This structure needs to be cache line aligned for best + * performance. + */ + union { + /* Modified by threads enqueuing */ + ErtsThrQTail_t data; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQTail_t))]; + } tail; + /* + * Everything below this point is *only* accessed by the + * thread dequeuing. + */ + struct { + ErtsThrQPtr_t head; + ErtsThrQLive_t live; + ErtsThrQElement_t *first; + ErtsThrQElement_t *unref_end; + int clean_reached_head_count; + struct { + int automatic; + ErtsThrQElement_t *start; + ErtsThrQElement_t *end; + } deq_fini; + struct { +#ifdef ERTS_SMP + ErtsThrPrgrVal thr_progress; + int thr_progress_reached; +#endif + int um_refc_ix; + ErtsThrQElement_t *unref_end; + } next; + int used_marker; + void *arg; + void (*notify)(void *); + } head; + struct { + int finalizing; + ErtsThrQLive_t live; + void *blk; + } q; +}; + +#else /* !USE_THREADS */ + +struct ErtsThrQ_t_ { + ErtsThrQInit_t init; + ErtsThrQElement_t *first; + ErtsThrQElement_t *last; + struct { + void *blk; + } q; +}; + +#endif + +void erts_thr_q_init(void); +void erts_thr_q_initialize(ErtsThrQ_t *, ErtsThrQInit_t *); +ErtsThrQCleanState_t erts_thr_q_finalize(ErtsThrQ_t *); +ErtsThrQ_t *erts_thr_q_create(ErtsThrQInit_t *); +ErtsThrQCleanState_t erts_thr_q_destroy(ErtsThrQ_t *); +ErtsThrQCleanState_t erts_thr_q_clean(ErtsThrQ_t *); +ErtsThrQCleanState_t erts_thr_q_inspect(ErtsThrQ_t *, int); +ErtsThrQPrepEnQ_t *erts_thr_q_prepare_enqueue(ErtsThrQ_t *); +void erts_thr_q_enqueue_prepared(ErtsThrQ_t *, void *, ErtsThrQPrepEnQ_t *); +void erts_thr_q_enqueue(ErtsThrQ_t *, void *); +void * erts_thr_q_dequeue(ErtsThrQ_t *); +int erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *, + ErtsThrQFinDeQ_t *); +void erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *, + ErtsThrQFinDeQ_t *); +int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *); +void erts_thr_q_finalize_dequeue_state_init(ErtsThrQFinDeQ_t *); + +#ifdef ERTS_SMP +ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_q_need_thr_progress(ErtsThrQ_t *q); +#endif + +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +#ifdef ERTS_SMP +ERTS_GLB_INLINE ErtsThrPrgrVal +erts_thr_q_need_thr_progress(ErtsThrQ_t *q) +{ + return q->head.next.thr_progress; +} +#endif + +#endif /* ERTS_GLB_INLINE_INCL_FUNC_DEF */ + +#endif /* ERL_THR_QUEUE_H__ */ |