/*
 * path: root/erts/emulator/beam/erl_flxctr.c
 * blob: 35f4a2150820ae3fec8a491eabb986772fcb6321 (plain) (tree)
 */

















































































































































































































































































































































































                                                                                          
/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2019. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Author: Kjell Winblad
 */

#include "erl_flxctr.h"

/* Number of slots in every decentralized counter array. It is zero until
   erts_flxctr_setup() has been called, which sets it to the number of
   decentralized counter groups plus one. */
static int reader_groups_array_size = 0;
/* Slot count used when allocating and when iterating over the slots of a
   decentralized counter array */
#define ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS (reader_groups_array_size)

/* Destructor callbacks for the magic binaries created in this file
   (both are defined further down) */
static int erts_flxctr_read_ctx_bin_dtor(Binary *context_bin);
static int erts_flxctr_wait_dtor(Binary *context_bin);

/*
 * State of one snapshot operation on a decentralized counter. It is
 * stored in a magic binary that erts_flxctr_snapshot() hands both to the
 * callback thr_prg_wake_up_and_count() and, wrapped in a magic
 * reference, to the process that requested the snapshot.
 */
typedef struct {
    /* Handle passed to erts_schedule_thr_prgr_later_op() */
    ErtsThrPrgrLaterOp later_op;
    /* Process that requested the snapshot; suspended until the callback
       resumes it */
    Process* process;
    /* Counter array that was swapped out of the counter; the callback
       sums it up and then frees it */
    ErtsFlxCtrDecentralizedCtrArray* array;
    /* Array installed in place of the old one; the callback adds the
       sums to its slot 0 */
    ErtsFlxCtrDecentralizedCtrArray* next_array;
    /* Allocator type used when the callback frees an array */
    ErtsAlcType_t alloc_type;
    /* Number of counters summed per slot */
    int nr_of_counters;
    /* Sum per counter; read with
       erts_flxctr_get_snapshot_result_after_trap() */
    Sint result[ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE];
} DecentralizedReadSnapshotInfo;

/*
 * Values stored in the snapshot_status field of a decentralized counter
 * array.
 */
typedef enum {
    /* No snapshot is in progress for the array */
    ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING = 0,
    /* Initial value of a newly created array; replaced by NOT_ONGOING in
       erts_flxctr_init() or when thr_prg_wake_up_and_count() is done */
    ERTS_FLXCTR_SNAPSHOT_ONGOING = 1,
    /* Set by erts_flxctr_destroy() while a snapshot is ongoing; tells
       thr_prg_wake_up_and_count() that it has to free the array */
    ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE = 2
} erts_flxctr_snapshot_status;

/*
 * Callback that erts_flxctr_snapshot() schedules through
 * erts_schedule_thr_prgr_later_op() to complete a snapshot.
 *
 * bin_p is the magic binary holding the DecentralizedReadSnapshotInfo of
 * the operation. The function sums every counter of the swapped-out
 * array over all slots into info->result, adds the sums to slot 0 of
 * the replacement array, marks the snapshot as finished, resumes the
 * requesting process (unless it is exiting) and frees the old array.
 * Finally it drops the process reference and the binary reference that
 * erts_flxctr_snapshot() took for it.
 */
static void
thr_prg_wake_up_and_count(void* bin_p)
{
    Binary* bin = bin_p;
    DecentralizedReadSnapshotInfo* info = ERTS_MAGIC_BIN_DATA(bin);
    Process* p = info->process;
    ErtsFlxCtrDecentralizedCtrArray* array = info->array;
    ErtsFlxCtrDecentralizedCtrArray* next = info->next_array;
    int i, sched;
    /* Reset result array */
    for (i = 0; i < info->nr_of_counters; i++) {
        info->result[i] = 0;
    }
    /* Read result from snapshot: sum each counter over all slots of the
       old array */
    for (sched = 0; sched < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; sched++) {
        for (i = 0; i < info->nr_of_counters; i++) {
            info->result[i] = info->result[i] +
                erts_atomic_read_nob(&array->array[sched].counters[i]);
        }
    }
    /* Update the next decentralized counter array so that it carries the
       totals of the old array (added to slot 0, since updates may
       already have been made to the new array) */
    for (i = 0; i < info->nr_of_counters; i++) {
        erts_atomic_add_nob(&next->array[0].counters[i], info->result[i]);
    }
    /* Announce that the snapshot is done */
    {
    Sint expected = ERTS_FLXCTR_SNAPSHOT_ONGOING;
    if (expected != erts_atomic_cmpxchg_mb(&next->snapshot_status,
                                           ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING,
                                           expected)) {
        /* The CAS failed, which means that erts_flxctr_destroy() has
           delegated the freeing and this thread needs to free the next
           array. */
        erts_free(info->alloc_type, next->block_start);
    }
    }
    /* Resume the process that requested the snapshot */
    erts_proc_lock(p, ERTS_PROC_LOCK_STATUS);
    if (!ERTS_PROC_IS_EXITING(p)) {
        erts_resume(p, ERTS_PROC_LOCK_STATUS);
    }
    /* Free the memory that is no longer needed */
    erts_free(info->alloc_type, array->block_start);
    erts_proc_unlock(p, ERTS_PROC_LOCK_STATUS);
    /* Drop the references taken in erts_flxctr_snapshot() */
    erts_proc_dec_refc(p);
    erts_bin_release(bin);
}

