From 4f6a7255afbd5b296139e073d66564976d80cc06 Mon Sep 17 00:00:00 2001
From: Rickard Green <rickard@erlang.org>
Date: Mon, 18 Feb 2013 00:28:12 +0100
Subject: Add atomic dirty read and dirty set operations

---
 erts/emulator/beam/erl_smp.h     | 18 ++++++++++
 erts/emulator/beam/erl_threads.h | 78 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 96 insertions(+)

(limited to 'erts')

diff --git a/erts/emulator/beam/erl_smp.h b/erts/emulator/beam/erl_smp.h
index a32e9d9d7c..d9ab6bb5e5 100644
--- a/erts/emulator/beam/erl_smp.h
+++ b/erts/emulator/beam/erl_smp.h
@@ -259,6 +259,9 @@ ERTS_GLB_INLINE void erts_smp_thr_sigwait(const sigset_t *set, int *sig);
 #define erts_smp_dw_atomic_read_wb erts_dw_atomic_read_wb
 #define erts_smp_dw_atomic_cmpxchg_wb erts_dw_atomic_cmpxchg_wb
 
+#define erts_smp_dw_atomic_set_dirty erts_dw_atomic_set_dirty
+#define erts_smp_dw_atomic_read_dirty erts_dw_atomic_read_dirty
+
 /* Word size atomics */
 
 #define erts_smp_atomic_init_nob erts_atomic_init_nob
@@ -359,6 +362,9 @@ ERTS_GLB_INLINE void erts_smp_thr_sigwait(const sigset_t *set, int *sig);
 #define erts_smp_atomic_xchg_wb erts_atomic_xchg_wb
 #define erts_smp_atomic_cmpxchg_wb erts_atomic_cmpxchg_wb
 
+#define erts_smp_atomic_set_dirty erts_atomic_set_dirty
+#define erts_smp_atomic_read_dirty erts_atomic_read_dirty
+
 /* 32-bit atomics */
 
 #define erts_smp_atomic32_init_nob erts_atomic32_init_nob
@@ -459,6 +465,9 @@ ERTS_GLB_INLINE void erts_smp_thr_sigwait(const sigset_t *set, int *sig);
 #define erts_smp_atomic32_xchg_wb erts_atomic32_xchg_wb
 #define erts_smp_atomic32_cmpxchg_wb erts_atomic32_cmpxchg_wb
 
+#define erts_smp_atomic32_set_dirty erts_atomic32_set_dirty
+#define erts_smp_atomic32_read_dirty erts_atomic32_read_dirty
+
 #else /* !ERTS_SMP */
 
 /* Double word size atomics */
@@ -498,6 +507,9 @@ ERTS_GLB_INLINE void erts_smp_thr_sigwait(const sigset_t *set, int *sig);
 #define erts_smp_dw_atomic_read_wb erts_no_dw_atomic_read
 #define erts_smp_dw_atomic_cmpxchg_wb erts_no_dw_atomic_cmpxchg
 
+#define erts_smp_dw_atomic_set_dirty erts_no_dw_atomic_set
+#define erts_smp_dw_atomic_read_dirty erts_no_dw_atomic_read
+
 /* Word size atomics */
 
 #define erts_smp_atomic_init_nob erts_no_atomic_set
@@ -598,6 +610,9 @@ ERTS_GLB_INLINE void erts_smp_thr_sigwait(const sigset_t *set, int *sig);
 #define erts_smp_atomic_xchg_wb erts_no_atomic_xchg
 #define erts_smp_atomic_cmpxchg_wb erts_no_atomic_cmpxchg
 
+#define erts_smp_atomic_set_dirty erts_no_atomic_set
+#define erts_smp_atomic_read_dirty erts_no_atomic_read
+
 /* 32-bit atomics */
 
 #define erts_smp_atomic32_init_nob erts_no_atomic32_set
