From f4aa12fc5f5756d7574311cf66cd5ec8025df682 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Fri, 24 Sep 2010 11:09:55 +0200 Subject: Generalize reader groups Reader groups have been generalized to cpu groups which can be used for implementing reader groups, but also for implementing other functionality in the future. --- erts/emulator/beam/erl_alloc.types | 2 +- erts/emulator/beam/erl_cpu_topology.c | 673 ++++++++++++++++++++++------------ erts/emulator/beam/erl_cpu_topology.h | 11 + erts/emulator/beam/erl_process.c | 4 +- 4 files changed, 449 insertions(+), 241 deletions(-) diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 7df9f19af0..408ffd12f7 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -247,7 +247,7 @@ type CPUDATA LONG_LIVED SYSTEM cpu_data type TMP_CPU_IDS SHORT_LIVED SYSTEM tmp_cpu_ids type EXT_TERM_DATA SHORT_LIVED PROCESSES external_term_data type ZLIB STANDARD SYSTEM zlib -type RDR_GRPS_MAP LONG_LIVED SYSTEM reader_groups_map +type CPU_GRPS_MAP LONG_LIVED SYSTEM cpu_groups_map +if smp type ASYNC SHORT_LIVED SYSTEM async diff --git a/erts/emulator/beam/erl_cpu_topology.c b/erts/emulator/beam/erl_cpu_topology.c index befab6f3b7..db95c4a5d4 100644 --- a/erts/emulator/beam/erl_cpu_topology.c +++ b/erts/emulator/beam/erl_cpu_topology.c @@ -76,7 +76,8 @@ typedef enum { #define ERTS_CPU_BIND_DEFAULT_BIND \ ERTS_CPU_BIND_THREAD_NO_NODE_PROCESSOR_SPREAD -ErtsCpuBindOrder cpu_bind_order; +static int no_cpu_groups_callbacks; +static ErtsCpuBindOrder cpu_bind_order; static erts_cpu_topology_t *user_cpudata; static int user_cpudata_size; @@ -87,36 +88,45 @@ typedef struct { int level[ERTS_TOPOLOGY_MAX_DEPTH+1]; } erts_avail_cput; -typedef struct { - int *map; - int size; - int groups; -} erts_reader_groups_map_test; - typedef struct { int id; int sub_levels; - int reader_groups; -} erts_rg_count_t; + int cpu_groups; +} erts_cpu_groups_count_t; typedef struct { int logical; - int reader_group; -} erts_reader_groups_map_t; + int cpu_group; +} erts_cpu_groups_map_array_t; + +typedef struct erts_cpu_groups_callback_list_t_ erts_cpu_groups_callback_list_t; +struct erts_cpu_groups_callback_list_t_ { + erts_cpu_groups_callback_list_t *next; + erts_cpu_groups_callback_t callback; + void *arg; +}; + +typedef struct erts_cpu_groups_map_t_ erts_cpu_groups_map_t; +struct erts_cpu_groups_map_t_ { + erts_cpu_groups_map_t *next; + int groups; + erts_cpu_groups_map_array_t *array; + int size; + int logical_processors; + erts_cpu_groups_callback_list_t *callback_list; +}; typedef struct { - erts_reader_groups_map_t *map; - int map_size; - int logical_processors; - int groups; -} erts_make_reader_groups_map_test; + erts_cpu_groups_callback_t callback; + int ix; + void *arg; +} erts_cpu_groups_callback_call_t; + +static erts_cpu_groups_map_t *cpu_groups_maps; -static int reader_groups_available_cpu_check; -static int reader_groups_logical_processors; -static int reader_groups_map_size; -static erts_reader_groups_map_t *reader_groups_map; +static erts_cpu_groups_map_t *reader_groups_map; -#define ERTS_TOPOLOGY_RG ERTS_TOPOLOGY_MAX_DEPTH +#define ERTS_TOPOLOGY_CG ERTS_TOPOLOGY_MAX_DEPTH #define ERTS_MAX_CPU_TOPOLOGY_ID ((int) 0xffff) @@ -128,8 +138,14 @@ static void cpu_bind_order_sort(erts_cpu_topology_t *cpudata, static void write_schedulers_bind_change(erts_cpu_topology_t *cpudata, int size); #endif -static void make_reader_groups_map(erts_make_reader_groups_map_test *test); -static int reader_group_lookup(int logical); +static void reader_groups_callback(int, ErtsSchedulerData *, int, void *); +static erts_cpu_groups_map_t *add_cpu_groups(int groups, + erts_cpu_groups_callback_t callback, + void *arg); +static void update_cpu_groups_maps(void); +static void make_cpu_groups_map(erts_cpu_groups_map_t *map, int test); +static int cpu_groups_lookup(erts_cpu_groups_map_t *map, + ErtsSchedulerData *esdp); static void create_tmp_cpu_topology_copy(erts_cpu_topology_t **cpudata, int *cpudata_size); @@ -421,28 +437,51 @@ processor_order_cmp(const void *vx, const void *vy) void erts_sched_check_cpu_bind_prep_suspend(ErtsSchedulerData *esdp) { - int reset_read_group = 0; - ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(esdp->run_queue)); + erts_cpu_groups_map_t *cgm; + erts_cpu_groups_callback_list_t *cgcl; + erts_cpu_groups_callback_call_t *cgcc; + int cgcc_ix; + /* Unbind from cpu */ erts_smp_rwmtx_rwlock(&cpuinfo_rwmtx); if (scheduler2cpu_map[esdp->no].bound_id >= 0 && erts_unbind_from_cpu(cpuinfo) == 0) { esdp->cpu_id = scheduler2cpu_map[esdp->no].bound_id = -1; - reset_read_group = 1; } + + cgcc = erts_alloc(ERTS_ALC_T_TMP, + (no_cpu_groups_callbacks + * sizeof(erts_cpu_groups_callback_call_t))); + cgcc_ix = 0; + for (cgm = cpu_groups_maps; cgm; cgm = cgm->next) { + for (cgcl = cgm->callback_list; cgcl; cgcl = cgcl->next) { + cgcc[cgcc_ix].callback = cgcl->callback; + cgcc[cgcc_ix].ix = cpu_groups_lookup(cgm, esdp); + cgcc[cgcc_ix].arg = cgcl->arg; + cgcc_ix++; + } + } + ASSERT(no_cpu_groups_callbacks == cgcc_ix); erts_smp_rwmtx_rwunlock(&cpuinfo_rwmtx); - if (reset_read_group) - erts_smp_rwmtx_set_reader_group(0); + for (cgcc_ix = 0; cgcc_ix < no_cpu_groups_callbacks; cgcc_ix++) + cgcc[cgcc_ix].callback(1, + esdp, + cgcc[cgcc_ix].ix, + cgcc[cgcc_ix].arg); + + erts_free(ERTS_ALC_T_TMP, cgcc); if (esdp->no <= max_main_threads) erts_thr_set_main_status(0, 0); + } void erts_sched_check_cpu_bind_post_suspend(ErtsSchedulerData *esdp) { ERTS_SMP_LC_ASSERT(erts_smp_lc_runq_is_locked(esdp->run_queue)); + if (esdp->no <= max_main_threads) erts_thr_set_main_status(1, (int) esdp->no); @@ -458,9 +497,17 @@ erts_sched_check_cpu_bind_post_suspend(ErtsSchedulerData *esdp) void erts_sched_check_cpu_bind(ErtsSchedulerData *esdp) { - int rg = 0; - int res; - int cpu_id; + int res, cpu_id, cgcc_ix; + erts_cpu_groups_map_t *cgm; + erts_cpu_groups_callback_list_t *cgcl; + erts_cpu_groups_callback_call_t *cgcc; +#ifdef ERTS_SMP + if (erts_common_run_queue) + erts_smp_atomic_set(&esdp->chk_cpu_bind, 0); + else { + esdp->run_queue->flags &= ~ERTS_RUNQ_FLG_CHK_CPU_BIND; + } +#endif erts_smp_runq_unlock(esdp->run_queue); erts_smp_rwmtx_rwlock(&cpuinfo_rwmtx); cpu_id = scheduler2cpu_map[esdp->no].bind_id; @@ -490,39 +537,71 @@ erts_sched_check_cpu_bind(ErtsSchedulerData *esdp) erts_send_error_to_logger_nogl(dsbufp); } } - if (reader_groups) { - if (esdp->cpu_id >= 0) - rg = reader_group_lookup(esdp->cpu_id); - else - rg = (((int) esdp->no) - 1) % reader_groups + 1; + + cgcc = erts_alloc(ERTS_ALC_T_TMP, + (no_cpu_groups_callbacks + * sizeof(erts_cpu_groups_callback_call_t))); + cgcc_ix = 0; + for (cgm = cpu_groups_maps; cgm; cgm = cgm->next) { + for (cgcl = cgm->callback_list; cgcl; cgcl = cgcl->next) { + cgcc[cgcc_ix].callback = cgcl->callback; + cgcc[cgcc_ix].ix = cpu_groups_lookup(cgm, esdp); + cgcc[cgcc_ix].arg = cgcl->arg; + cgcc_ix++; + } } + + ASSERT(no_cpu_groups_callbacks == cgcc_ix); erts_smp_rwmtx_rwunlock(&cpuinfo_rwmtx); + + for (cgcc_ix = 0; cgcc_ix < no_cpu_groups_callbacks; cgcc_ix++) + cgcc[cgcc_ix].callback(0, + esdp, + cgcc[cgcc_ix].ix, + cgcc[cgcc_ix].arg); + + erts_free(ERTS_ALC_T_TMP, cgcc); + erts_smp_runq_lock(esdp->run_queue); -#ifdef ERTS_SMP - if (erts_common_run_queue) - erts_smp_atomic_set(&esdp->chk_cpu_bind, 0); - else { - esdp->run_queue->flags &= ~ERTS_RUNQ_FLG_CHK_CPU_BIND; - } -#endif - if (reader_groups) - erts_smp_rwmtx_set_reader_group(rg); } #ifdef ERTS_SMP void erts_sched_init_check_cpu_bind(ErtsSchedulerData *esdp) { - int no = (int) esdp->no; - if (no <= max_main_threads) { - erts_thr_set_main_status(1, (int) no); - if (reader_groups) { - int rg = (int) no; - if (rg > reader_groups) - rg = (((int) no) - 1) % reader_groups + 1; - erts_smp_rwmtx_set_reader_group(rg); + int cgcc_ix; + erts_cpu_groups_map_t *cgm; + erts_cpu_groups_callback_list_t *cgcl; + erts_cpu_groups_callback_call_t *cgcc; + + erts_smp_rwmtx_rlock(&cpuinfo_rwmtx); + + cgcc = erts_alloc(ERTS_ALC_T_TMP, + (no_cpu_groups_callbacks + * sizeof(erts_cpu_groups_callback_call_t))); + cgcc_ix = 0; + for (cgm = cpu_groups_maps; cgm; cgm = cgm->next) { + for (cgcl = cgm->callback_list; cgcl; cgcl = cgcl->next) { + cgcc[cgcc_ix].callback = cgcl->callback; + cgcc[cgcc_ix].ix = cpu_groups_lookup(cgm, esdp); + cgcc[cgcc_ix].arg = cgcl->arg; + cgcc_ix++; } } + + ASSERT(no_cpu_groups_callbacks == cgcc_ix); + erts_smp_rwmtx_runlock(&cpuinfo_rwmtx); + + for (cgcc_ix = 0; cgcc_ix < no_cpu_groups_callbacks; cgcc_ix++) + cgcc[cgcc_ix].callback(0, + esdp, + cgcc[cgcc_ix].ix, + cgcc[cgcc_ix].arg); + + erts_free(ERTS_ALC_T_TMP, cgcc); + + if (esdp->no <= max_main_threads) + erts_thr_set_main_status(1, (int) esdp->no); } #endif @@ -1388,7 +1467,7 @@ erts_set_cpu_topology(Process *c_p, Eterm term) sizeof(erts_cpu_topology_t)*cpudata_size); } - make_reader_groups_map(NULL); + update_cpu_groups_maps(); write_schedulers_bind_change(cpudata, cpudata_size); @@ -1588,6 +1667,8 @@ erts_pre_early_init_cpu_topology(int *max_rg_p, int *onln_p, int *avail_p) { + cpu_groups_maps = NULL; + no_cpu_groups_callbacks = 0; *max_rg_p = ERTS_MAX_READER_GROUPS; cpuinfo = erts_cpu_info_create(); get_logical_processors(conf_p, onln_p, avail_p); @@ -1609,11 +1690,6 @@ erts_early_init_cpu_topology(int no_schedulers, cpu_bind_order = ERTS_CPU_BIND_UNDEFINED; - reader_groups_available_cpu_check = 1; - reader_groups_logical_processors = 0; - reader_groups_map_size = 0; - reader_groups_map = NULL; - if (!erts_get_cpu_topology(cpuinfo, system_cpudata) || ERTS_INIT_CPU_TOPOLOGY_OK != verify_topology(system_cpudata, system_cpudata_size)) { @@ -1641,6 +1717,7 @@ erts_init_cpu_topology(void) int ix; erts_smp_rwmtx_init(&cpuinfo_rwmtx, "cpu_info"); + erts_smp_rwmtx_rwlock(&cpuinfo_rwmtx); scheduler2cpu_map = erts_alloc(ERTS_ALC_T_CPUDATA, (sizeof(ErtsCpuBindData) @@ -1661,7 +1738,9 @@ erts_init_cpu_topology(void) : ERTS_CPU_BIND_NONE); } - make_reader_groups_map(NULL); + reader_groups_map = add_cpu_groups(reader_groups, + reader_groups_callback, + NULL); if (cpu_bind_order == ERTS_CPU_BIND_NONE) erts_smp_rwmtx_rwunlock(&cpuinfo_rwmtx); @@ -1707,7 +1786,7 @@ erts_update_cpu_info(void) } } - make_reader_groups_map(NULL); + update_cpu_groups_maps(); create_tmp_cpu_topology_copy(&cpudata, &cpudata_size); write_schedulers_bind_change(cpudata, cpudata_size); @@ -1723,53 +1802,33 @@ erts_update_cpu_info(void) * reader groups map */ -static Eterm -get_reader_groups_map(Process *c_p, - erts_reader_groups_map_t *map, - int map_size, - int logical_processors) +void +reader_groups_callback(int suspending, + ErtsSchedulerData *esdp, + int group, + void *unused) { -#ifdef DEBUG - Eterm *endp; -#endif - Eterm res = NIL, tuple; - Eterm *hp; - int i; - - hp = HAlloc(c_p, logical_processors*(2+3)); -#ifdef DEBUG - endp = hp + logical_processors*(2+3); -#endif - for (i = map_size - 1; i >= 0; i--) { - if (map[i].logical >= 0) { - tuple = TUPLE2(hp, - make_small(map[i].logical), - make_small(map[i].reader_group)); - hp += 3; - res = CONS(hp, tuple, res); - hp += 2; - } - } - ASSERT(hp == endp); - return res; + if (reader_groups && esdp->no <= max_main_threads) + erts_smp_rwmtx_set_reader_group(suspending ? 0 : group+1); } +static Eterm get_cpu_groups_map(Process *c_p, + erts_cpu_groups_map_t *map, + int offset); Eterm erts_debug_reader_groups_map(Process *c_p, int groups) { Eterm res; - erts_make_reader_groups_map_test test; + erts_cpu_groups_map_t test; + test.array = NULL; test.groups = groups; - make_reader_groups_map(&test); - if (!test.map) + make_cpu_groups_map(&test, 1); + if (!test.array) res = NIL; else { - res = get_reader_groups_map(c_p, - test.map, - test.map_size, - test.logical_processors); - erts_free(ERTS_ALC_T_TMP, test.map); + res = get_cpu_groups_map(c_p, &test, 1); + erts_free(ERTS_ALC_T_TMP, test.array); } return res; } @@ -1780,14 +1839,45 @@ erts_get_reader_groups_map(Process *c_p) { Eterm res; erts_smp_rwmtx_rlock(&cpuinfo_rwmtx); - res = get_reader_groups_map(c_p, - reader_groups_map, - reader_groups_map_size, - reader_groups_logical_processors); + res = get_cpu_groups_map(c_p, reader_groups_map, 1); erts_smp_rwmtx_runlock(&cpuinfo_rwmtx); return res; } +/* + * CPU groups + */ + +static Eterm +get_cpu_groups_map(Process *c_p, + erts_cpu_groups_map_t *map, + int offset) +{ +#ifdef DEBUG + Eterm *endp; +#endif + Eterm res = NIL, tuple; + Eterm *hp; + int i; + + hp = HAlloc(c_p, map->logical_processors*(2+3)); +#ifdef DEBUG + endp = hp + map->logical_processors*(2+3); +#endif + for (i = map->size - 1; i >= 0; i--) { + if (map->array[i].logical >= 0) { + tuple = TUPLE2(hp, + make_small(map->array[i].logical), + make_small(map->array[i].cpu_group + offset)); + hp += 3; + res = CONS(hp, tuple, res); + hp += 2; + } + } + ASSERT(hp == endp); + return res; +} + static void make_available_cpu_topology(erts_avail_cput *no, erts_avail_cput *avail, @@ -1848,7 +1938,7 @@ make_available_cpu_topology(erts_avail_cput *no, avail[a].level[j] = no->level[j]; avail[a].level[ERTS_TOPOLOGY_LOGICAL] = cpudata[i].logical; - avail[a].level[ERTS_TOPOLOGY_RG] = 0; + avail[a].level[ERTS_TOPOLOGY_CG] = 0; ASSERT(last.logical != cpudata[i].logical); @@ -1866,40 +1956,21 @@ make_available_cpu_topology(erts_avail_cput *no, *size = a; } -static int -reader_group_lookup(int logical) -{ - int start = logical % reader_groups_map_size; - int ix = start; - - do { - if (reader_groups_map[ix].logical == logical) { - ASSERT(reader_groups_map[ix].reader_group > 0); - return reader_groups_map[ix].reader_group; - } - ix++; - if (ix == reader_groups_map_size) - ix = 0; - } while (ix != start); - - erl_exit(ERTS_ABORT_EXIT, "Logical cpu id %d not found\n", logical); -} - static void -reader_group_insert(erts_reader_groups_map_t *map, int map_size, - int logical, int reader_group) +cpu_group_insert(erts_cpu_groups_map_t *map, + int logical, int cpu_group) { - int start = logical % map_size; + int start = logical % map->size; int ix = start; do { - if (map[ix].logical < 0) { - map[ix].logical = logical; - map[ix].reader_group = reader_group; + if (map->array[ix].logical < 0) { + map->array[ix].logical = logical; + map->array[ix].cpu_group = cpu_group; return; } ix++; - if (ix == map_size) + if (ix == map->size) ix = 0; } while (ix != start); @@ -1908,107 +1979,100 @@ reader_group_insert(erts_reader_groups_map_t *map, int map_size, static int -sub_levels(erts_rg_count_t *rgc, int level, int aix, int avail_sz, erts_avail_cput *avail) +sub_levels(erts_cpu_groups_count_t *cgc, int level, int aix, + int avail_sz, erts_avail_cput *avail) { int sub_level = level+1; int last = -1; - rgc->sub_levels = 0; + cgc->sub_levels = 0; do { if (last != avail[aix].level[sub_level]) { - rgc->sub_levels++; + cgc->sub_levels++; last = avail[aix].level[sub_level]; } aix++; } - while (aix < avail_sz && rgc->id == avail[aix].level[level]); - rgc->reader_groups = 0; + while (aix < avail_sz && cgc->id == avail[aix].level[level]); + cgc->cpu_groups = 0; return aix; } static int -write_reader_groups(int *rgp, erts_rg_count_t *rgcp, +write_cpu_groups(int *cgp, erts_cpu_groups_count_t *cgcp, int level, int a, int avail_sz, erts_avail_cput *avail) { - int rg = *rgp; + int cg = *cgp; int sub_level = level+1; - int sl_per_gr = rgcp->sub_levels / rgcp->reader_groups; - int xsl = rgcp->sub_levels % rgcp->reader_groups; + int sl_per_gr = cgcp->sub_levels / cgcp->cpu_groups; + int xsl = cgcp->sub_levels % cgcp->cpu_groups; int sls = 0; int last = -1; - int xsl_rg_lim = (rgcp->reader_groups - xsl) + rg + 1; + int xsl_cg_lim = (cgcp->cpu_groups - xsl) + cg + 1; - ASSERT(level < 0 || avail[a].level[level] == rgcp->id) + ASSERT(level < 0 || avail[a].level[level] == cgcp->id); do { if (last != avail[a].level[sub_level]) { if (!sls) { sls = sl_per_gr; - rg++; - if (rg >= xsl_rg_lim) + cg++; + if (cg >= xsl_cg_lim) sls++; } last = avail[a].level[sub_level]; sls--; } - avail[a].level[ERTS_TOPOLOGY_RG] = rg; + avail[a].level[ERTS_TOPOLOGY_CG] = cg; a++; } while (a < avail_sz && (level < 0 - || avail[a].level[level] == rgcp->id)); + || avail[a].level[level] == cgcp->id)); - ASSERT(rgcp->reader_groups == rg - *rgp); + ASSERT(cgcp->cpu_groups == cg - *cgp); - *rgp = rg; + *cgp = cg; return a; } static int -rg_count_sub_levels_compare(const void *vx, const void *vy) +cg_count_sub_levels_compare(const void *vx, const void *vy) { - erts_rg_count_t *x = (erts_rg_count_t *) vx; - erts_rg_count_t *y = (erts_rg_count_t *) vy; + erts_cpu_groups_count_t *x = (erts_cpu_groups_count_t *) vx; + erts_cpu_groups_count_t *y = (erts_cpu_groups_count_t *) vy; if (x->sub_levels != y->sub_levels) return y->sub_levels - x->sub_levels; return x->id - y->id; } static int -rg_count_id_compare(const void *vx, const void *vy) +cg_count_id_compare(const void *vx, const void *vy) { - erts_rg_count_t *x = (erts_rg_count_t *) vx; - erts_rg_count_t *y = (erts_rg_count_t *) vy; + erts_cpu_groups_count_t *x = (erts_cpu_groups_count_t *) vx; + erts_cpu_groups_count_t *y = (erts_cpu_groups_count_t *) vy; return x->id - y->id; } static void -make_reader_groups_map(erts_make_reader_groups_map_test *test) +make_cpu_groups_map(erts_cpu_groups_map_t *map, int test) { int i, spread_level, avail_sz; erts_avail_cput no, *avail; erts_cpu_topology_t *cpudata; - erts_reader_groups_map_t *map; - int map_sz; - int groups = reader_groups; - - if (test) { - test->map = NULL; - test->map_size = 0; - groups = test->groups; - } + ErtsAlcType_t alc_type = (test + ? ERTS_ALC_T_TMP + : ERTS_ALC_T_CPU_GRPS_MAP); - if (!groups) - return; + if (map->array) + erts_free(alc_type, map->array); - if (!test) { - if (reader_groups_map) - erts_free(ERTS_ALC_T_RDR_GRPS_MAP, reader_groups_map); + map->array = NULL; + map->logical_processors = 0; + map->size = 0; - reader_groups_logical_processors = 0; - reader_groups_map_size = 0; - reader_groups_map = NULL; - } + if (!map->groups) + return; create_tmp_cpu_topology_copy(&cpudata, &avail_sz); @@ -2024,61 +2088,47 @@ make_reader_groups_map(erts_make_reader_groups_map_test *test) sizeof(erts_avail_cput)*avail_sz); make_available_cpu_topology(&no, avail, cpudata, - &avail_sz, test != NULL); + &avail_sz, test); destroy_tmp_cpu_topology_copy(cpudata); - map_sz = avail_sz*2+1; - - if (test) { - map = erts_alloc(ERTS_ALC_T_TMP, - (sizeof(erts_reader_groups_map_t) - * map_sz)); - test->map = map; - test->map_size = map_sz; - test->logical_processors = avail_sz; - } - else { - map = erts_alloc(ERTS_ALC_T_RDR_GRPS_MAP, - (sizeof(erts_reader_groups_map_t) - * map_sz)); - reader_groups_map = map; - reader_groups_logical_processors = avail_sz; - reader_groups_map_size = map_sz; + map->size = avail_sz*2+1; - } + map->array = erts_alloc(alc_type, + (sizeof(erts_cpu_groups_map_array_t) + * map->size));; + map->logical_processors = avail_sz; - for (i = 0; i < map_sz; i++) { - map[i].logical = -1; - map[i].reader_group = 0; + for (i = 0; i < map->size; i++) { + map->array[i].logical = -1; + map->array[i].cpu_group = -1; } spread_level = ERTS_TOPOLOGY_CORE; for (i = ERTS_TOPOLOGY_NODE; i < ERTS_TOPOLOGY_THREAD; i++) { - if (no.level[i] > groups) { + if (no.level[i] > map->groups) { spread_level = i; break; } } - if (no.level[spread_level] <= groups) { - int a, rg, last = -1; - rg = 0; + if (no.level[spread_level] <= map->groups) { + int a, cg, last = -1; + cg = -1; ASSERT(spread_level == ERTS_TOPOLOGY_CORE); for (a = 0; a < avail_sz; a++) { if (last != avail[a].level[spread_level]) { - rg++; + cg++; last = avail[a].level[spread_level]; } - reader_group_insert(map, - map_sz, - avail[a].level[ERTS_TOPOLOGY_LOGICAL], - rg); + cpu_group_insert(map, + avail[a].level[ERTS_TOPOLOGY_LOGICAL], + cg); } } - else { /* groups < no.level[spread_level] */ - erts_rg_count_t *rg_count; - int a, rg, tl, toplevels; + else { /* map->groups < no.level[spread_level] */ + erts_cpu_groups_count_t *cg_count; + int a, cg, tl, toplevels; tl = spread_level-1; @@ -2087,76 +2137,223 @@ make_reader_groups_map(erts_make_reader_groups_map_test *test) else toplevels = no.level[tl]; - rg_count = erts_alloc(ERTS_ALC_T_TMP, - toplevels*sizeof(erts_rg_count_t)); + cg_count = erts_alloc(ERTS_ALC_T_TMP, + toplevels*sizeof(erts_cpu_groups_count_t)); if (toplevels == 1) { - rg_count[0].id = 0; - rg_count[0].sub_levels = no.level[spread_level]; - rg_count[0].reader_groups = groups; + cg_count[0].id = 0; + cg_count[0].sub_levels = no.level[spread_level]; + cg_count[0].cpu_groups = map->groups; } else { - int rgs_per_tl, rgs; - rgs = groups; - rgs_per_tl = rgs / toplevels; + int cgs_per_tl, cgs; + cgs = map->groups; + cgs_per_tl = cgs / toplevels; a = 0; for (i = 0; i < toplevels; i++) { - rg_count[i].id = avail[a].level[tl]; - a = sub_levels(&rg_count[i], tl, a, avail_sz, avail); + cg_count[i].id = avail[a].level[tl]; + a = sub_levels(&cg_count[i], tl, a, avail_sz, avail); } - qsort(rg_count, + qsort(cg_count, toplevels, - sizeof(erts_rg_count_t), - rg_count_sub_levels_compare); + sizeof(erts_cpu_groups_count_t), + cg_count_sub_levels_compare); for (i = 0; i < toplevels; i++) { - if (rg_count[i].sub_levels < rgs_per_tl) { - rg_count[i].reader_groups = rg_count[i].sub_levels; - rgs -= rg_count[i].sub_levels; + if (cg_count[i].sub_levels < cgs_per_tl) { + cg_count[i].cpu_groups = cg_count[i].sub_levels; + cgs -= cg_count[i].sub_levels; } else { - rg_count[i].reader_groups = rgs_per_tl; - rgs -= rgs_per_tl; + cg_count[i].cpu_groups = cgs_per_tl; + cgs -= cgs_per_tl; } } - while (rgs > 0) { + while (cgs > 0) { for (i = 0; i < toplevels; i++) { - if (rg_count[i].sub_levels == rg_count[i].reader_groups) + if (cg_count[i].sub_levels == cg_count[i].cpu_groups) break; else { - rg_count[i].reader_groups++; - if (--rgs == 0) + cg_count[i].cpu_groups++; + if (--cgs == 0) break; } } } - qsort(rg_count, + qsort(cg_count, toplevels, - sizeof(erts_rg_count_t), - rg_count_id_compare); + sizeof(erts_cpu_groups_count_t), + cg_count_id_compare); } - a = i = rg = 0; + a = i = 0; + cg = -1; while (a < avail_sz) { - a = write_reader_groups(&rg, &rg_count[i], tl, - a, avail_sz, avail); + a = write_cpu_groups(&cg, &cg_count[i], tl, + a, avail_sz, avail); i++; } - ASSERT(groups == rg); + ASSERT(map->groups == cg + 1); for (a = 0; a < avail_sz; a++) - reader_group_insert(map, - map_sz, - avail[a].level[ERTS_TOPOLOGY_LOGICAL], - avail[a].level[ERTS_TOPOLOGY_RG]); + cpu_group_insert(map, + avail[a].level[ERTS_TOPOLOGY_LOGICAL], + avail[a].level[ERTS_TOPOLOGY_CG]); - erts_free(ERTS_ALC_T_TMP, rg_count); + erts_free(ERTS_ALC_T_TMP, cg_count); } erts_free(ERTS_ALC_T_TMP, avail); } + +static erts_cpu_groups_map_t * +add_cpu_groups(int groups, + erts_cpu_groups_callback_t callback, + void *arg) +{ + int use_groups = groups; + erts_cpu_groups_callback_list_t *cgcl; + erts_cpu_groups_map_t *cgm; + + ERTS_SMP_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&cpuinfo_rwmtx)); + + if (use_groups > max_main_threads) + use_groups = max_main_threads; + + if (!use_groups) + return NULL; + + no_cpu_groups_callbacks++; + cgcl = erts_alloc(ERTS_ALC_T_CPU_GRPS_MAP, + sizeof(erts_cpu_groups_callback_list_t)); + cgcl->callback = callback; + cgcl->arg = arg; + + for (cgm = cpu_groups_maps; cgm; cgm = cgm->next) { + if (cgm->groups == use_groups) { + cgcl->next = cgm->callback_list; + cgm->callback_list = cgcl; + return cgm; + } + } + + + cgm = erts_alloc(ERTS_ALC_T_CPU_GRPS_MAP, + sizeof(erts_cpu_groups_map_t)); + cgm->next = cpu_groups_maps; + cgm->groups = use_groups; + cgm->array = NULL; + cgm->size = 0; + cgm->logical_processors = 0; + cgm->callback_list = cgcl; + + cgcl->next = NULL; + + make_cpu_groups_map(cgm, 0); + + cpu_groups_maps = cgm; + + return cgm; +} + +static void +remove_cpu_groups(erts_cpu_groups_callback_t callback, void *arg) +{ + erts_cpu_groups_map_t *prev_cgm, *cgm; + erts_cpu_groups_callback_list_t *prev_cgcl, *cgcl; + + ERTS_SMP_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&cpuinfo_rwmtx)); + + no_cpu_groups_callbacks--; + + prev_cgm = NULL; + for (cgm = cpu_groups_maps; cgm; cgm = cgm->next) { + prev_cgcl = NULL; + for (cgcl = cgm->callback_list; cgcl; cgcl = cgcl->next) { + if (cgcl->callback == callback && cgcl->arg == arg) { + if (prev_cgcl) + prev_cgcl->next = cgcl->next; + else + cgm->callback_list = cgcl->next; + erts_free(ERTS_ALC_T_CPU_GRPS_MAP, cgcl); + if (!cgm->callback_list) { + if (prev_cgm) + prev_cgm->next = cgm->next; + else + cpu_groups_maps = cgm->next; + if (cgm->array) + erts_free(ERTS_ALC_T_CPU_GRPS_MAP, cgm->array); + erts_free(ERTS_ALC_T_CPU_GRPS_MAP, cgm); + } + return; + } + prev_cgcl = cgcl; + } + prev_cgm = cgm; + } + + erl_exit(ERTS_ABORT_EXIT, "Cpu groups not found\n"); +} + +static int +cpu_groups_lookup(erts_cpu_groups_map_t *map, + ErtsSchedulerData *esdp) +{ + int start, logical, ix; + + ERTS_SMP_LC_ASSERT(erts_lc_rwmtx_is_rlocked(&cpuinfo_rwmtx) + || erts_lc_rwmtx_is_rwlocked(&cpuinfo_rwmtx)); + + if (esdp->cpu_id < 0) + return (((int) esdp->no) - 1) % map->groups; + + logical = esdp->cpu_id; + start = logical % map->size; + ix = start; + + do { + if (map->array[ix].logical == logical) { + int group = map->array[ix].cpu_group; + ASSERT(0 <= group && group < map->groups); + return group; + } + ix++; + if (ix == map->size) + ix = 0; + } while (ix != start); + + erl_exit(ERTS_ABORT_EXIT, "Logical cpu id %d not found\n", logical); +} + +static void +update_cpu_groups_maps(void) +{ + erts_cpu_groups_map_t *cgm; + ERTS_SMP_LC_ASSERT(erts_lc_rwmtx_is_rwlocked(&cpuinfo_rwmtx)); + + for (cgm = cpu_groups_maps; cgm; cgm = cgm->next) + make_cpu_groups_map(cgm, 0); +} + +void +erts_add_cpu_groups(int groups, + erts_cpu_groups_callback_t callback, + void *arg) +{ + erts_smp_rwmtx_rwlock(&cpuinfo_rwmtx); + add_cpu_groups(groups, callback, arg); + erts_smp_rwmtx_rwunlock(&cpuinfo_rwmtx); +} + +void erts_remove_cpu_groups(erts_cpu_groups_callback_t callback, + void *arg) +{ + erts_smp_rwmtx_rwlock(&cpuinfo_rwmtx); + remove_cpu_groups(callback, arg); + erts_smp_rwmtx_rwunlock(&cpuinfo_rwmtx); +} diff --git a/erts/emulator/beam/erl_cpu_topology.h b/erts/emulator/beam/erl_cpu_topology.h index b83ddc25da..c5a9520b61 100644 --- a/erts/emulator/beam/erl_cpu_topology.h +++ b/erts/emulator/beam/erl_cpu_topology.h @@ -91,4 +91,15 @@ Eterm erts_fake_scheduler_bindings(Process *p, Eterm how); Eterm erts_debug_cpu_groups_map(Process *c_p, int groups); +typedef void (*erts_cpu_groups_callback_t)(int, + ErtsSchedulerData *, + int, + void *); + +void erts_add_cpu_groups(int groups, + erts_cpu_groups_callback_t callback, + void *arg); +void erts_remove_cpu_groups(erts_cpu_groups_callback_t callback, + void *arg); + #endif diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 4940344108..ee282ebbee 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -2854,10 +2854,10 @@ suspend_scheduler(ErtsSchedulerData *esdp) ASSERT(no != 1); - erts_sched_check_cpu_bind_prep_suspend(esdp); - erts_smp_runq_unlock(esdp->run_queue); + erts_sched_check_cpu_bind_prep_suspend(esdp); + if (erts_system_profile_flags.scheduler) profile_scheduler(make_small(esdp->no), am_inactive); -- cgit v1.2.3