/*
 * State stored in the magic binary that suspend_until_thr_prg() passes
 * to the callback thr_prg_wake_up_later().
 */
typedef struct {
    /* Handle passed to erts_schedule_thr_prgr_later_op() */
    ErtsThrPrgrLaterOp later_op;
    /* Suspended process that the callback resumes */
    Process* process;
} ErtsFlxCtrWakeUpLaterInfo;

/*
 * Callback scheduled by suspend_until_thr_prg().
 *
 * bin_p is the magic binary holding an ErtsFlxCtrWakeUpLaterInfo. The
 * process stored there is resumed unless it is exiting, after which the
 * process reference and the binary reference are released.
 */
static void
thr_prg_wake_up_later(void* bin_p)
{
    Binary* wait_bin = bin_p;
    ErtsFlxCtrWakeUpLaterInfo* wait_info = ERTS_MAGIC_BIN_DATA(wait_bin);
    Process* waiter = wait_info->process;

    /* Wake up the waiting process while holding its status lock */
    erts_proc_lock(waiter, ERTS_PROC_LOCK_STATUS);
    if (!ERTS_PROC_IS_EXITING(waiter)) {
        erts_resume(waiter, ERTS_PROC_LOCK_STATUS);
    }
    erts_proc_unlock(waiter, ERTS_PROC_LOCK_STATUS);

    /* Release the references that were held for this callback */
    erts_proc_dec_refc(waiter);
    erts_bin_release(wait_bin);
}

/*
 * Destructor of the magic binary that holds a snapshot context
 * (DecentralizedReadSnapshotInfo). It performs no cleanup and always
 * returns 1. Its address is also what erts_flxctr_is_snapshot_result()
 * compares against to recognize a snapshot result.
 */
static int
erts_flxctr_read_ctx_bin_dtor(Binary *context_bin)
{
    (void) context_bin; /* unused */
    return 1;
}

/*
 * Destructor of the magic binary created by suspend_until_thr_prg().
 * It performs no cleanup and always returns 1.
 */
static int
erts_flxctr_wait_dtor(Binary *context_bin)
{
    (void) context_bin; /* unused */
    return 1;
}

/*
 * Suspends process p and schedules thr_prg_wake_up_later() to resume it.
 *
 * A magic binary carrying the process pointer is created for the
 * callback. One reference to the binary and one to the process are taken
 * here and released by the callback. ERTS_VBUMP_ALL_REDS is applied to p
 * before the callback is scheduled.
 */
static void suspend_until_thr_prg(Process* p)
{
    Binary* wait_bin =
        erts_create_magic_binary(sizeof(ErtsFlxCtrWakeUpLaterInfo),
                                 erts_flxctr_wait_dtor);
    ErtsFlxCtrWakeUpLaterInfo* wait_info = ERTS_MAGIC_BIN_DATA(wait_bin);

    wait_info->process = p;
    /* Binary reference owned by the callback */
    erts_refc_inctest(&wait_bin->intern.refc, 1);
    erts_suspend(p, ERTS_PROC_LOCK_MAIN, NULL);
    /* Process reference owned by the callback */
    erts_proc_inc_refc(p);
    ERTS_VBUMP_ALL_REDS(p);
    erts_schedule_thr_prgr_later_op(thr_prg_wake_up_later,
                                    wait_bin,
                                    &wait_info->later_op);
}


/*
 * Allocates and initializes a decentralized counter array.
 *
 * alloc_type     allocator type passed to erts_alloc()
 * nr_of_counters number of counters to zero-initialize in every slot
 *
 * The block is over-allocated by one cache line so that the returned
 * structure can be shifted to make its array field start on a cache
 * line boundary. The start of the allocated block is saved in
 * block_start, which is the pointer that has to be given to erts_free().
 * The snapshot_status of the returned array is
 * ERTS_FLXCTR_SNAPSHOT_ONGOING.
 */