@@ -698,6 +713,9 @@ ERTS_GLB_INLINE void erts_smp_thr_sigwait(const sigset_t *set, int *sig);
 #define erts_smp_atomic32_xchg_wb erts_no_atomic32_xchg
 #define erts_smp_atomic32_cmpxchg_wb erts_no_atomic32_cmpxchg
 
+#define erts_smp_atomic32_set_dirty erts_no_atomic32_set
+#define erts_smp_atomic32_read_dirty erts_no_atomic32_read
+
 #endif /* !ERTS_SMP */
 
 #if ERTS_GLB_INLINE_INCL_FUNC_DEF
diff --git a/erts/emulator/beam/erl_threads.h b/erts/emulator/beam/erl_threads.h
index ee47c98009..5e853bd8f6 100644
--- a/erts/emulator/beam/erl_threads.h
+++ b/erts/emulator/beam/erl_threads.h
@@ -601,6 +601,19 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig);
 
 #ifdef USE_THREADS
 
+ERTS_GLB_INLINE void
+erts_dw_atomic_set_dirty(erts_dw_atomic_t *var, erts_dw_aint_t *val);
+ERTS_GLB_INLINE void
+erts_dw_atomic_read_dirty(erts_dw_atomic_t *var, erts_dw_aint_t *val);
+ERTS_GLB_INLINE void
+erts_atomic_set_dirty(erts_atomic_t *var, erts_aint_t val);
+ERTS_GLB_INLINE erts_aint_t
+erts_atomic_read_dirty(erts_atomic_t *var);
+ERTS_GLB_INLINE void
+erts_atomic32_set_dirty(erts_atomic32_t *var, erts_aint32_t val);
+ERTS_GLB_INLINE erts_aint32_t
+erts_atomic32_read_dirty(erts_atomic32_t *var);
+
 /*
  * See "Documentation of atomics and memory barriers" at the top
  * of this file for info on atomics.
@@ -643,6 +656,26 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig);
 #define erts_dw_atomic_read_wb ethr_dw_atomic_read_wb
 #define erts_dw_atomic_cmpxchg_wb ethr_dw_atomic_cmpxchg_wb
 
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE void
+erts_dw_atomic_set_dirty(erts_dw_atomic_t *var, erts_dw_aint_t *val)
+{
+    ethr_sint_t *sint = ethr_dw_atomic_addr(var);
+    sint[0] = val->sint[0];
+    sint[1] = val->sint[1];    
+}
+
+ERTS_GLB_INLINE void
+erts_dw_atomic_read_dirty(erts_dw_atomic_t *var, erts_dw_aint_t *val)
+{
+    ethr_sint_t *sint = ethr_dw_atomic_addr(var);
+    val->sint[0] = sint[0];
+    val->sint[1] = sint[1];
+}
+
+#endif
+
 /* Word size atomics */
 
 #define erts_atomic_init_nob ethr_atomic_init
@@ -743,6 +776,24 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig);
 #define erts_atomic_xchg_wb ethr_atomic_xchg_wb
 #define erts_atomic_cmpxchg_wb ethr_atomic_cmpxchg_wb
 
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE void
+erts_atomic_set_dirty(erts_atomic_t *var, erts_aint_t val)
+{
+    ethr_sint_t *sint = ethr_atomic_addr(var);
+    *sint = val;
+}
+
+ERTS_GLB_INLINE erts_aint_t
+erts_atomic_read_dirty(erts_atomic_t *var)
+{
+    ethr_sint_t *sint = ethr_atomic_addr(var);
+    return *sint;
+}
+
+#endif
+
 /* 32-bit atomics */
 
 #define erts_atomic32_init_nob ethr_atomic32_init
@@ -843,6 +894,24 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig);
 #define erts_atomic32_xchg_wb ethr_atomic32_xchg_wb
 #define erts_atomic32_cmpxchg_wb ethr_atomic32_cmpxchg_wb
 
