/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2011-2012. All Rights Reserved.
 *
 * The contents of this file are subject to the Erlang Public License,
 * Version 1.1, (the "License"); you may not use this file except in
 * compliance with the License. You should have received a copy of the
 * Erlang Public License along with this software. If not, it can be
 * retrieved online at http://www.erlang.org/.
 *
 * Software distributed under the License is distributed on an "AS IS"
 * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
 * the License for the specific language governing rights and limitations
 * under the License.
 *
 * %CopyrightEnd%
 */
/*
 * Description: Scheduler specific pre-allocators. Each scheduler
 *              thread allocates memory in its own private chunk of
 *              memory. Memory blocks deallocated by remote
 *              schedulers (or other threads) are passed back to
 *              the chunk owner via a lock-free data structure.
 *
 * Author: Rickard Green
 */
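/*
 * Rough usage sketch (illustrative only). The allocation and free fast
 * paths live in erl_sched_spec_pre_alloc.h; erts_sspa_alloc() and
 * erts_sspa_free() below stand in for those header routines and their
 * exact signatures are an assumption, as is the block type my_blk_t:
 *
 *     erts_sspa_data_t *data = erts_sspa_create(sizeof(my_blk_t), 1000);
 *
 *     // On the owning scheduler (index cix): grab a block from its chunk.
 *     my_blk_t *blk = (my_blk_t *) erts_sspa_alloc(data, cix);
 *
 *     // A free from the owning scheduler goes back on the chunk's local
 *     // list; a free from any other thread ends up in
 *     // erts_sspa_remote_free(), which enqueues the block on the owner's
 *     // lock-free queue.
 *     erts_sspa_free(data, cix, (char *) blk);
 */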
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

#ifdef ERTS_SMP

#include "erl_process.h"
#include "erl_thr_progress.h"

erts_sspa_data_t *
erts_sspa_create(size_t blk_sz, int pa_size)
{
    erts_sspa_data_t *data;
    size_t tot_size;
    size_t chunk_mem_size;
    char *p;
    char *chunk_start;
    int cix;
    int no_blocks = pa_size;
    int no_blocks_per_chunk;

    if (erts_no_schedulers == 1)
        no_blocks_per_chunk = no_blocks;
    else {
        int extra = (no_blocks - 1)/4 + 1;
        if (extra == 0)
            extra = 1;
        no_blocks_per_chunk = no_blocks;
        no_blocks_per_chunk += extra*erts_no_schedulers;
        no_blocks_per_chunk /= erts_no_schedulers;
    }
    no_blocks = no_blocks_per_chunk * erts_no_schedulers;

    chunk_mem_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_sspa_chunk_header_t));
    chunk_mem_size += blk_sz * no_blocks_per_chunk;
    chunk_mem_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(chunk_mem_size);
    tot_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_sspa_data_t));
    tot_size += chunk_mem_size*erts_no_schedulers;

    p = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_PRE_ALLOC_DATA, tot_size);
    data = (erts_sspa_data_t *) p;
    p += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_sspa_data_t));
    chunk_start = p;

    data->chunks_mem_size = chunk_mem_size;
    data->start = chunk_start;
    data->end = chunk_start + chunk_mem_size*erts_no_schedulers;

    /* Initialize all chunks */
    for (cix = 0; cix < erts_no_schedulers; cix++) {
        erts_sspa_chunk_t *chnk = erts_sspa_cix2chunk(data, cix);
        erts_sspa_chunk_header_t *chdr = &chnk->aligned.header;
        erts_sspa_blk_t *blk;
        int i;

        erts_atomic_init_nob(&chdr->tail.data.last, (erts_aint_t) &chdr->tail.data.marker);
        erts_atomic_init_nob(&chdr->tail.data.marker.next_atmc, ERTS_AINT_NULL);
        erts_atomic_init_nob(&chdr->tail.data.um_refc[0], 0);
        erts_atomic_init_nob(&chdr->tail.data.um_refc[1], 0);
        erts_atomic32_init_nob(&chdr->tail.data.um_refc_ix, 0);
        chdr->head.no_thr_progress_check = 0;
        chdr->head.used_marker = 1;
        chdr->head.first = &chdr->tail.data.marker;
        chdr->head.unref_end = &chdr->tail.data.marker;
        chdr->head.next.thr_progress = erts_thr_progress_current();
        chdr->head.next.thr_progress_reached = 1;
        chdr->head.next.um_refc_ix = 1;
        chdr->head.next.unref_end = &chdr->tail.data.marker;

        /* Link the chunk's blocks into its local free list */
        p = &chnk->data[0];
        chdr->local.first = (erts_sspa_blk_t *) p;
        blk = (erts_sspa_blk_t *) p;
        for (i = 0; i < no_blocks_per_chunk; i++) {
            blk = (erts_sspa_blk_t *) p;
            p += blk_sz;
            blk->next_ptr = (erts_sspa_blk_t *) p;
        }
        blk->next_ptr = NULL;

        chdr->local.last = blk;
        chdr->local.cnt = no_blocks_per_chunk;
        chdr->local.lim = no_blocks_per_chunk / 3;

        ERTS_SSPA_DBG_CHK_LCL(chdr);
    }

    return data;
}
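/*
 * enqueue_remote_managed_thread() is the producer side of a lock-free
 * multi-producer/single-consumer queue: the block is first linked in by
 * CAS:ing the old end's next pointer, then the shared 'last' pointer is
 * swung forward. A minimal sketch of the same idea in plain C11 atomics
 * (illustrative only; the real code below additionally handles the
 * 'want_last' return value and the marker block):
 *
 *     struct node { _Atomic(struct node *) next; };
 *
 *     void enqueue(_Atomic(struct node *) *last, struct node *n)
 *     {
 *         struct node *prev = atomic_load(last), *exp;
 *         atomic_store(&n->next, NULL);
 *         for (;;) {
 *             exp = NULL;
 *             if (atomic_compare_exchange_strong(&prev->next, &exp, n))
 *                 break;          // linked in after the current end
 *             prev = exp;         // someone got there first; follow it
 *         }
 *         // Best effort; a lagging 'last' is fixed by later enqueuers.
 *         atomic_compare_exchange_strong(last, &prev, n);
 *     }
 */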
static ERTS_INLINE erts_aint_t
enqueue_remote_managed_thread(erts_sspa_chunk_header_t *chdr,
                              erts_sspa_blk_t *this,
                              int want_last)
{
    erts_aint_t ilast, itmp;

    erts_atomic_init_nob(&this->next_atmc, ERTS_AINT_NULL);

    /* Enqueue at end of list... */

    ilast = erts_atomic_read_nob(&chdr->tail.data.last);
    while (1) {
        erts_sspa_blk_t *last = (erts_sspa_blk_t *) ilast;
        itmp = erts_atomic_cmpxchg_mb(&last->next_atmc,
                                      (erts_aint_t) this,
                                      ERTS_AINT_NULL);
        if (itmp == ERTS_AINT_NULL)
            break;
        ilast = itmp;
    }

    /* Move last pointer forward... */
    while (1) {
        erts_aint_t itmp;
        if (want_last) {
            if (erts_atomic_read_rb(&this->next_atmc) != ERTS_AINT_NULL) {
                /* Someone else will move it forward */
                return erts_atomic_read_nob(&chdr->tail.data.last);
            }
        }
        else {
            if (erts_atomic_read_nob(&this->next_atmc) != ERTS_AINT_NULL) {
                /* Someone else will move it forward */
                return ERTS_AINT_NULL;
            }
        }
        itmp = erts_atomic_cmpxchg_mb(&chdr->tail.data.last,
                                      (erts_aint_t) this,
                                      ilast);
        if (ilast == itmp)
            return want_last ? (erts_aint_t) this : ERTS_AINT_NULL;
        ilast = itmp;
    }
}
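/*
 * erts_sspa_remote_free() is used when a block is freed by a thread other
 * than the owning scheduler. Managed threads (schedulers) are covered by
 * the thread progress machinery and can enqueue directly. Unmanaged
 * threads instead bump one of two "unmanaged reference" counters around
 * the enqueue; fetch_remote() below only reclaims blocks up to a point
 * where the counter for the previous epoch has dropped back to zero, so
 * no unmanaged thread can still be touching the part of the queue that
 * the owner is about to reuse.
 */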
void
erts_sspa_remote_free(erts_sspa_chunk_header_t *chdr, erts_sspa_blk_t *blk)
{
    int um_refc_ix = 0;
    int managed_thread = erts_thr_progress_is_managed_thread();

    if (!managed_thread) {
        um_refc_ix = erts_atomic32_read_acqb(&chdr->tail.data.um_refc_ix);
        while (1) {
            int tmp_um_refc_ix;
            erts_atomic_inc_acqb(&chdr->tail.data.um_refc[um_refc_ix]);
            tmp_um_refc_ix = erts_atomic32_read_acqb(&chdr->tail.data.um_refc_ix);
            if (tmp_um_refc_ix == um_refc_ix)
                break;
            erts_atomic_dec_relb(&chdr->tail.data.um_refc[um_refc_ix]);
            um_refc_ix = tmp_um_refc_ix;
        }
    }

    (void) enqueue_remote_managed_thread(chdr, blk, 0);

    if (!managed_thread)
        erts_atomic_dec_relb(&chdr->tail.data.um_refc[um_refc_ix]);
}
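/*
 * fetch_remote() moves blocks that remote threads have freed (and enqueued
 * above) over to the owner's local list. Blocks are only taken up to
 * head.unref_end; that pointer is only advanced once thread progress has
 * passed the point where it was sampled and the corresponding unmanaged
 * reference counter has reached zero, i.e. when no other thread can still
 * hold a reference into that part of the queue. The marker block keeps the
 * queue from becoming empty, so there is always a block whose next pointer
 * enqueuers can CAS.
 */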
static ERTS_INLINE void
fetch_remote(erts_sspa_chunk_header_t *chdr, int max)
{
    int new_local = 0;

    if (chdr->head.no_thr_progress_check < ERTS_SSPA_FORCE_THR_CHECK_PROGRESS)
        chdr->head.no_thr_progress_check++;
    else {
        erts_aint_t ilast;

        chdr->head.no_thr_progress_check = 0;

        ilast = erts_atomic_read_nob(&chdr->tail.data.last);
        if (((erts_sspa_blk_t *) ilast) == &chdr->tail.data.marker
            && chdr->head.first == &chdr->tail.data.marker)
            return;

        if (chdr->head.next.thr_progress_reached
            || erts_thr_progress_has_reached(chdr->head.next.thr_progress)) {
            int um_refc_ix;
            chdr->head.next.thr_progress_reached = 1;
            um_refc_ix = chdr->head.next.um_refc_ix;
            if (erts_atomic_read_acqb(&chdr->tail.data.um_refc[um_refc_ix]) == 0) {

                /* Move unreferenced end pointer forward... */

                chdr->head.unref_end = chdr->head.next.unref_end;

                if (!chdr->head.used_marker
                    && chdr->head.unref_end == (erts_sspa_blk_t *) ilast) {
                    /* Need to enqueue the marker */
                    chdr->head.used_marker = 1;
                    ilast = enqueue_remote_managed_thread(chdr,
                                                          &chdr->tail.data.marker,
                                                          1);
                }

                if (chdr->head.unref_end == (erts_sspa_blk_t *) ilast)
                    ERTS_THR_MEMORY_BARRIER;
                else {
                    chdr->head.next.unref_end = (erts_sspa_blk_t *) ilast;
                    chdr->head.next.thr_progress = erts_thr_progress_later();
                    erts_atomic32_set_relb(&chdr->tail.data.um_refc_ix,
                                           um_refc_ix);
                    chdr->head.next.um_refc_ix = um_refc_ix == 0 ? 1 : 0;
                    chdr->head.next.thr_progress_reached = 0;
                }
            }
        }
    }

    if (new_local < max && chdr->head.first != chdr->head.unref_end) {
        erts_sspa_blk_t *first, *this, *next, *last;

        first = chdr->head.first;
        if (first == &chdr->tail.data.marker) {
            chdr->head.used_marker = 0;
            first = ((erts_sspa_blk_t *)
                     erts_atomic_read_nob(&first->next_atmc));
            chdr->head.first = first;
        }

        if (first != chdr->head.unref_end) {

            ERTS_SSPA_DBG_CHK_LCL(chdr);

            this = last = first;
            do {
                next = (erts_sspa_blk_t *) erts_atomic_read_nob(&this->next_atmc);
                if (this == &chdr->tail.data.marker)
                    chdr->head.used_marker = 0;
                else {
                    last->next_ptr = this;
                    last = this;
                    new_local++;
                }
                this = next;
            } while (new_local < max && this != chdr->head.unref_end);
            chdr->head.first = this;

            if (!chdr->local.last)
                chdr->local.first = first;
            else
                chdr->local.last->next_ptr = first;
            chdr->local.last = last;
            last->next_ptr = NULL;

            chdr->local.cnt += new_local;

            ERTS_SSPA_DBG_CHK_LCL(chdr);
        }
    }
}
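/*
 * Typically reached from the allocation path when the owner's local free
 * list has run dry: pull in a batch of remotely freed blocks and, if the
 * caller did not already have a block ('old_res'), hand one back from the
 * refilled local list.
 */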
erts_sspa_blk_t *
erts_sspa_process_remote_frees(erts_sspa_chunk_header_t *chdr,
                               erts_sspa_blk_t *old_res)
{
    erts_sspa_blk_t *res = old_res;

    fetch_remote(chdr, ERTS_SSPA_MAX_GET_NEW_LOCAL);

    if (!res && chdr->local.first) {

        ERTS_SSPA_DBG_CHK_LCL(chdr);

        res = chdr->local.first;
        chdr->local.first = res->next_ptr;
        chdr->local.cnt--;
        if (!chdr->local.first)
            chdr->local.last = NULL;

        ERTS_SSPA_DBG_CHK_LCL(chdr);
    }

    return res;
}

#endif /* ERTS_SMP */