static ErtsFlxCtrDecentralizedCtrArray*
create_decentralized_ctr_array(ErtsAlcType_t alloc_type, Uint nr_of_counters) {
    char* raw =
        erts_alloc(alloc_type,
                   sizeof(ErtsFlxCtrDecentralizedCtrArray) +
                   (sizeof(ErtsFlxCtrDecentralizedCtrArrayElem) *
                    ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS) +
                   ERTS_CACHE_LINE_SIZE);
    /* Where the array field would end up if the structure was placed at
       the very start of the block */
    char* unshifted_slots =
        raw + offsetof(ErtsFlxCtrDecentralizedCtrArray, array);
    /* Distance from there to the next cache line border (a full cache
       line if it already is on a border) */
    int shift =
        ERTS_CACHE_LINE_SIZE - (((Uint)unshifted_slots) % ERTS_CACHE_LINE_SIZE);
    /* Shifting the whole structure by that distance aligns the field */
    ErtsFlxCtrDecentralizedCtrArray* ctr_array =
        (ErtsFlxCtrDecentralizedCtrArray*) (raw + shift);
    int slot;
    Uint ctr;

    ASSERT(((Uint)ctr_array->array) % ERTS_CACHE_LINE_SIZE == 0);
    ASSERT(((Uint)ctr_array - (Uint)raw) <= ERTS_CACHE_LINE_SIZE);

    /* Initialize fields */
    erts_atomic_init_nob(&ctr_array->snapshot_status,
                         ERTS_FLXCTR_SNAPSHOT_ONGOING);
    for (slot = 0; slot < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; slot++) {
        for (ctr = 0; ctr < nr_of_counters; ctr++) {
            erts_atomic_init_nob(&ctr_array->array[slot].counters[ctr], 0);
        }
    }
    ctr_array->block_start = raw;
    return ctr_array;
}

/*
 * Sets the number of slots used by decentralized counter arrays to
 * decentralized_counter_groups plus one.
 */
void erts_flxctr_setup(int decentralized_counter_groups)
{
    reader_groups_array_size = 1 + decentralized_counter_groups;
}

/*
 * Initializes the counter c with all counters set to zero.
 *
 * c                counter to initialize
 * is_decentralized non-zero to use a separately allocated decentralized
 *                  counter array, zero to use the counters inside c
 * nr_of_counters   number of counters; at most
 *                  ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE
 * alloc_type       allocator type for the decentralized counter array
 */
void erts_flxctr_init(ErtsFlxCtr* c,
                      int is_decentralized,
                      Uint nr_of_counters,
                      ErtsAlcType_t alloc_type)
{
    ErtsFlxCtrDecentralizedCtrArray* fresh;
    int ix;

    ASSERT(nr_of_counters <= ERTS_FLXCTR_ATOMICS_PER_CACHE_LINE);
    c->is_decentralized = is_decentralized;
    c->nr_of_counters = nr_of_counters;

    if (!c->is_decentralized) {
        /* Centralized: the counters live directly in the structure */
        for (ix = 0; ix < nr_of_counters; ix++) {
            erts_atomic_init_nob(&c->u.counters[ix], 0);
        }
        return;
    }

    /* Decentralized: a new array starts out marked as having a snapshot
       ongoing, so clear that mark before publishing the array */
    fresh = create_decentralized_ctr_array(alloc_type, nr_of_counters);
    erts_atomic_set_nob(&fresh->snapshot_status,
                        ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING);
    erts_atomic_init_nob(&c->u.counters_ptr, (Sint)fresh);
    ASSERT(((Uint)fresh->array) % ERTS_CACHE_LINE_SIZE == 0);
}

/*
 * Releases the counter array of a decentralized counter. Nothing is done
 * for a centralized counter.
 *
 * c    counter to destroy
 * type allocator type passed to erts_free()
 *
 * If a snapshot is ongoing, the array is still needed by
 * thr_prg_wake_up_and_count(), so the function tries to hand over the
 * responsibility of freeing the array to that callback.
 */