+#if ERTS_GLB_INLINE_INCL_FUNC_DEF
+
+ERTS_GLB_INLINE void
+erts_atomic32_set_dirty(erts_atomic32_t *var, erts_aint32_t val)
+{
+    ethr_sint32_t *sint = ethr_atomic32_addr(var);
+    *sint = val;
+}
+
+ERTS_GLB_INLINE erts_aint32_t
+erts_atomic32_read_dirty(erts_atomic32_t *var)
+{
+    ethr_sint32_t *sint = ethr_atomic32_addr(var);
+    return *sint;
+}
+
+#endif
+
 #else /* !USE_THREADS */
 
 /* Double word size atomics */
@@ -882,6 +951,9 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig);
 #define erts_dw_atomic_read_wb erts_no_dw_atomic_read
 #define erts_dw_atomic_cmpxchg_wb erts_no_dw_atomic_cmpxchg
 
+#define erts_dw_atomic_set_dirty erts_no_dw_atomic_set
+#define erts_dw_atomic_read_dirty erts_no_dw_atomic_read
+
 /* Word size atomics */
 
 #define erts_atomic_init_nob erts_no_atomic_set
@@ -982,6 +1054,9 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig);
 #define erts_atomic_xchg_wb erts_no_atomic_xchg
 #define erts_atomic_cmpxchg_wb erts_no_atomic_cmpxchg
 
+#define erts_atomic_set_dirty erts_no_atomic_set
+#define erts_atomic_read_dirty erts_no_atomic_read
+
 /* 32-bit atomics */
 
 #define erts_atomic32_init_nob erts_no_atomic32_set
@@ -1082,6 +1157,9 @@ ERTS_GLB_INLINE void erts_thr_sigwait(const sigset_t *set, int *sig);
 #define erts_atomic32_xchg_wb erts_no_atomic32_xchg
 #define erts_atomic32_cmpxchg_wb erts_no_atomic32_cmpxchg
 
+#define erts_atomic32_set_dirty erts_no_atomic32_set
+#define erts_atomic32_read_dirty erts_no_atomic32_read
+
 #endif /* !USE_THREADS */
 
 #if ERTS_GLB_INLINE_INCL_FUNC_DEF
-- 
cgit v1.2.3


From df40b0b99810121d9630730d6f58cf96bfea70e8 Mon Sep 17 00:00:00 2001
From: Rickard Green <rickard@erlang.org>
Date: Mon, 18 Feb 2013 00:29:51 +0100
Subject: Use dirty read instead of union which can be unsafe on some platforms

---
 erts/emulator/beam/erl_thr_queue.c | 100 ++++++++++++++++++++-----------------
 erts/emulator/beam/erl_thr_queue.h |  13 +++--
 2 files changed, 61 insertions(+), 52 deletions(-)

(limited to 'erts')

diff --git a/erts/emulator/beam/erl_thr_queue.c b/erts/emulator/beam/erl_thr_queue.c
index f07964a265..acadc5d4cd 100644
--- a/erts/emulator/beam/erl_thr_queue.c
+++ b/erts/emulator/beam/erl_thr_queue.c
@@ -113,6 +113,11 @@ sl_element_free(ErtsThrQElement_t *p)
 
 #endif
 
+#define ErtsThrQDirtyReadEl(A) \
+    ((ErtsThrQElement_t *) erts_atomic_read_dirty((A)))
+#define ErtsThrQDirtySetEl(A, V) \
+    erts_atomic_set_dirty((A), (erts_aint_t) (V))
+
 typedef union {
     ErtsThrQ_t q;
     char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrQ_t))];
@@ -137,7 +142,7 @@ erts_thr_q_initialize(ErtsThrQ_t *q, ErtsThrQInit_t *qi)
     q->last = NULL;
     q->q.blk = NULL;
 #else
