From 4f6a7255afbd5b296139e073d66564976d80cc06 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Mon, 18 Feb 2013 00:28:12 +0100 Subject: Add atomic dirty read and dirty set operations --- erts/emulator/beam/erl_smp.h | 18 ++++++++++ erts/emulator/beam/erl_threads.h | 78 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 96 insertions(+) (limited to 'erts/emulator/beam') diff --git a/erts/emulator/beam/erl_smp.h b/erts/emulator/beam/erl_smp.h index a32e9d9d7c..d9ab6bb5e5 100644 --- a/erts/emulator/beam/erl_smp.h +++ b/erts/emulator/beam/erl_smp.h @@ -259,6 +259,9 @@ ERTS_GLB_INLINE void erts_smp_thr_sigwait(const sigset_t *set, int *sig); #define erts_smp_dw_atomic_read_wb erts_dw_atomic_read_wb #define erts_smp_dw_atomic_cmpxchg_wb erts_dw_atomic_cmpxchg_wb +#define erts_smp_dw_atomic_set_dirty erts_dw_atomic_set_dirty +#define erts_smp_dw_atomic_read_dirty erts_dw_atomic_read_dirty + /* Word size atomics */ #define erts_smp_atomic_init_nob erts_atomic_init_nob @@ -359,6 +362,9 @@ ERTS_GLB_INLINE void erts_smp_thr_sigwait(const sigset_t *set, int *sig); #define erts_smp_atomic_xchg_wb erts_atomic_xchg_wb #define erts_smp_atomic_cmpxchg_wb erts_atomic_cmpxchg_wb +#define erts_smp_atomic_set_dirty erts_atomic_set_dirty +#define erts_smp_atomic_read_dirty erts_atomic_read_dirty + /* 32-bit atomics */ #define erts_smp_atomic32_init_nob erts_atomic32_init_nob @@ -459,6 +465,9 @@ ERTS_GLB_INLINE void erts_smp_thr_sigwait(const sigset_t *set, int *sig); #define erts_smp_atomic32_xchg_wb erts_atomic32_xchg_wb #define erts_smp_atomic32_cmpxchg_wb erts_atomic32_cmpxchg_wb +#define erts_smp_atomic32_set_dirty erts_atomic32_set_dirty +#define erts_smp_atomic32_read_dirty erts_atomic32_read_dirty + #else /* !ERTS_SMP */ /* Double word size atomics */ @@ -498,6 +507,9 @@ ERTS_GLB_INLINE void erts_smp_thr_sigwait(const sigset_t *set, int *sig); #define erts_smp_dw_atomic_read_wb erts_no_dw_atomic_read #define erts_smp_dw_atomic_cmpxchg_wb erts_no_dw_atomic_cmpxchg +#define erts_smp_dw_atomic_set_dirty erts_no_dw_atomic_set +#define erts_smp_dw_atomic_read_dirty erts_no_dw_atomic_read + /* Word size atomics */ #define erts_smp_atomic_init_nob erts_no_atomic_set @@ -598,6 +610,9 @@ ERTS_GLB_INLINE void erts_smp_thr_sigwait(const sigset_t *set, int *sig); #define erts_smp_atomic_xchg_wb erts_no_atomic_xchg #define erts_smp_atomic_cmpxchg_wb erts_no_atomic_cmpxchg +#define erts_smp_atomic_set_dirty erts_no_atomic_set +#define erts_smp_atomic_read_dirty erts_no_atomic_read + /* 32-bit atomics */ #define erts_smp_atomic32_init_nob erts_no_atomic32_set @@ -698,6 +713,9 @@ ERTS_GLB_INLINE void erts_smp_thr_sigwait(const sigset_t *set, int *sig); #define erts_smp_atomic32_xchg_wb erts_no_atomic32_xchg #define erts_smp_atomic32_cmpxchg_wb erts_no_atomic32_cmpxchg +#define erts_smp_atomic32_set_dirty erts_no_atomic32_set +#define erts_smp_atomic32_read_dirty erts_no_atomic32_read + #endif /* !ERTS_SMP */ #if ERTS_GLB_INLINE_INCL_FUNC_DEF diff --git a/erts/emulator/beam/erl_threads.h b/erts/emulator/beam/erl_threads.h index ee47c98009..5e853bd8f6 100644 --- a/erts/emulator/beam/erl_threads.h +++ b/erts/emulator/beam/erl_threads.h @@ -601,6 +601,19 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig); #ifdef USE_THREADS +ERTS_GLB_INLINE void +erts_dw_atomic_set_dirty(erts_dw_atomic_t *var, erts_dw_aint_t *val); +ERTS_GLB_INLINE void +erts_dw_atomic_read_dirty(erts_dw_atomic_t *var, erts_dw_aint_t *val); +ERTS_GLB_INLINE void +erts_atomic_set_dirty(erts_atomic_t *var, erts_aint_t val); +ERTS_GLB_INLINE erts_aint_t +erts_atomic_read_dirty(erts_atomic_t *var); +ERTS_GLB_INLINE void +erts_atomic32_set_dirty(erts_atomic32_t *var, erts_aint32_t val); +ERTS_GLB_INLINE erts_aint32_t +erts_atomic32_read_dirty(erts_atomic32_t *var); + /* * See "Documentation of atomics and memory barriers" at the top * of this file for info on atomics. @@ -643,6 +656,26 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig); #define erts_dw_atomic_read_wb ethr_dw_atomic_read_wb #define erts_dw_atomic_cmpxchg_wb ethr_dw_atomic_cmpxchg_wb +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE void +erts_dw_atomic_set_dirty(erts_dw_atomic_t *var, erts_dw_aint_t *val) +{ + ethr_sint_t *sint = ethr_dw_atomic_addr(var); + sint[0] = val->sint[0]; + sint[1] = val->sint[1]; +} + +ERTS_GLB_INLINE void +erts_dw_atomic_read_dirty(erts_dw_atomic_t *var, erts_dw_aint_t *val) +{ + ethr_sint_t *sint = ethr_dw_atomic_addr(var); + val->sint[0] = sint[0]; + val->sint[1] = sint[1]; +} + +#endif + /* Word size atomics */ #define erts_atomic_init_nob ethr_atomic_init @@ -743,6 +776,24 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig); #define erts_atomic_xchg_wb ethr_atomic_xchg_wb #define erts_atomic_cmpxchg_wb ethr_atomic_cmpxchg_wb +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE void +erts_atomic_set_dirty(erts_atomic_t *var, erts_aint_t val) +{ + ethr_sint_t *sint = ethr_atomic_addr(var); + *sint = val; +} + +ERTS_GLB_INLINE erts_aint_t +erts_atomic_read_dirty(erts_atomic_t *var) +{ + ethr_sint_t *sint = ethr_atomic_addr(var); + return *sint; +} + +#endif + /* 32-bit atomics */ #define erts_atomic32_init_nob ethr_atomic32_init @@ -843,6 +894,24 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig); #define erts_atomic32_xchg_wb ethr_atomic32_xchg_wb #define erts_atomic32_cmpxchg_wb ethr_atomic32_cmpxchg_wb +#if ERTS_GLB_INLINE_INCL_FUNC_DEF + +ERTS_GLB_INLINE void +erts_atomic32_set_dirty(erts_atomic32_t *var, erts_aint32_t val) +{ + ethr_sint32_t *sint = ethr_atomic32_addr(var); + *sint = val; +} + +ERTS_GLB_INLINE erts_aint32_t +erts_atomic32_read_dirty(erts_atomic32_t *var) +{ + ethr_sint32_t *sint = ethr_atomic32_addr(var); + return *sint; +} + +#endif + #else /* !USE_THREADS */ /* Double word size atomics */ @@ -882,6 +951,9 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig); #define erts_dw_atomic_read_wb erts_no_dw_atomic_read #define erts_dw_atomic_cmpxchg_wb erts_no_dw_atomic_cmpxchg +#define erts_dw_atomic_set_dirty erts_no_dw_atomic_set +#define erts_dw_atomic_read_dirty erts_no_dw_atomic_read + /* Word size atomics */ #define erts_atomic_init_nob erts_no_atomic_set @@ -982,6 +1054,9 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig); #define erts_atomic_xchg_wb erts_no_atomic_xchg #define erts_atomic_cmpxchg_wb erts_no_atomic_cmpxchg +#define erts_atomic_set_dirty erts_no_atomic_set +#define erts_atomic_read_dirty erts_no_atomic_read + /* 32-bit atomics */ #define erts_atomic32_init_nob erts_no_atomic32_set @@ -1082,6 +1157,9 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig); #define erts_atomic32_xchg_wb erts_no_atomic32_xchg #define erts_atomic32_cmpxchg_wb erts_no_atomic32_cmpxchg +#define erts_atomic32_set_dirty erts_no_atomic32_set +#define erts_atomic32_read_dirty erts_no_atomic32_read + #endif /* !USE_THREADS */ #if ERTS_GLB_INLINE_INCL_FUNC_DEF -- cgit v1.2.3 From df40b0b99810121d9630730d6f58cf96bfea70e8 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Mon, 18 Feb 2013 00:29:51 +0100 Subject: Use dirty read instead of union which can be unsafe on some platforms --- erts/emulator/beam/erl_thr_queue.c | 100 ++++++++++++++++++++----------------- erts/emulator/beam/erl_thr_queue.h | 13 +++-- 2 files changed, 61 insertions(+), 52 deletions(-) (limited to 'erts/emulator/beam') diff --git a/erts/emulator/beam/erl_thr_queue.c b/erts/emulator/beam/erl_thr_queue.c index f07964a265..acadc5d4cd 100644 --- a/erts/emulator/beam/erl_thr_queue.c +++ b/erts/emulator/beam/erl_thr_queue.c @@ -113,6 +113,11 @@ sl_element_free(ErtsThrQElement_t *p) #endif +#define ErtsThrQDirtyReadEl(A) \ + ((ErtsThrQElement_t *) erts_atomic_read_dirty((A))) +#define ErtsThrQDirtySetEl(A, V) \ + erts_atomic_set_dirty((A), (erts_aint_t) (V)) + typedef union { ErtsThrQ_t q; char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))]; @@ -137,7 +142,7 @@ erts_thr_q_initialize(ErtsThrQ_t *q, ErtsThrQInit_t *qi) q->last = NULL; q->q.blk = NULL; #else - erts_atomic_init_nob(&q->tail.data.marker.next.atmc, ERTS_AINT_NULL); + erts_atomic_init_nob(&q->tail.data.marker.next, ERTS_AINT_NULL); q->tail.data.marker.data.ptr = NULL; erts_atomic_init_nob(&q->tail.data.last, (erts_aint_t) &q->tail.data.marker); @@ -150,7 +155,7 @@ erts_thr_q_initialize(ErtsThrQ_t *q, ErtsThrQInit_t *qi) if (!q->tail.data.notify) q->tail.data.notify = noop_callback; - q->head.head.ptr = &q->tail.data.marker; + erts_atomic_init_nob(&q->head.head, (erts_aint_t) &q->tail.data.marker); q->head.live = qi->live.objects; q->head.first = &q->tail.data.marker; q->head.unref_end = &q->tail.data.marker; @@ -300,13 +305,13 @@ enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last) { erts_aint_t ilast, itmp; - erts_atomic_init_nob(&this->next.atmc, ERTS_AINT_NULL); + erts_atomic_init_nob(&this->next, ERTS_AINT_NULL); /* Enqueue at end of list... */ ilast = erts_atomic_read_nob(&q->tail.data.last); while (1) { ErtsThrQElement_t *last = (ErtsThrQElement_t *) ilast; - itmp = erts_atomic_cmpxchg_mb(&last->next.atmc, + itmp = erts_atomic_cmpxchg_mb(&last->next, (erts_aint_t) this, ERTS_AINT_NULL); if (itmp == ERTS_AINT_NULL) @@ -317,14 +322,14 @@ enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last) /* Move last pointer forward... */ while (1) { if (want_last) { - if (erts_atomic_read_rb(&this->next.atmc) != ERTS_AINT_NULL) { + if (erts_atomic_read_rb(&this->next) != ERTS_AINT_NULL) { /* Someone else will move it forward */ ilast = erts_atomic_read_rb(&q->tail.data.last); return (ErtsThrQElement_t *) ilast; } } else { - if (erts_atomic_read_nob(&this->next.atmc) != ERTS_AINT_NULL) { + if (erts_atomic_read_nob(&this->next) != ERTS_AINT_NULL) { /* Someone else will move it forward */ return NULL; } @@ -341,6 +346,7 @@ enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last) static ErtsThrQCleanState_t clean(ErtsThrQ_t *q, int max_ops, int do_notify) { + ErtsThrQElement_t *head; erts_aint_t ilast; int um_refc_ix; int ops; @@ -349,7 +355,8 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify) ErtsThrQElement_t *tmp; restart: ASSERT(q->head.first); - if (q->head.first == q->head.head.ptr) { + head = ErtsThrQDirtyReadEl(&q->head.head); + if (q->head.first == head) { q->head.clean_reached_head_count++; if (q->head.clean_reached_head_count >= ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT) { @@ -362,19 +369,20 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify) break; if (q->head.first == &q->tail.data.marker) { q->head.used_marker = 0; - q->head.first = q->head.first->next.ptr; + q->head.first = ErtsThrQDirtyReadEl(&q->head.first->next); goto restart; } tmp = q->head.first; - q->head.first = q->head.first->next.ptr; + q->head.first = ErtsThrQDirtyReadEl(&q->head.first->next); if (q->head.deq_fini.automatic) element_free(q, tmp); else { tmp->data.ptr = (void *) (UWord) q->head.live; if (!q->head.deq_fini.start) q->head.deq_fini.start = tmp; - else if (q->head.deq_fini.end->next.ptr == &q->tail.data.marker) - q->head.deq_fini.end->next.ptr = tmp; + else if (ErtsThrQDirtyReadEl(&q->head.deq_fini.end->next) + == &q->tail.data.marker) + ErtsThrQDirtySetEl(&q->head.deq_fini.end->next, tmp); q->head.deq_fini.end = tmp; } } @@ -406,14 +414,13 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify) ilast = (erts_aint_t) enqueue_managed(q, &q->tail.data.marker, 1); - if (q->head.head.ptr == q->head.unref_end) { + head = ErtsThrQDirtyReadEl(&q->head.head); + if (head == q->head.unref_end) { ErtsThrQElement_t *next; next = ((ErtsThrQElement_t *) - erts_atomic_read_acqb(&q->head.head.ptr->next.atmc)); - if (next == &q->tail.data.marker) { - q->head.head.ptr->next.ptr = &q->tail.data.marker; - q->head.head.ptr = &q->tail.data.marker; - } + erts_atomic_read_acqb(&head->next)); + if (next == &q->tail.data.marker) + ErtsThrQDirtySetEl(&q->head.head, &q->tail.data.marker); } } @@ -436,18 +443,18 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify) } #endif - if (q->head.first == q->head.head.ptr) { + head = ErtsThrQDirtyReadEl(&q->head.head); + if (q->head.first == head) { inspect_head: if (!q->head.used_marker) { erts_aint_t inext; - inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + inext = erts_atomic_read_acqb(&head->next); if (inext == ERTS_AINT_NULL) { q->head.used_marker = 1; (void) enqueue_managed(q, &q->tail.data.marker, 0); - inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + inext = erts_atomic_read_acqb(&head->next); if (inext == (erts_aint_t) &q->tail.data.marker) { - q->head.head.ptr->next.ptr = &q->tail.data.marker; - q->head.head.ptr = &q->tail.data.marker; + ErtsThrQDirtySetEl(&q->head.head, &q->tail.data.marker); goto check_thr_progress; } } @@ -506,26 +513,27 @@ erts_thr_q_inspect(ErtsThrQ_t *q, int ensure_empty) #ifndef USE_THREADS return ERTS_THR_Q_CLEAN; #else + ErtsThrQElement_t *head = ErtsThrQDirtyReadEl(&q->head.head); if (ensure_empty) { erts_aint_t inext; - inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + inext = erts_atomic_read_acqb(&head->next); if (inext != ERTS_AINT_NULL) { if (&q->tail.data.marker != (ErtsThrQElement_t *) inext) return ERTS_THR_Q_DIRTY; else { - q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; - q->head.head.ptr = (ErtsThrQElement_t *) inext; - inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + head = (ErtsThrQElement_t *) inext; + ErtsThrQDirtySetEl(&q->head.head, head); + inext = erts_atomic_read_acqb(&head->next); if (inext != ERTS_AINT_NULL) return ERTS_THR_Q_DIRTY; } } } - if (q->head.first == q->head.head.ptr) { + if (q->head.first == head) { if (!q->head.used_marker) { erts_aint_t inext; - inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + inext = erts_atomic_read_acqb(&head->next); if (inext == ERTS_AINT_NULL) return ERTS_THR_Q_DIRTY; } @@ -553,11 +561,11 @@ enqueue(ErtsThrQ_t *q, void *data, ErtsThrQElement_t *this) #ifndef USE_THREADS ASSERT(data); - this->next.ptr = NULL; + this->next = NULL; this->data.ptr = data; if (q->last) - q->last->next.ptr = this; + q->last->next = this; else { q->first = q->last = this; q->init.notify(q->init.arg); @@ -638,17 +646,17 @@ erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *q, ErtsThrQFinDeQ_t *fdp) ErtsThrQElement_t *e = q->head.deq_fini.start; ErtsThrQElement_t *end = q->head.deq_fini.end; while (e != end) { - ASSERT(q->head.head.ptr != e); + ASSERT(ErtsThrQDirtyReadEl(&q->head.head) != e); ASSERT(q->head.first != e); ASSERT(q->head.unref_end != e); - e = e->next.ptr; + e = ErtsThrQDirtyReadEl(&e->next); } } #endif fdp->start = q->head.deq_fini.start; fdp->end = q->head.deq_fini.end; if (fdp->end) - fdp->end->next.ptr = NULL; + ErtsThrQDirtySetEl(&fdp->end->next, NULL); q->head.deq_fini.start = NULL; q->head.deq_fini.end = NULL; return fdp->start != NULL; @@ -662,7 +670,7 @@ erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *fdp0, #ifdef USE_THREADS if (fdp1->start) { if (fdp0->end) - fdp0->end->next.ptr = fdp1->start; + ErtsThrQDirtySetEl(&fdp0->end->next, fdp1->start); else fdp0->start = fdp1->start; fdp0->end = fdp1->end; @@ -683,7 +691,7 @@ int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *state) if (!start) break; tmp = start; - start = start->next.ptr; + start = ErtsThrQDirtyReadEl(&start->next); live = (ErtsThrQLive_t) (UWord) tmp->data.ptr; element_live_free(live, tmp); } @@ -724,7 +732,7 @@ erts_thr_q_dequeue(ErtsThrQ_t *q) return NULL; tmp = q->first; res = tmp->data.ptr; - q->first = tmp->next.ptr; + q->first = tmp->next; if (!q->first) q->last = NULL; @@ -732,24 +740,26 @@ erts_thr_q_dequeue(ErtsThrQ_t *q) return res; #else + ErtsThrQElement_t *head; erts_aint_t inext; void *res; - inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + head = ErtsThrQDirtyReadEl(&q->head.head); + inext = erts_atomic_read_acqb(&head->next); if (inext == ERTS_AINT_NULL) return NULL; - q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; - q->head.head.ptr = (ErtsThrQElement_t *) inext; - if (q->head.head.ptr == &q->tail.data.marker) { - inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc); + head = (ErtsThrQElement_t *) inext; + ErtsThrQDirtySetEl(&q->head.head, head); + if (head == &q->tail.data.marker) { + inext = erts_atomic_read_acqb(&head->next); if (inext == ERTS_AINT_NULL) return NULL; - q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext; - q->head.head.ptr = (ErtsThrQElement_t *) inext; + head = (ErtsThrQElement_t *) inext; + ErtsThrQDirtySetEl(&q->head.head, head); } - res = q->head.head.ptr->data.ptr; + res = head->data.ptr; #if ERTS_THR_Q_DBG_CHK_DATA - q->head.head.ptr->data.ptr = NULL; + head->data.ptr = NULL; if (!res) erl_exit(ERTS_ABORT_EXIT, "Missing data in dequeue\n"); #endif diff --git a/erts/emulator/beam/erl_thr_queue.h b/erts/emulator/beam/erl_thr_queue.h index edcf2c3823..ae8c7fb19a 100644 --- a/erts/emulator/beam/erl_thr_queue.h +++ b/erts/emulator/beam/erl_thr_queue.h @@ -76,13 +76,12 @@ typedef struct { typedef struct ErtsThrQElement_t_ ErtsThrQElement_t; typedef struct ErtsThrQElement_t ErtsThrQPrepEnQ_t; -typedef union { - erts_atomic_t atmc; - ErtsThrQElement_t *ptr; -} ErtsThrQPtr_t; - struct ErtsThrQElement_t_ { - ErtsThrQPtr_t next; +#ifdef USE_THREADS + erts_atomic_t next; +#else + ErtsThrQElement_t *next; +#endif union { erts_atomic_t atmc; void *ptr; @@ -130,7 +129,7 @@ struct ErtsThrQ_t_ { * thread dequeuing. */ struct { - ErtsThrQPtr_t head; + erts_atomic_t head; ErtsThrQLive_t live; ErtsThrQElement_t *first; ErtsThrQElement_t *unref_end; -- cgit v1.2.3 From 9ccc56b450cef0fce8b0ed976e91d3dee13ee3b9 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Wed, 20 Feb 2013 17:13:45 +0100 Subject: Fix lost enqueue notification --- erts/emulator/beam/erl_thr_queue.c | 79 +++++++++++++++++++++----------------- 1 file changed, 44 insertions(+), 35 deletions(-) (limited to 'erts/emulator/beam') diff --git a/erts/emulator/beam/erl_thr_queue.c b/erts/emulator/beam/erl_thr_queue.c index acadc5d4cd..ee2ff765e0 100644 --- a/erts/emulator/beam/erl_thr_queue.c +++ b/erts/emulator/beam/erl_thr_queue.c @@ -301,7 +301,7 @@ element_free(ErtsThrQ_t *q, ErtsThrQElement_t *el) #ifdef USE_THREADS static ERTS_INLINE ErtsThrQElement_t * -enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last) +enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this) { erts_aint_t ilast, itmp; @@ -321,28 +321,53 @@ enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last) /* Move last pointer forward... */ while (1) { - if (want_last) { - if (erts_atomic_read_rb(&this->next) != ERTS_AINT_NULL) { - /* Someone else will move it forward */ - ilast = erts_atomic_read_rb(&q->tail.data.last); - return (ErtsThrQElement_t *) ilast; - } - } - else { - if (erts_atomic_read_nob(&this->next) != ERTS_AINT_NULL) { - /* Someone else will move it forward */ - return NULL; - } + if (erts_atomic_read_rb(&this->next) != ERTS_AINT_NULL) { + /* Someone else will move it forward */ + ilast = erts_atomic_read_rb(&q->tail.data.last); + return (ErtsThrQElement_t *) ilast; } itmp = erts_atomic_cmpxchg_mb(&q->tail.data.last, (erts_aint_t) this, ilast); if (ilast == itmp) - return want_last ? this : NULL; + return this; ilast = itmp; } } +static ERTS_INLINE ErtsThrQElement_t * +enqueue_marker(ErtsThrQ_t *q, ErtsThrQElement_t **headp) +{ + int maybe_notify; + erts_aint_t inext; + ErtsThrQElement_t *last, *head; + + if (headp) + head = *headp; + else + head = ErtsThrQDirtyReadEl(&q->head.head); + + ASSERT(!q->head.used_marker); + q->head.used_marker = 1; + last = enqueue_managed(q, &q->tail.data.marker); + maybe_notify = &q->tail.data.marker == last; + inext = erts_atomic_read_acqb(&head->next); + if (inext == (erts_aint_t) &q->tail.data.marker) { + ErtsThrQDirtySetEl(&q->head.head, &q->tail.data.marker); + if (headp) + *headp = &q->tail.data.marker; + } + else if (maybe_notify) { + /* + * We need to notify; otherwise, we might loose a notification + * for a concurrently inserted element. + */ + q->head.notify(q->head.arg); + } + return last; +} + + static ErtsThrQCleanState_t clean(ErtsThrQ_t *q, int max_ops, int do_notify) { @@ -409,20 +434,8 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify) q->head.unref_end = q->head.next.unref_end; if (!q->head.used_marker - && q->head.unref_end == (ErtsThrQElement_t *) ilast) { - q->head.used_marker = 1; - ilast = (erts_aint_t) enqueue_managed(q, - &q->tail.data.marker, - 1); - head = ErtsThrQDirtyReadEl(&q->head.head); - if (head == q->head.unref_end) { - ErtsThrQElement_t *next; - next = ((ErtsThrQElement_t *) - erts_atomic_read_acqb(&head->next)); - if (next == &q->tail.data.marker) - ErtsThrQDirtySetEl(&q->head.head, &q->tail.data.marker); - } - } + && q->head.unref_end == (ErtsThrQElement_t *) ilast) + ilast = (erts_aint_t) enqueue_marker(q, NULL); if (q->head.unref_end == (ErtsThrQElement_t *) ilast) ERTS_SMP_MEMORY_BARRIER; @@ -450,13 +463,9 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify) erts_aint_t inext; inext = erts_atomic_read_acqb(&head->next); if (inext == ERTS_AINT_NULL) { - q->head.used_marker = 1; - (void) enqueue_managed(q, &q->tail.data.marker, 0); - inext = erts_atomic_read_acqb(&head->next); - if (inext == (erts_aint_t) &q->tail.data.marker) { - ErtsThrQDirtySetEl(&q->head.head, &q->tail.data.marker); + enqueue_marker(q, &head); + if (head == &q->tail.data.marker) goto check_thr_progress; - } } } @@ -603,7 +612,7 @@ enqueue(ErtsThrQ_t *q, void *data, ErtsThrQElement_t *this) } } - notify = this == enqueue_managed(q, this, 1); + notify = this == enqueue_managed(q, this); #ifdef ERTS_SMP -- cgit v1.2.3