/*
* %CopyrightBegin%
*
* Copyright Ericsson AB 2019. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* %CopyrightEnd%
*/
/*
* Author: Kjell Winblad
*/
#include "erl_flxctr.h"
static int reader_groups_array_size = 0;
#define ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS (reader_groups_array_size)
static int erts_flxctr_read_ctx_bin_dtor(Binary *context_bin);
static int erts_flxctr_wait_dtor(Binary *context_bin);
typedef struct {
ErtsThrPrgrLaterOp later_op;
Process* process;
ErtsFlxCtrDecentralizedCtrArray* array;
ErtsFlxCtrDecentralizedCtrArray* next_array;
ErtsAlcType_t alloc_type;
int nr_of_counters;
Sint result[ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE];
} DecentralizedReadSnapshotInfo;
typedef enum {
ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING = 0,
ERTS_FLXCTR_SNAPSHOT_ONGOING = 1,
ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE = 2
} erts_flxctr_snapshot_status;
static void
thr_prg_wake_up_and_count(void* bin_p)
{
Binary* bin = bin_p;
DecentralizedReadSnapshotInfo* info = ERTS_MAGIC_BIN_DATA(bin);
Process* p = info->process;
ErtsFlxCtrDecentralizedCtrArray* array = info->array;
ErtsFlxCtrDecentralizedCtrArray* next = info->next_array;
int i, sched;
/* Reset result array */
for (i = 0; i < info->nr_of_counters; i++) {
info->result[i] = 0;
}
/* Read result from snapshot */
for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
for (i = 0; i < info->nr_of_counters; i++) {
info->result[i] = info->result[i] +
erts_atomic_read_nob(&array->array[sched].counters[i]);
}
}
/* Update the next decentralized counter array */
for (i = 0; i < info->nr_of_counters; i++) {
erts_atomic_add_nob(&next->array[0].counters[i], info->result[i]);
}
/* Announce that the snapshot is done */
{
Sint expected = ERTS_FLXCTR_SNAPSHOT_ONGOING;
if (expected != erts_atomic_cmpxchg_mb(&next->snapshot_status,
ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING,
expected)) {
/* The CAS failed which means that this thread need to free the next array. */
erts_free(info->alloc_type, next->block_start);
}
}
/* Resume the process that requested the snapshot */
erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
if (!ERTS_PROC_IS_EXITING(p)) {
erts_resume(p, ERTS_PROC_LOCK_STATUS);
}
/* Free the memory that is no longer needed */
erts_free(info->alloc_type, array->block_start);
erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
erts_proc_dec_refc(p);
erts_bin_release(bin);
}
typedef struct {
ErtsThrPrgrLaterOp later_op;
Process* process;
} ErtsFlxCtrWakeUpLaterInfo;
static void
thr_prg_wake_up_later(void* bin_p)
{
Binary* bin = bin_p;
ErtsFlxCtrWakeUpLaterInfo* info = ERTS_MAGIC_BIN_DATA(bin);
Process* p = info->process;
/* Resume the requesting process */
erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
if (!ERTS_PROC_IS_EXITING(p)) {
erts_resume(p, ERTS_PROC_LOCK_STATUS);
}
erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
/* Free data */
erts_proc_dec_refc(p);
erts_bin_release(bin);
}
static
int erts_flxctr_read_ctx_bin_dtor(Binary *context_bin) {
(void)context_bin;
return 1;
}
static
int erts_flxctr_wait_dtor(Binary *context_bin) {
(void)context_bin;
return 1;
}
static void suspend_until_thr_prg(Process* p)
{
Binary* state_bin;
ErtsFlxCtrWakeUpLaterInfo* info;
state_bin = erts_create_magic_binary(sizeof(ErtsFlxCtrWakeUpLaterInfo),
erts_flxctr_wait_dtor);
info = ERTS_MAGIC_BIN_DATA(state_bin);
info->process = p;
erts_refc_inctest(&state_bin->intern.refc, 1);
erts_suspend(p, ERTS_PROC_LOCK_MAIN, NULL);
erts_proc_inc_refc(p);
ERTS_VBUMP_ALL_REDS(p);
erts_schedule_thr_prgr_later_op(thr_prg_wake_up_later, state_bin, &info->later_op);
}
static ErtsFlxCtrDecentralizedCtrArray*
create_decentralized_ctr_array(ErtsAlcType_t alloc_type, Uint nr_of_counters) {
/* Allocate an ErtsFlxCtrDecentralizedCtrArray and make sure that
the array field is located at the start of a cache line */
char* bytes =
erts_alloc(alloc_type,
sizeof(ErtsFlxCtrDecentralizedCtrArray) +
(sizeof(ErtsFlxCtrDecentralizedCtrArrayElem) *
ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS) +
ERTS_CACHE_LINE_SIZE);
void* block_start = bytes;
int bytes_to_next_cacheline_border;
ErtsFlxCtrDecentralizedCtrArray* array;
int i, sched;
bytes = &bytes[offsetof(ErtsFlxCtrDecentralizedCtrArray, array)];
bytes_to_next_cacheline_border =
ERTS_CACHE_LINE_SIZE - (((Uint)bytes) % ERTS_CACHE_LINE_SIZE);
array = (ErtsFlxCtrDecentralizedCtrArray*)
(&bytes[bytes_to_next_cacheline_border -
(int)offsetof(ErtsFlxCtrDecentralizedCtrArray, array)]);
ASSERT(((Uint)array->array) % ERTS_CACHE_LINE_SIZE == 0);
ASSERT(((Uint)array - (Uint)block_start) <= ERTS_CACHE_LINE_SIZE);
/* Initialize fields */
erts_atomic_init_nob(&array->snapshot_status, ERTS_FLXCTR_SNAPSHOT_ONGOING);
for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
for (i = 0; i < nr_of_counters; i++) {
erts_atomic_init_nob(&array->array[sched].counters[i], 0);
}
}
array->block_start = block_start;
return array;
}
void erts_flxctr_setup(int decentralized_counter_groups)
{
reader_groups_array_size = decentralized_counter_groups+1;
}
void erts_flxctr_init(ErtsFlxCtr* c,
int is_decentralized,
Uint nr_of_counters,
ErtsAlcType_t alloc_type)
{
ASSERT(nr_of_counters <= ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE);
c->is_decentralized = is_decentralized;
c->nr_of_counters = nr_of_counters;
if (c->is_decentralized) {
ErtsFlxCtrDecentralizedCtrArray* array =
create_decentralized_ctr_array(alloc_type, nr_of_counters);
erts_atomic_set_nob(&array->snapshot_status,
ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING);
erts_atomic_init_nob(&c->u.counters_ptr, (Sint)array);
ASSERT(((Uint)array->array) % ERTS_CACHE_LINE_SIZE == 0);
} else {
int i;
for (i = 0; i < nr_of_counters; i++) {
erts_atomic_init_nob(&c->u.counters[i], 0);
}
}
}
void erts_flxctr_destroy(ErtsFlxCtr* c, ErtsAlcType_t type)
{
if (c->is_decentralized) {
if (erts_flxctr_is_snapshot_ongoing(c)) {
ErtsFlxCtrDecentralizedCtrArray* array =
ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
/* Try to delegate the resposibilty of freeing to
thr_prg_wake_up_and_count */
Sint expected = ERTS_FLXCTR_SNAPSHOT_ONGOING;
if (expected !=
erts_atomic_cmpxchg_mb(&array->snapshot_status,
ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE,
expected)) {
/* The delegation was unsuccessful which means that no
snapshot is ongoing anymore and the freeing needs
to be done here */
ERTS_ASSERT(!erts_flxctr_is_snapshot_ongoing(c));
erts_free(type, array->block_start);
}
} else {
erts_free(type, ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c)->block_start);
}
}
}
ErtsFlxCtrSnapshotResult
erts_flxctr_snapshot(ErtsFlxCtr* c,
ErtsAlcType_t alloc_type,
Process* p)
{
if (c->is_decentralized) {
ErtsFlxCtrDecentralizedCtrArray* array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
if (erts_flxctr_is_snapshot_ongoing(c)) {
/* Let the caller try again later */
ErtsFlxCtrSnapshotResult res =
{.type = ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP};
suspend_until_thr_prg(p);
return res;
} else {
Eterm* hp;
Binary* state_bin;
Eterm state_mref;
DecentralizedReadSnapshotInfo* info;
ErtsFlxCtrDecentralizedCtrArray* new_array =
create_decentralized_ctr_array(alloc_type, c->nr_of_counters);
int success =
((Sint)array) == erts_atomic_cmpxchg_mb(&c->u.counters_ptr,
(Sint)new_array,
(Sint)array);
if (!success) {
/* Let the caller try again later */
ErtsFlxCtrSnapshotResult res =
{.type = ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP};
suspend_until_thr_prg(p);
erts_free(alloc_type, new_array->block_start);
return res;
}
/* Create binary with info about the operation that can be
sent to the caller and to a thread progress function */
state_bin =
erts_create_magic_binary(sizeof(DecentralizedReadSnapshotInfo),
erts_flxctr_read_ctx_bin_dtor);
hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
state_mref = erts_mk_magic_ref(&hp, &MSO(p), state_bin);
info = ERTS_MAGIC_BIN_DATA(state_bin);
info->alloc_type = alloc_type;
info->array = array;
info->next_array = new_array;
info->process = p;
info->nr_of_counters = c->nr_of_counters;
erts_proc_inc_refc(p);
erts_refc_inctest(&state_bin->intern.refc, 2);
erts_suspend(p, ERTS_PROC_LOCK_MAIN, NULL);
ERTS_VBUMP_ALL_REDS(p);
erts_schedule_thr_prgr_later_op(thr_prg_wake_up_and_count,
state_bin,
&info->later_op);
{
ErtsFlxCtrSnapshotResult res = {
.type = ERTS_FLXCTR_GET_RESULT_AFTER_TRAP,
.trap_resume_state = state_mref};
return res;
}
}
} else {
ErtsFlxCtrSnapshotResult res;
int i;
res.type = ERTS_FLXCTR_DONE;
for (i = 0; i < c->nr_of_counters; i++){
res.result[i] = erts_flxctr_read_centralized(c, i);
}
return res;
}
}
Sint erts_flxctr_get_snapshot_result_after_trap(Eterm result_holder,
Uint counter_nr)
{
Binary* bin = erts_magic_ref2bin(result_holder);
DecentralizedReadSnapshotInfo* data = ERTS_MAGIC_BIN_DATA(bin);;
return data->result[counter_nr];
}
int erts_flxctr_is_snapshot_result(Eterm term)
{
if (is_internal_magic_ref(term)) {
Binary* bin = erts_magic_ref2bin(term);
return ERTS_MAGIC_BIN_DESTRUCTOR(bin) == erts_flxctr_read_ctx_bin_dtor;
} else return 0;
}
Sint erts_flxctr_read_approx(ErtsFlxCtr* c,
Uint counter_nr)
{
if (c->is_decentralized) {
ErtsFlxCtrDecentralizedCtrArray* counter = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
Sint sum = 0;
int sched;
for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
sum = sum + erts_atomic_read_nob(&counter->array[sched].counters[counter_nr]);
}
return sum;
} else {
return erts_flxctr_read_centralized(c, counter_nr);
}
}
int erts_flxctr_is_snapshot_ongoing(ErtsFlxCtr* c)
{
return c->is_decentralized &&
(ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING !=
erts_atomic_read_acqb(&ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c)->snapshot_status));
}
int erts_flxctr_suspend_until_thr_prg_if_snapshot_ongoing(ErtsFlxCtr* c, Process* p)
{
if (erts_flxctr_is_snapshot_ongoing(c)) {
suspend_until_thr_prg(p);
return 1;
} else {
return 0;
}
}
void erts_flxctr_reset(ErtsFlxCtr* c,
Uint counter_nr)
{
if (c->is_decentralized) {
int sched;
ErtsFlxCtrDecentralizedCtrArray* counter =
ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
erts_atomic_set_nob(&counter->array[sched].counters[counter_nr], 0);
}
} else {
erts_atomic_set_nob(&c->u.counters[counter_nr], 0);
}
}
void erts_flxctr_set_slot(int group) {
ErtsSchedulerData *esdp = erts_get_scheduler_data();
esdp->flxctr_slot_no = group;
}