-    erts_atomic_init_nob(&q->tail.data.marker.next.atmc, ERTS_AINT_NULL);
+    erts_atomic_init_nob(&q->tail.data.marker.next, ERTS_AINT_NULL);
     q->tail.data.marker.data.ptr = NULL;
     erts_atomic_init_nob(&q->tail.data.last,
 			 (erts_aint_t) &q->tail.data.marker);
@@ -150,7 +155,7 @@ erts_thr_q_initialize(ErtsThrQ_t *q, ErtsThrQInit_t *qi)
     if (!q->tail.data.notify)
 	q->tail.data.notify = noop_callback;
 
-    q->head.head.ptr = &q->tail.data.marker;
+    erts_atomic_init_nob(&q->head.head, (erts_aint_t) &q->tail.data.marker);
     q->head.live = qi->live.objects;
     q->head.first = &q->tail.data.marker;
     q->head.unref_end = &q->tail.data.marker;
@@ -300,13 +305,13 @@ enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last)
 {
     erts_aint_t ilast, itmp;
 
-    erts_atomic_init_nob(&this->next.atmc, ERTS_AINT_NULL);
+    erts_atomic_init_nob(&this->next, ERTS_AINT_NULL);
     /* Enqueue at end of list... */
 
     ilast = erts_atomic_read_nob(&q->tail.data.last);
     while (1) {
 	ErtsThrQElement_t *last = (ErtsThrQElement_t *) ilast;
-	itmp = erts_atomic_cmpxchg_mb(&last->next.atmc,
+	itmp = erts_atomic_cmpxchg_mb(&last->next,
 				      (erts_aint_t) this,
 				      ERTS_AINT_NULL);
 	if (itmp == ERTS_AINT_NULL)
@@ -317,14 +322,14 @@ enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last)
     /* Move last pointer forward... */
     while (1) {
 	if (want_last) {
-	    if (erts_atomic_read_rb(&this->next.atmc) != ERTS_AINT_NULL) {
+	    if (erts_atomic_read_rb(&this->next) != ERTS_AINT_NULL) {
 		/* Someone else will move it forward */
 		ilast = erts_atomic_read_rb(&q->tail.data.last);
 		return (ErtsThrQElement_t *) ilast;
 	    }
 	}
 	else {
-	    if (erts_atomic_read_nob(&this->next.atmc) != ERTS_AINT_NULL) {
+	    if (erts_atomic_read_nob(&this->next) != ERTS_AINT_NULL) {
 		/* Someone else will move it forward */
 		return NULL;
 	    }
@@ -341,6 +346,7 @@ enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last)
 static ErtsThrQCleanState_t
 clean(ErtsThrQ_t *q, int max_ops, int do_notify)
 {
+    ErtsThrQElement_t *head;
     erts_aint_t ilast;
     int um_refc_ix;
     int ops;
@@ -349,7 +355,8 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify)
 	ErtsThrQElement_t *tmp;
     restart:
 	ASSERT(q->head.first);
-	if (q->head.first == q->head.head.ptr) {
+	head = ErtsThrQDirtyReadEl(&q->head.head);
+	if (q->head.first == head) {
 	    q->head.clean_reached_head_count++;
 	    if (q->head.clean_reached_head_count
 		>= ERTS_THR_Q_MAX_CLEAN_REACHED_HEAD_COUNT) {
@@ -362,19 +369,20 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify)
 	    break;
 	if (q->head.first == &q->tail.data.marker) {
 	    q->head.used_marker = 0;
-	    q->head.first = q->head.first->next.ptr;
+	    q->head.first = ErtsThrQDirtyReadEl(&q->head.first->next);
 	    goto restart;
 	}
 	tmp = q->head.first;
-	q->head.first = q->head.first->next.ptr;
+	q->head.first = ErtsThrQDirtyReadEl(&q->head.first->next);
 	if (q->head.deq_fini.automatic)
 	    element_free(q, tmp);
 	else {
 	    tmp->data.ptr = (void *) (UWord) q->head.live;
 	    if (!q->head.deq_fini.start)
 		q->head.deq_fini.start = tmp;
-	    else if (q->head.deq_fini.end->next.ptr == &q->tail.data.marker)
-		q->head.deq_fini.end->next.ptr = tmp;
+	    else if (ErtsThrQDirtyReadEl(&q->head.deq_fini.end->next)
+		     == &q->tail.data.marker)
+		ErtsThrQDirtySetEl(&q->head.deq_fini.end->next, tmp);
 	    q->head.deq_fini.end = tmp;
 	}
     }
@@ -406,14 +414,13 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify)
 		ilast = (erts_aint_t) enqueue_managed(q,
 						      &q->tail.data.marker,
 						      1);
-		if (q->head.head.ptr == q->head.unref_end) {
+		head = ErtsThrQDirtyReadEl(&q->head.head);
+		if (head == q->head.unref_end) {
 		    ErtsThrQElement_t *next;
 		    next = ((ErtsThrQElement_t *)
-			    erts_atomic_read_acqb(&q->head.head.ptr->next.atmc));
-		    if (next == &q->tail.data.marker) {
-			q->head.head.ptr->next.ptr = &q->tail.data.marker;
-			q->head.head.ptr = &q->tail.data.marker;
-		    }
+			    erts_atomic_read_acqb(&head->next));
+		    if (next == &q->tail.data.marker)
+			ErtsThrQDirtySetEl(&q->head.head, &q->tail.data.marker);
 		}
 	    }
 
@@ -436,18 +443,18 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify)
     }
 #endif
 
-    if (q->head.first == q->head.head.ptr) {
+    head = ErtsThrQDirtyReadEl(&q->head.head);
+    if (q->head.first == head) {
     inspect_head:
 	if (!q->head.used_marker) {
 	    erts_aint_t inext;
-	    inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+	    inext = erts_atomic_read_acqb(&head->next);
 	    if (inext == ERTS_AINT_NULL) {
 		q->head.used_marker = 1;
 		(void) enqueue_managed(q, &q->tail.data.marker, 0);
-		inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+		inext = erts_atomic_read_acqb(&head->next);
 		if (inext == (erts_aint_t) &q->tail.data.marker) {
-		    q->head.head.ptr->next.ptr = &q->tail.data.marker;
-		    q->head.head.ptr = &q->tail.data.marker;
+		    ErtsThrQDirtySetEl(&q->head.head, &q->tail.data.marker);
 		    goto check_thr_progress;
 		}
 	    }
@@ -506,26 +513,27 @@ erts_thr_q_inspect(ErtsThrQ_t *q, int ensure_empty)
 #ifndef USE_THREADS
     return ERTS_THR_Q_CLEAN;
 #else
+    ErtsThrQElement_t *head = ErtsThrQDirtyReadEl(&q->head.head);
     if (ensure_empty) {
 	erts_aint_t inext;
-	inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+	inext = erts_atomic_read_acqb(&head->next);
 	if (inext != ERTS_AINT_NULL) {
 	    if (&q->tail.data.marker != (ErtsThrQElement_t *) inext)
 		return ERTS_THR_Q_DIRTY;
 	    else {
-		q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext;
-		q->head.head.ptr = (ErtsThrQElement_t *) inext;
-		inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+		head = (ErtsThrQElement_t *) inext;
+		ErtsThrQDirtySetEl(&q->head.head, head);
+		inext = erts_atomic_read_acqb(&head->next);
 		if (inext != ERTS_AINT_NULL)
 		    return ERTS_THR_Q_DIRTY;
 	    }
 	}
     }
 
-    if (q->head.first == q->head.head.ptr) {
+    if (q->head.first == head) {
 	if (!q->head.used_marker) {
 	    erts_aint_t inext;
-	    inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+	    inext = erts_atomic_read_acqb(&head->next);
 	    if (inext == ERTS_AINT_NULL)
 		return ERTS_THR_Q_DIRTY;
 	}
@@ -553,11 +561,11 @@ enqueue(ErtsThrQ_t *q, void *data, ErtsThrQElement_t *this)
 #ifndef USE_THREADS
     ASSERT(data);
 
-    this->next.ptr = NULL;
+    this->next = NULL;
     this->data.ptr = data;
 
     if (q->last)
-	q->last->next.ptr = this;
+	q->last->next = this;
     else {
 	q->first = q->last = this;
 	q->init.notify(q->init.arg);
@@ -638,17 +646,17 @@ erts_thr_q_get_finalize_dequeue_data(ErtsThrQ_t *q, ErtsThrQFinDeQ_t *fdp)
 	ErtsThrQElement_t *e = q->head.deq_fini.start;
 	ErtsThrQElement_t *end = q->head.deq_fini.end;
 	while (e != end) {
-	    ASSERT(q->head.head.ptr != e);
+	    ASSERT(ErtsThrQDirtyReadEl(&q->head.head) != e);
 	    ASSERT(q->head.first != e);
 	    ASSERT(q->head.unref_end != e);
-	    e = e->next.ptr;
+	    e = ErtsThrQDirtyReadEl(&e->next);
 	}
     }	
 #endif
     fdp->start = q->head.deq_fini.start;
     fdp->end = q->head.deq_fini.end;
     if (fdp->end)
-	fdp->end->next.ptr = NULL;
+	ErtsThrQDirtySetEl(&fdp->end->next, NULL);
     q->head.deq_fini.start = NULL;
     q->head.deq_fini.end = NULL;
     return fdp->start != NULL;
@@ -662,7 +670,7 @@ erts_thr_q_append_finalize_dequeue_data(ErtsThrQFinDeQ_t *fdp0,
 #ifdef USE_THREADS
     if (fdp1->start) {
 	if (fdp0->end)
-	    fdp0->end->next.ptr = fdp1->start;
+	    ErtsThrQDirtySetEl(&fdp0->end->next, fdp1->start);
 	else
 	    fdp0->start = fdp1->start;
 	fdp0->end = fdp1->end;
@@ -683,7 +691,7 @@ int erts_thr_q_finalize_dequeue(ErtsThrQFinDeQ_t *state)
 	    if (!start)
 		break;
 	    tmp = start;
-	    start = start->next.ptr;
+	    start = ErtsThrQDirtyReadEl(&start->next);
 	    live = (ErtsThrQLive_t) (UWord) tmp->data.ptr;
 	    element_live_free(live, tmp);
 	}
@@ -724,7 +732,7 @@ erts_thr_q_dequeue(ErtsThrQ_t *q)
 	return NULL;
     tmp = q->first;
     res = tmp->data.ptr;
-    q->first = tmp->next.ptr;
+    q->first = tmp->next;
     if (!q->first)
 	q->last = NULL;
 
@@ -732,24 +740,26 @@ erts_thr_q_dequeue(ErtsThrQ_t *q)
 
     return res;
 #else
+    ErtsThrQElement_t *head;
     erts_aint_t inext;
     void *res;
 
-    inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+    head = ErtsThrQDirtyReadEl(&q->head.head);
+    inext = erts_atomic_read_acqb(&head->next);
     if (inext == ERTS_AINT_NULL)
 	return NULL;
-    q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext;
-    q->head.head.ptr = (ErtsThrQElement_t *) inext;
-    if (q->head.head.ptr == &q->tail.data.marker) {
-	inext = erts_atomic_read_acqb(&q->head.head.ptr->next.atmc);
+    head = (ErtsThrQElement_t *) inext;
+    ErtsThrQDirtySetEl(&q->head.head, head);
+    if (head == &q->tail.data.marker) {
+	inext = erts_atomic_read_acqb(&head->next);
 	if (inext == ERTS_AINT_NULL)
 	    return NULL;
-	q->head.head.ptr->next.ptr = (ErtsThrQElement_t *) inext;
-	q->head.head.ptr = (ErtsThrQElement_t *) inext;
+	head = (ErtsThrQElement_t *) inext;
+	ErtsThrQDirtySetEl(&q->head.head, head);
     }
-    res = q->head.head.ptr->data.ptr;
+    res = head->data.ptr;
 #if ERTS_THR_Q_DBG_CHK_DATA
-    q->head.head.ptr->data.ptr = NULL;
+    head->data.ptr = NULL;
     if (!res)
 	erl_exit(ERTS_ABORT_EXIT, "Missing data in dequeue\n");
 #endif
diff --git a/erts/emulator/beam/erl_thr_queue.h b/erts/emulator/beam/erl_thr_queue.h
index edcf2c3823..ae8c7fb19a 100644
--- a/erts/emulator/beam/erl_thr_queue.h
+++ b/erts/emulator/beam/erl_thr_queue.h
@@ -76,13 +76,12 @@ typedef struct {
 typedef struct ErtsThrQElement_t_ ErtsThrQElement_t;
 typedef struct ErtsThrQElement_t ErtsThrQPrepEnQ_t;
 
-typedef union {
-    erts_atomic_t atmc;
-    ErtsThrQElement_t *ptr;
-} ErtsThrQPtr_t;
-
 struct ErtsThrQElement_t_ {
-    ErtsThrQPtr_t next;
+#ifdef USE_THREADS
+    erts_atomic_t next;
+#else
+    ErtsThrQElement_t *next;
+#endif
     union {
 	erts_atomic_t atmc;
 	void *ptr;
@@ -130,7 +129,7 @@ struct ErtsThrQ_t_ {
      * thread dequeuing.
      */
     struct {
-	ErtsThrQPtr_t head;
+	erts_atomic_t head;
 	ErtsThrQLive_t live;
 	ErtsThrQElement_t *first;
 	ErtsThrQElement_t *unref_end;
-- 
cgit v1.2.3


From 9ccc56b450cef0fce8b0ed976e91d3dee13ee3b9 Mon Sep 17 00:00:00 2001
From: Rickard Green <rickard@erlang.org>
Date: Wed, 20 Feb 2013 17:13:45 +0100
Subject: Fix lost enqueue notification

---
 erts/emulator/beam/erl_thr_queue.c | 79 +++++++++++++++++++++-----------------
 1 file changed, 44 insertions(+), 35 deletions(-)

(limited to 'erts')

diff --git a/erts/emulator/beam/erl_thr_queue.c b/erts/emulator/beam/erl_thr_queue.c
index acadc5d4cd..ee2ff765e0 100644
--- a/erts/emulator/beam/erl_thr_queue.c
+++ b/erts/emulator/beam/erl_thr_queue.c
@@ -301,7 +301,7 @@ element_free(ErtsThrQ_t *q, ErtsThrQElement_t *el)
 #ifdef USE_THREADS
 
 static ERTS_INLINE ErtsThrQElement_t *
-enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last)
+enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this)
 {
     erts_aint_t ilast, itmp;
 
@@ -321,28 +321,53 @@ enqueue_managed(ErtsThrQ_t *q, ErtsThrQElement_t *this, int want_last)
 
     /* Move last pointer forward... */
     while (1) {
-	if (want_last) {
-	    if (erts_atomic_read_rb(&this->next) != ERTS_AINT_NULL) {
-		/* Someone else will move it forward */
-		ilast = erts_atomic_read_rb(&q->tail.data.last);
-		return (ErtsThrQElement_t *) ilast;
-	    }
-	}
-	else {
-	    if (erts_atomic_read_nob(&this->next) != ERTS_AINT_NULL) {
-		/* Someone else will move it forward */
-		return NULL;
-	    }
+	if (erts_atomic_read_rb(&this->next) != ERTS_AINT_NULL) {
+	    /* Someone else will move it forward */
+	    ilast = erts_atomic_read_rb(&q->tail.data.last);
+	    return (ErtsThrQElement_t *) ilast;
 	}
 	itmp = erts_atomic_cmpxchg_mb(&q->tail.data.last,
 				      (erts_aint_t) this,
 				      ilast);
 	if (ilast == itmp)
-	    return want_last ? this : NULL;
+	    return this;
 	ilast = itmp;
     }
 }
 
+static ERTS_INLINE ErtsThrQElement_t *
+enqueue_marker(ErtsThrQ_t *q, ErtsThrQElement_t **headp)
+{
+    int maybe_notify;
+    erts_aint_t inext;
+    ErtsThrQElement_t *last, *head;
+
+    if (headp)
+	head = *headp;
+    else
+	head = ErtsThrQDirtyReadEl(&q->head.head);
+
+    ASSERT(!q->head.used_marker);
+    q->head.used_marker = 1;
+    last = enqueue_managed(q, &q->tail.data.marker);
+    maybe_notify = &q->tail.data.marker == last;
+    inext = erts_atomic_read_acqb(&head->next);
+    if (inext == (erts_aint_t) &q->tail.data.marker) {
+	ErtsThrQDirtySetEl(&q->head.head, &q->tail.data.marker);
+	if (headp)
+	    *headp = &q->tail.data.marker;
+    }
+    else if (maybe_notify) {
+	/*
+	 * We need to notify; otherwise, we might loose a notification
+	 * for a concurrently inserted element.
+	 */
+	q->head.notify(q->head.arg);
+    }
+    return last;
+}
+
+
 static ErtsThrQCleanState_t
 clean(ErtsThrQ_t *q, int max_ops, int do_notify)
 {
@@ -409,20 +434,8 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify)
 	    q->head.unref_end = q->head.next.unref_end;
 
 	    if (!q->head.used_marker
-		&& q->head.unref_end == (ErtsThrQElement_t *) ilast) {
-		q->head.used_marker = 1;
-		ilast = (erts_aint_t) enqueue_managed(q,
-						      &q->tail.data.marker,
-						      1);
-		head = ErtsThrQDirtyReadEl(&q->head.head);
-		if (head == q->head.unref_end) {
-		    ErtsThrQElement_t *next;
-		    next = ((ErtsThrQElement_t *)
-			    erts_atomic_read_acqb(&head->next));
-		    if (next == &q->tail.data.marker)
-			ErtsThrQDirtySetEl(&q->head.head, &q->tail.data.marker);
-		}
-	    }
+		&& q->head.unref_end == (ErtsThrQElement_t *) ilast)
+		ilast = (erts_aint_t) enqueue_marker(q, NULL);
 
 	    if (q->head.unref_end == (ErtsThrQElement_t *) ilast)
 		ERTS_SMP_MEMORY_BARRIER;
@@ -450,13 +463,9 @@ clean(ErtsThrQ_t *q, int max_ops, int do_notify)
 	    erts_aint_t inext;
 	    inext = erts_atomic_read_acqb(&head->next);
 	    if (inext == ERTS_AINT_NULL) {
-		q->head.used_marker = 1;
-		(void) enqueue_managed(q, &q->tail.data.marker, 0);
-		inext = erts_atomic_read_acqb(&head->next);
-		if (inext == (erts_aint_t) &q->tail.data.marker) {
-		    ErtsThrQDirtySetEl(&q->head.head, &q->tail.data.marker);
+		enqueue_marker(q, &head);
+		if (head == &q->tail.data.marker)
 		    goto check_thr_progress;
-		}
 	    }
 	}
 
@@ -603,7 +612,7 @@ enqueue(ErtsThrQ_t *q, void *data, ErtsThrQElement_t *this)
 	}
     }
 
-    notify = this == enqueue_managed(q, this, 1);
+    notify = this == enqueue_managed(q, this);
 	
 
 #ifdef ERTS_SMP
-- 
cgit v1.2.3