void erts_flxctr_destroy(ErtsFlxCtr* c, ErtsAlcType_t type)
{
    ErtsFlxCtrDecentralizedCtrArray* ctr_array;
    Sint ongoing = ERTS_FLXCTR_SNAPSHOT_ONGOING;
    Sint seen;

    if (!c->is_decentralized) {
        return;
    }
    if (!erts_flxctr_is_snapshot_ongoing(c)) {
        erts_free(type, ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c)->block_start);
        return;
    }

    ctr_array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
    /* Try to delegate the responsibility of freeing to
       thr_prg_wake_up_and_count */
    seen = erts_atomic_cmpxchg_mb(&ctr_array->snapshot_status,
                                  ERTS_FLXCTR_SNAPSHOT_ONGOING_TP_THREAD_DO_FREE,
                                  ongoing);
    if (ongoing != seen) {
        /* The delegation failed, which means that the snapshot has
           finished in the meantime and the array has to be freed here */
        ERTS_ASSERT(!erts_flxctr_is_snapshot_ongoing(c));
        erts_free(type, ctr_array->block_start);
    }
}

/*
 * Reads, or starts reading, a snapshot of all counters in c.
 *
 * c          counter to take the snapshot of
 * alloc_type allocator type used for the replacement counter array
 * p          calling process; it is suspended in every case where the
 *            returned type is not ERTS_FLXCTR_DONE
 *
 * The type field of the returned result is one of:
 *
 * ERTS_FLXCTR_DONE (centralized counter)
 *     The result field holds the counter values.
 *
 * ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP (decentralized counter)
 *     A snapshot was already ongoing, or the counter array was replaced
 *     concurrently. p has been suspended by suspend_until_thr_prg() and
 *     the caller is expected to try again once p runs again.
 *
 * ERTS_FLXCTR_GET_RESULT_AFTER_TRAP (decentralized counter)
 *     The counter array has been replaced by a new one and
 *     thr_prg_wake_up_and_count() has been scheduled to sum up the old
 *     array and resume p. trap_resume_state is a magic reference to the
 *     operation state, from which the values can be fetched with
 *     erts_flxctr_get_snapshot_result_after_trap().
 */
ErtsFlxCtrSnapshotResult
erts_flxctr_snapshot(ErtsFlxCtr* c,
                     ErtsAlcType_t alloc_type,
                     Process* p)
{
    if (c->is_decentralized) {
        ErtsFlxCtrDecentralizedCtrArray* array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
        if (erts_flxctr_is_snapshot_ongoing(c)) {
            /* Let the caller try again later */
            ErtsFlxCtrSnapshotResult res =
                {.type = ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP};
            suspend_until_thr_prg(p);
            return res;
        } else {
            Eterm* hp;
            Binary* state_bin;
            Eterm state_mref;
            DecentralizedReadSnapshotInfo* info;
            /* The new array is created with its snapshot_status set to
               ERTS_FLXCTR_SNAPSHOT_ONGOING */
            ErtsFlxCtrDecentralizedCtrArray* new_array =
                create_decentralized_ctr_array(alloc_type, c->nr_of_counters);
            /* Install the new array; this fails if the counter no longer
               points to the array that was read above */
            int success =
                ((Sint)array) == erts_atomic_cmpxchg_mb(&c->u.counters_ptr,
                                                        (Sint)new_array,
                                                        (Sint)array);
            if (!success) {
                /* Let the caller try again later */
                ErtsFlxCtrSnapshotResult res =
                    {.type = ERTS_FLXCTR_TRY_AGAIN_AFTER_TRAP};
                suspend_until_thr_prg(p);
                /* The new array was never installed, so free it here */
                erts_free(alloc_type, new_array->block_start);
                return res;
            }
            /* Create binary with info about the operation that can be
               sent to the caller and to a thread progress function */
            state_bin =
                erts_create_magic_binary(sizeof(DecentralizedReadSnapshotInfo),
                                         erts_flxctr_read_ctx_bin_dtor);
            hp = HAlloc(p, ERTS_MAGIC_REF_THING_SIZE);
            state_mref = erts_mk_magic_ref(&hp, &MSO(p), state_bin);
            info = ERTS_MAGIC_BIN_DATA(state_bin);
            info->alloc_type = alloc_type;
            info->array = array;
            info->next_array = new_array;
            info->process = p;
            info->nr_of_counters = c->nr_of_counters;
            /* Process and binary references held for the callback; both
               are released at the end of thr_prg_wake_up_and_count() */
            erts_proc_inc_refc(p);
            erts_refc_inctest(&state_bin->intern.refc, 2);
            erts_suspend(p, ERTS_PROC_LOCK_MAIN, NULL);
            ERTS_VBUMP_ALL_REDS(p);
            erts_schedule_thr_prgr_later_op(thr_prg_wake_up_and_count,
                                            state_bin,
                                            &info->later_op);
            {
                ErtsFlxCtrSnapshotResult res = {
                    .type = ERTS_FLXCTR_GET_RESULT_AFTER_TRAP,
                    .trap_resume_state = state_mref};
                return res;
            }
        }
    } else {
        /* Centralized counter: read the values directly */
        ErtsFlxCtrSnapshotResult res;
        int i;
        res.type = ERTS_FLXCTR_DONE;
        for (i = 0; i < c->nr_of_counters; i++){
            res.result[i] = erts_flxctr_read_centralized(c, i);
        }
        return res;
    }
}


/*
 * Fetches the value of one counter from a snapshot result.
 *
 * result_holder magic reference to the snapshot state (the
 *               trap_resume_state returned by erts_flxctr_snapshot())
 * counter_nr    index of the counter to fetch; it is not range checked
 *
 * Returns the value stored for the counter in the snapshot state.
 */
Sint erts_flxctr_get_snapshot_result_after_trap(Eterm result_holder,
                                            Uint counter_nr)
{
    Binary* state_bin = erts_magic_ref2bin(result_holder);
    DecentralizedReadSnapshotInfo* snapshot = ERTS_MAGIC_BIN_DATA(state_bin);

    return snapshot->result[counter_nr];
}

/*
 * Returns non-zero if term is an internal magic reference whose binary
 * has erts_flxctr_read_ctx_bin_dtor as its destructor, that is, a
 * snapshot state created by erts_flxctr_snapshot(). Returns 0 otherwise.
 */
int erts_flxctr_is_snapshot_result(Eterm term)
{
    Binary* candidate;

    if (!is_internal_magic_ref(term)) {
        return 0;
    }
    candidate = erts_magic_ref2bin(term);
    return ERTS_MAGIC_BIN_DESTRUCTOR(candidate) == erts_flxctr_read_ctx_bin_dtor;
}

/*
 * Reads counter number counter_nr of c without taking a snapshot.
 *
 * For a decentralized counter the value is the sum of the counter over
 * all slots, where each slot is read separately with
 * erts_atomic_read_nob(). For a centralized counter the value comes from
 * erts_flxctr_read_centralized().
 */
Sint erts_flxctr_read_approx(ErtsFlxCtr* c,
                             Uint counter_nr)
{
    ErtsFlxCtrDecentralizedCtrArray* ctr_array;
    Sint total = 0;
    int slot;

    if (!c->is_decentralized) {
        return erts_flxctr_read_centralized(c, counter_nr);
    }

    ctr_array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
    for (slot = 0; slot < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; slot++) {
        total = total +
            erts_atomic_read_nob(&ctr_array->array[slot].counters[counter_nr]);
    }
    return total;
}

/*
 * Returns 1 if c is decentralized and the snapshot_status of its current
 * counter array differs from ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING; returns 0
 * otherwise (always 0 for a centralized counter).
 */
int erts_flxctr_is_snapshot_ongoing(ErtsFlxCtr* c)
{
    if (!c->is_decentralized) {
        return 0;
    }
    return ERTS_FLXCTR_SNAPSHOT_NOT_ONGOING !=
        erts_atomic_read_acqb(&ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c)->snapshot_status);
}

/*
 * If a snapshot of c is ongoing, suspends p with suspend_until_thr_prg()
 * and returns 1. Otherwise leaves p untouched and returns 0.
 */
int erts_flxctr_suspend_until_thr_prg_if_snapshot_ongoing(ErtsFlxCtr* c, Process* p)
{
    if (!erts_flxctr_is_snapshot_ongoing(c)) {
        return 0;
    }
    suspend_until_thr_prg(p);
    return 1;
}

/*
 * Sets counter number counter_nr of c to zero. For a decentralized
 * counter every slot of the current counter array is set to zero, one
 * slot at a time.
 */
void erts_flxctr_reset(ErtsFlxCtr* c,
                       Uint counter_nr)
{
    ErtsFlxCtrDecentralizedCtrArray* ctr_array;
    int slot;

    if (!c->is_decentralized) {
        erts_atomic_set_nob(&c->u.counters[counter_nr], 0);
        return;
    }

    ctr_array = ERTS_FLXCTR_GET_CTR_ARRAY_PTR(c);
    for (slot = 0; slot < ERTS_FLXCTR_DECENTRALIZED_NO_SLOTS; slot++) {
        erts_atomic_set_nob(&ctr_array->array[slot].counters[counter_nr], 0);
    }
}


/*
 * Stores group as the flxctr_slot_no of the scheduler data returned by
 * erts_get_scheduler_data(). The returned pointer is used without a NULL
 * check.
 */
void erts_flxctr_set_slot(int group) {
    ErtsSchedulerData *sched_data = erts_get_scheduler_data();

    sched_data->flxctr_slot_no = group;
}