diff options
Diffstat (limited to 'erts')
-rw-r--r-- | erts/doc/src/erl_nif.xml | 2 | ||||
-rw-r--r-- | erts/emulator/beam/atom.names | 1 | ||||
-rw-r--r-- | erts/emulator/beam/dist.c | 5 | ||||
-rw-r--r-- | erts/emulator/beam/erl_alloc.types | 1 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_info.c | 103 | ||||
-rw-r--r-- | erts/emulator/beam/erl_lock_count.c | 348 | ||||
-rw-r--r-- | erts/emulator/beam/erl_lock_count.h | 42 | ||||
-rw-r--r-- | erts/emulator/beam/erl_port.h | 3 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 3 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.h | 5 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process_lock.c | 186 | ||||
-rw-r--r-- | erts/emulator/beam/io.c | 208 | ||||
-rw-r--r-- | erts/emulator/beam/sys.h | 33 | ||||
-rw-r--r-- | erts/emulator/drivers/common/inet_drv.c | 141 | ||||
-rw-r--r-- | erts/preloaded/ebin/erts_internal.beam | bin | 5380 -> 5964 bytes | |||
-rw-r--r-- | erts/preloaded/ebin/prim_inet.beam | bin | 73092 -> 72748 bytes | |||
-rw-r--r-- | erts/preloaded/src/erts_internal.erl | 19 | ||||
-rw-r--r-- | erts/preloaded/src/prim_inet.erl | 16 |
18 files changed, 658 insertions, 458 deletions
diff --git a/erts/doc/src/erl_nif.xml b/erts/doc/src/erl_nif.xml index f64381c99d..412c0e02ac 100644 --- a/erts/doc/src/erl_nif.xml +++ b/erts/doc/src/erl_nif.xml @@ -1177,7 +1177,7 @@ typedef enum { <code type="none"> ERL_NIF_TERM key, value; ErlNifMapIterator iter; -enif_map_iterator_create(env, my_map, ERL_NIF_MAP_ITERATOR_FIRST); +enif_map_iterator_create(env, my_map, &iter, ERL_NIF_MAP_ITERATOR_FIRST); while (enif_map_iterator_get_pair(env, &iter, &key, &value)) { do_something(key,value); diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 74b42c647e..0cf21b4635 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -265,6 +265,7 @@ atom get_seq_token atom get_tcw atom getenv atom gather_gc_info_result +atom gather_io_bytes atom gather_sched_wall_time_result atom getting_linked atom getting_unlinked diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index ae46174a14..1354cee267 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -2088,6 +2088,7 @@ erts_dist_command(Port *prt, int reds_limit) DistEntry *dep = prt->dist_entry; Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf); erts_aint32_t sched_flags; + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)); @@ -2142,12 +2143,12 @@ erts_dist_command(Port *prt, int reds_limit) ErtsDistOutputBuf *fob; size = (*send)(prt, foq.first); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, ">> "); bw(foq.first->extp, size); #endif reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - erts_smp_atomic_add_nob(&erts_bytes_out, size); fob = foq.first; obufsize += size_obuf(fob); foq.first = foq.first->next; @@ -2227,12 +2228,12 @@ erts_dist_command(Port *prt, int reds_limit) ASSERT(&oq.first->data[0] <= oq.first->extp && oq.first->extp < oq.first->ext_endp); size = (*send)(prt, oq.first); + esdp->io.out += (Uint64) size; #ifdef ERTS_RAW_DIST_MSG_DBG erts_fprintf(stderr, ">> "); bw(oq.first->extp, size); #endif reds += ERTS_PORT_REDS_DIST_CMD_DATA(size); - erts_smp_atomic_add_nob(&erts_bytes_out, size); fob = oq.first; obufsize += size_obuf(fob); oq.first = oq.first->next; diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 2721e13250..92c397ffd3 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -274,6 +274,7 @@ type BUSY_CALLER SHORT_LIVED SYSTEM busy_caller type PROC_SYS_TSK SHORT_LIVED PROCESSES proc_sys_task type PROC_SYS_TSK_QS SHORT_LIVED PROCESSES proc_sys_task_queues type NEW_TIME_OFFSET SHORT_LIVED SYSTEM new_time_offset +type IOB_REQ SHORT_LIVED SYSTEM io_bytes_request +if threads_no_smp # Need thread safe allocs, but std_alloc and fix_alloc are not; diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 397c68e199..7eb31fb80e 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -60,6 +60,7 @@ static Export* alloc_info_trap = NULL; static Export* alloc_sizes_trap = NULL; +static Export* gather_io_bytes_trap = NULL; static Export *gather_sched_wall_time_res_trap; static Export *gather_gc_info_res_trap; @@ -3220,7 +3221,6 @@ BIF_RETTYPE process_display_2(BIF_ALIST_2) BIF_RET(am_true); } - /* this is a general call which return some possibly useful information */ BIF_RETTYPE statistics_1(BIF_ALIST_1) @@ -3293,23 +3293,8 @@ BIF_RETTYPE statistics_1(BIF_ALIST_1) res = TUPLE2(hp, b1, b2); BIF_RET(res); } else if (BIF_ARG_1 == am_io) { - Eterm r1, r2; - Eterm in, out; - Uint hsz = 9; - Uint bytes_in = (Uint) erts_smp_atomic_read_nob(&erts_bytes_in); - Uint bytes_out = (Uint) erts_smp_atomic_read_nob(&erts_bytes_out); - - (void) erts_bld_uint(NULL, &hsz, bytes_in); - (void) erts_bld_uint(NULL, &hsz, bytes_out); - hp = HAlloc(BIF_P, hsz); - in = erts_bld_uint(&hp, NULL, bytes_in); - out = erts_bld_uint(&hp, NULL, bytes_out); - - r1 = TUPLE2(hp, am_input, in); - hp += 3; - r2 = TUPLE2(hp, am_output, out); - hp += 3; - BIF_RET(TUPLE2(hp, r1, r2)); + Eterm ref = erts_request_io_bytes(BIF_P); + BIF_TRAP2(gather_io_bytes_trap, BIF_P, ref, make_small(erts_no_schedulers)); } else if (ERTS_IS_ATOM_STR("run_queues", BIF_ARG_1)) { Eterm res, *hp, **hpp; @@ -4296,59 +4281,41 @@ BIF_RETTYPE erts_debug_lock_counters_1(BIF_ALIST_1) BIF_RET(am_ok); } else if (is_tuple(BIF_ARG_1)) { - Eterm* tp = tuple_val(BIF_ARG_1); + Eterm* ptr = tuple_val(BIF_ARG_1); + + if ((arityval(ptr[0]) == 2) && (ptr[2] == am_false || ptr[2] == am_true)) { + int lock_opt = 0, enable = (ptr[2] == am_true) ? 1 : 0; + if (ERTS_IS_ATOM_STR("copy_save", ptr[1])) { + lock_opt = ERTS_LCNT_OPT_COPYSAVE; + } else if (ERTS_IS_ATOM_STR("process_locks", ptr[1])) { + lock_opt = ERTS_LCNT_OPT_PROCLOCK; + } else if (ERTS_IS_ATOM_STR("port_locks", ptr[1])) { + lock_opt = ERTS_LCNT_OPT_PORTLOCK; + } else if (ERTS_IS_ATOM_STR("suspend", ptr[1])) { + lock_opt = ERTS_LCNT_OPT_SUSPEND; + } else if (ERTS_IS_ATOM_STR("location", ptr[1])) { + lock_opt = ERTS_LCNT_OPT_LOCATION; + } else { + BIF_ERROR(BIF_P, BADARG); + } - switch (arityval(tp[0])) { - case 2: { - int opt = 0; - int val = 0; - if (ERTS_IS_ATOM_STR("copy_save", tp[1])) { - opt = ERTS_LCNT_OPT_COPYSAVE; - } else if (ERTS_IS_ATOM_STR("process_locks", tp[1])) { - opt = ERTS_LCNT_OPT_PROCLOCK; - } else if (ERTS_IS_ATOM_STR("port_locks", tp[1])) { - opt = ERTS_LCNT_OPT_PORTLOCK; - } else if (ERTS_IS_ATOM_STR("suspend", tp[1])) { - opt = ERTS_LCNT_OPT_SUSPEND; - } else if (ERTS_IS_ATOM_STR("location", tp[1])) { - opt = ERTS_LCNT_OPT_LOCATION; - } else { - BIF_ERROR(BIF_P, BADARG); - } - if (tp[2] == am_true) { - val = 1; - } else if (tp[2] == am_false) { - val = 0; - } else { - BIF_ERROR(BIF_P, BADARG); - } + erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); + erts_smp_thr_progress_block(); - erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN); - erts_smp_thr_progress_block(); - - if (val) { - res = erts_lcnt_set_rt_opt(opt) ? am_true : am_false; - } else { - res = erts_lcnt_clear_rt_opt(opt) ? am_true : am_false; - } + if (enable) res = erts_lcnt_set_rt_opt(lock_opt) ? am_true : am_false; + else res = erts_lcnt_clear_rt_opt(lock_opt) ? am_true : am_false; + #ifdef ERTS_SMP - if (res != tp[2]) { - if (opt == ERTS_LCNT_OPT_PORTLOCK) { - erts_lcnt_enable_io_lock_count(val); - } else if (opt == ERTS_LCNT_OPT_PROCLOCK) { - erts_lcnt_enable_proc_lock_count(val); - } - } + if (res != ptr[2] && lock_opt == ERTS_LCNT_OPT_PORTLOCK) { + erts_lcnt_enable_io_lock_count(enable); + } else if (res != ptr[2] && lock_opt == ERTS_LCNT_OPT_PROCLOCK) { + erts_lcnt_enable_proc_lock_count(enable); + } #endif - erts_smp_thr_progress_unblock(); - erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); - BIF_RET(res); - break; - } - - default: - break; - } + erts_smp_thr_progress_unblock(); + erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN); + BIF_RET(res); + } } #endif @@ -4388,6 +4355,8 @@ erts_bif_info_init(void) = erts_export_put(am_erlang, am_gather_sched_wall_time_result, 1); gather_gc_info_res_trap = erts_export_put(am_erlang, am_gather_gc_info_result, 1); + gather_io_bytes_trap + = erts_export_put(am_erts_internal, am_gather_io_bytes, 2); process_info_init(); os_info_init(); } diff --git a/erts/emulator/beam/erl_lock_count.c b/erts/emulator/beam/erl_lock_count.c index c6d8f4df95..c4956d8569 100644 --- a/erts/emulator/beam/erl_lock_count.c +++ b/erts/emulator/beam/erl_lock_count.c @@ -20,7 +20,7 @@ /* * Description: Statistics for locks. * - * Author: Bj�rn-Egil Dahlberg + * Author: Björn-Egil Dahlberg * Date: 2008-07-03 */ @@ -49,7 +49,7 @@ const char *str_undefined = "undefined"; static ethr_tsd_key lcnt_thr_data_key; static int lcnt_n_thr; -static erts_lcnt_thread_data_t *lcnt_thread_data[4096]; +static erts_lcnt_thread_data_t *lcnt_thread_data[2048]; /* local functions */ @@ -83,12 +83,12 @@ static ERTS_INLINE int lcnt_log2(Uint64 v) { static char* lcnt_lock_type(Uint16 flag) { switch(flag & ERTS_LCNT_LT_ALL) { - case ERTS_LCNT_LT_SPINLOCK: return "spinlock"; - case ERTS_LCNT_LT_RWSPINLOCK: return "rw_spinlock"; - case ERTS_LCNT_LT_MUTEX: return "mutex"; - case ERTS_LCNT_LT_RWMUTEX: return "rw_mutex"; - case ERTS_LCNT_LT_PROCLOCK: return "proclock"; - default: return ""; + case ERTS_LCNT_LT_SPINLOCK: return "spinlock"; + case ERTS_LCNT_LT_RWSPINLOCK: return "rw_spinlock"; + case ERTS_LCNT_LT_MUTEX: return "mutex"; + case ERTS_LCNT_LT_RWMUTEX: return "rw_mutex"; + case ERTS_LCNT_LT_PROCLOCK: return "proclock"; + default: return ""; } } @@ -116,15 +116,15 @@ static void lcnt_time(erts_lcnt_time_t *time) { static void lcnt_time_diff(erts_lcnt_time_t *d, erts_lcnt_time_t *t1, erts_lcnt_time_t *t0) { long ds; long dns; - + ds = t1->s - t0->s; dns = t1->ns - t0->ns; - + /* the difference should not be able to get bigger than 1 sec in ns*/ - + if (dns < 0) { - ds -= 1; - dns += 1000000000LL; + ds -= 1; + dns += 1000000000LL; } ASSERT(ds >= 0); @@ -145,7 +145,7 @@ static void lcnt_time_add(erts_lcnt_time_t *t, erts_lcnt_time_t *d) { static erts_lcnt_thread_data_t *lcnt_thread_data_alloc(void) { erts_lcnt_thread_data_t *eltd; - + eltd = (erts_lcnt_thread_data_t*)malloc(sizeof(erts_lcnt_thread_data_t)); if (!eltd) { ERTS_INTERNAL_ERROR("Lock counter failed to allocate memory!"); @@ -164,7 +164,6 @@ static erts_lcnt_thread_data_t *lcnt_get_thread_data(void) { return (erts_lcnt_thread_data_t *)ethr_tsd_get(lcnt_thr_data_key); } - /* debug */ #if 0 @@ -183,15 +182,15 @@ static void print_lock_x(erts_lcnt_lock_t *lock, Uint16 flag, char *action) { type = lcnt_lock_type(lock->flag); r_state = ethr_atomic_read(&lock->r_state); w_state = ethr_atomic_read(&lock->w_state); - + if (lock->flag & flag) { erts_fprintf(stderr,"%10s [%24s] [r/w state %4ld/%4ld] %2s id %T\r\n", - action, - lock->name, - r_state, - w_state, - type, - lock->id); + action, + lock->name, + r_state, + w_state, + type, + lock->id); } } #endif @@ -201,18 +200,18 @@ static erts_lcnt_lock_stats_t *lcnt_get_lock_stats(erts_lcnt_lock_t *lock, char erts_lcnt_lock_stats_t *stats = NULL; if (erts_lcnt_rt_options & ERTS_LCNT_OPT_LOCATION) { - for (i = 0; i < lock->n_stats; i++) { - if ((lock->stats[i].file == file) && (lock->stats[i].line == line)) { - return &(lock->stats[i]); - } - } - if (lock->n_stats < ERTS_LCNT_MAX_LOCK_LOCATIONS) { - stats = &lock->stats[lock->n_stats]; - lock->n_stats++; - stats->file = file; - stats->line = line; - return stats; - } + for (i = 0; i < lock->n_stats; i++) { + if ((lock->stats[i].file == file) && (lock->stats[i].line == line)) { + return &(lock->stats[i]); + } + } + if (lock->n_stats < ERTS_LCNT_MAX_LOCK_LOCATIONS) { + stats = &lock->stats[lock->n_stats]; + lock->n_stats++; + stats->file = file; + stats->line = line; + return stats; + } } return &lock->stats[0]; } @@ -222,53 +221,48 @@ static void lcnt_update_stats_hist(erts_lcnt_hist_t *hist, erts_lcnt_time_t *tim unsigned long r; if (time_wait->s > 0 || time_wait->ns > ERTS_LCNT_HISTOGRAM_MAX_NS) { - idx = ERTS_LCNT_HISTOGRAM_SLOT_SIZE - 1; + idx = ERTS_LCNT_HISTOGRAM_SLOT_SIZE - 1; } else { - r = time_wait->ns >> ERTS_LCNT_HISTOGRAM_RSHIFT; - if (r) idx = lcnt_log2(r); - else idx = 0; + r = time_wait->ns >> ERTS_LCNT_HISTOGRAM_RSHIFT; + if (r) idx = lcnt_log2(r); + else idx = 0; } hist->ns[idx]++; } static void lcnt_update_stats(erts_lcnt_lock_stats_t *stats, int lock_in_conflict, - erts_lcnt_time_t *time_wait) { - + erts_lcnt_time_t *time_wait) { + ethr_atomic_inc(&stats->tries); if (lock_in_conflict) - ethr_atomic_inc(&stats->colls); + ethr_atomic_inc(&stats->colls); if (time_wait) { - lcnt_time_add(&(stats->timer), time_wait); - stats->timer_n++; - lcnt_update_stats_hist(&stats->hist,time_wait); + lcnt_time_add(&(stats->timer), time_wait); + stats->timer_n++; + lcnt_update_stats_hist(&stats->hist,time_wait); } } -/* - * interface - */ +/* interface */ void erts_lcnt_init() { erts_lcnt_thread_data_t *eltd = NULL; - + /* init lock */ if (ethr_mutex_init(&lcnt_data_lock) != 0) abort(); /* init tsd */ lcnt_n_thr = 0; - ethr_tsd_key_create(&lcnt_thr_data_key,"lcnt_data"); lcnt_lock(); - erts_lcnt_rt_options = ERTS_LCNT_OPT_PROCLOCK | ERTS_LCNT_OPT_LOCATION; - + erts_lcnt_rt_options = ERTS_LCNT_OPT_LOCATION | ERTS_LCNT_OPT_PROCLOCK; eltd = lcnt_thread_data_alloc(); - ethr_tsd_set(lcnt_thr_data_key, eltd); - + /* init lcnt structure */ erts_lcnt_data = (erts_lcnt_data_t*)malloc(sizeof(erts_lcnt_data_t)); if (!erts_lcnt_data) { @@ -293,7 +287,7 @@ void erts_lcnt_late_init() { erts_lcnt_lock_list_t *erts_lcnt_list_init(void) { erts_lcnt_lock_list_t *list; - + list = (erts_lcnt_lock_list_t*)malloc(sizeof(erts_lcnt_lock_list_t)); if (!list) { ERTS_INTERNAL_ERROR("Lock counter failed to allocate memory!"); @@ -307,14 +301,14 @@ erts_lcnt_lock_list_t *erts_lcnt_list_init(void) { /* only do this on the list with the deleted locks! */ void erts_lcnt_list_clear(erts_lcnt_lock_list_t *list) { erts_lcnt_lock_t *lock = NULL, - *next = NULL; + *next = NULL; lock = list->head; - + while(lock != NULL) { - next = lock->next; - free(lock); - lock = next; + next = lock->next; + free(lock); + lock = next; } list->head = NULL; @@ -327,26 +321,25 @@ void erts_lcnt_list_insert(erts_lcnt_lock_list_t *list, erts_lcnt_lock_t *lock) tail = list->tail; if (tail) { - tail->next = lock; - lock->prev = tail; + tail->next = lock; + lock->prev = tail; } else { - list->head = lock; - lock->prev = NULL; - ASSERT(!lock->next); + list->head = lock; + lock->prev = NULL; + ASSERT(!lock->next); } lock->next = NULL; list->tail = lock; - + list->n++; } void erts_lcnt_list_delete(erts_lcnt_lock_list_t *list, erts_lcnt_lock_t *lock) { - if (lock->next) lock->next->prev = lock->prev; if (lock->prev) lock->prev->next = lock->next; if (list->head == lock) list->head = lock->next; if (list->tail == lock) list->tail = lock->prev; - + lock->prev = NULL; lock->next = NULL; list->n--; @@ -364,12 +357,9 @@ void erts_lcnt_init_lock(erts_lcnt_lock_t *lock, char *name, Uint16 flag ) { void erts_lcnt_init_lock_x(erts_lcnt_lock_t *lock, char *name, Uint16 flag, Eterm id) { int i; - if (!name) { - lock->flag = 0; - return; - } + if (name == NULL) { ERTS_LCNT_CLEAR_FLAG(lock); return; } lcnt_lock(); - + lock->next = NULL; lock->prev = NULL; lock->flag = flag; @@ -378,46 +368,55 @@ void erts_lcnt_init_lock_x(erts_lcnt_lock_t *lock, char *name, Uint16 flag, Eter ethr_atomic_init(&lock->r_state, 0); ethr_atomic_init(&lock->w_state, 0); - #ifdef DEBUG ethr_atomic_init(&lock->flowstate, 0); #endif - + lock->n_stats = 1; for (i = 0; i < ERTS_LCNT_MAX_LOCK_LOCATIONS; i++) { - lcnt_clear_stats(&lock->stats[i]); + lcnt_clear_stats(&lock->stats[i]); } erts_lcnt_list_insert(erts_lcnt_data->current_locks, lock); lcnt_unlock(); } - +/* init empty, instead of zero struct */ +void erts_lcnt_init_lock_empty(erts_lcnt_lock_t *lock) { + lock->next = NULL; + lock->prev = NULL; + lock->flag = 0; + lock->name = NULL; + lock->id = NIL; + ethr_atomic_init(&lock->r_state, 0); + ethr_atomic_init(&lock->w_state, 0); +#ifdef DEBUG + ethr_atomic_init(&lock->flowstate, 0); +#endif + lock->n_stats = 0; + sys_memzero(lock->stats, sizeof(lock->stats)); +} +/* destroy lock */ void erts_lcnt_destroy_lock(erts_lcnt_lock_t *lock) { - erts_lcnt_lock_t *deleted_lock; - - if (!ERTS_LCNT_LOCK_TYPE(lock)) return; - + if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return; lcnt_lock(); if (erts_lcnt_rt_options & ERTS_LCNT_OPT_COPYSAVE) { - /* copy structure and insert the copy */ - - deleted_lock = (erts_lcnt_lock_t*)malloc(sizeof(erts_lcnt_lock_t)); + erts_lcnt_lock_t *deleted_lock; + /* copy structure and insert the copy */ + deleted_lock = (erts_lcnt_lock_t*)malloc(sizeof(erts_lcnt_lock_t)); if (!deleted_lock) { ERTS_INTERNAL_ERROR("Lock counter failed to allocate memory!"); } - memcpy(deleted_lock, lock, sizeof(erts_lcnt_lock_t)); - - deleted_lock->next = NULL; - deleted_lock->prev = NULL; - - erts_lcnt_list_insert(erts_lcnt_data->deleted_locks, deleted_lock); + memcpy(deleted_lock, lock, sizeof(erts_lcnt_lock_t)); + deleted_lock->next = NULL; + deleted_lock->prev = NULL; + erts_lcnt_list_insert(erts_lcnt_data->deleted_locks, deleted_lock); } /* delete original */ erts_lcnt_list_delete(erts_lcnt_data->current_locks, lock); - lock->flag = 0; - + ERTS_LCNT_CLEAR_FLAG(lock); + lcnt_unlock(); } @@ -426,16 +425,15 @@ void erts_lcnt_destroy_lock(erts_lcnt_lock_t *lock) { void erts_lcnt_lock_opt(erts_lcnt_lock_t *lock, Uint16 option) { erts_aint_t r_state = 0, w_state = 0; erts_lcnt_thread_data_t *eltd; - + if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return; - if (!ERTS_LCNT_LOCK_TYPE(lock)) return; + if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return; eltd = lcnt_get_thread_data(); - ASSERT(eltd); - + w_state = ethr_atomic_read(&lock->w_state); - + if (option & ERTS_LCNT_LO_WRITE) { r_state = ethr_atomic_read(&lock->r_state); ethr_atomic_inc( &lock->w_state); @@ -443,48 +441,47 @@ void erts_lcnt_lock_opt(erts_lcnt_lock_t *lock, Uint16 option) { if (option & ERTS_LCNT_LO_READ) { ethr_atomic_inc( &lock->r_state); } - + /* we cannot acquire w_lock if either w or r are taken */ /* we cannot acquire r_lock if w_lock is taken */ - + if ((w_state > 0) || (r_state > 0)) { - eltd->lock_in_conflict = 1; - if (eltd->timer_set == 0) { - lcnt_time(&eltd->timer); - } - eltd->timer_set++; + eltd->lock_in_conflict = 1; + if (eltd->timer_set == 0) { + lcnt_time(&eltd->timer); + } + eltd->timer_set++; } else { - eltd->lock_in_conflict = 0; + eltd->lock_in_conflict = 0; } } void erts_lcnt_lock(erts_lcnt_lock_t *lock) { erts_aint_t w_state; erts_lcnt_thread_data_t *eltd; - + if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return; - if (!ERTS_LCNT_LOCK_TYPE(lock)) return; + if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return; w_state = ethr_atomic_read(&lock->w_state); ethr_atomic_inc(&lock->w_state); - eltd = lcnt_get_thread_data(); ASSERT(eltd); if (w_state > 0) { - eltd->lock_in_conflict = 1; - /* only set the timer if nobody else has it - * This should only happen when proc_locks aquires several locks - * 'atomicly'. All other locks will block the thread if w_state > 0 - * i.e. locked. - */ - if (eltd->timer_set == 0) { - lcnt_time(&eltd->timer); - } - eltd->timer_set++; + eltd->lock_in_conflict = 1; + /* only set the timer if nobody else has it + * This should only happen when proc_locks aquires several locks + * 'atomicly'. All other locks will block the thread if w_state > 0 + * i.e. locked. + */ + if (eltd->timer_set == 0) { + lcnt_time(&eltd->timer); + } + eltd->timer_set++; } else { - eltd->lock_in_conflict = 0; + eltd->lock_in_conflict = 0; } } @@ -493,16 +490,19 @@ void erts_lcnt_lock(erts_lcnt_lock_t *lock) { void erts_lcnt_lock_unaquire(erts_lcnt_lock_t *lock) { /* should check if this thread was "waiting" */ if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return; - if (!ERTS_LCNT_LOCK_TYPE(lock)) return; + if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return; ethr_atomic_dec(&lock->w_state); } -/* erts_lcnt_lock_post - * used when we get a lock (i.e. directly after a lock operation) +/* + * erts_lcnt_lock_post + * + * Used when we get a lock (i.e. directly after a lock operation) * if the timer was set then we had to wait for the lock * lock_post will calculate the wait time. */ + void erts_lcnt_lock_post(erts_lcnt_lock_t *lock) { erts_lcnt_lock_post_x(lock, (char*)str_undefined, 0); } @@ -517,31 +517,31 @@ void erts_lcnt_lock_post_x(erts_lcnt_lock_t *lock, char *file, unsigned int line #endif if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return; - if (!ERTS_LCNT_LOCK_TYPE(lock)) return; - + if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return; + #ifdef DEBUG if (!(lock->flag & (ERTS_LCNT_LT_RWMUTEX | ERTS_LCNT_LT_RWSPINLOCK))) { - flowstate = ethr_atomic_read(&lock->flowstate); - ASSERT(flowstate == 0); - ethr_atomic_inc(&lock->flowstate); + flowstate = ethr_atomic_read(&lock->flowstate); + ASSERT(flowstate == 0); + ethr_atomic_inc(&lock->flowstate); } #endif - + eltd = lcnt_get_thread_data(); - + ASSERT(eltd); /* if lock was in conflict, time it */ stats = lcnt_get_lock_stats(lock, file, line); if (eltd->timer_set) { - lcnt_time(&timer); - - lcnt_time_diff(&time_wait, &timer, &(eltd->timer)); - lcnt_update_stats(stats, eltd->lock_in_conflict, &time_wait); - eltd->timer_set--; - ASSERT(eltd->timer_set >= 0); + lcnt_time(&timer); + + lcnt_time_diff(&time_wait, &timer, &(eltd->timer)); + lcnt_update_stats(stats, eltd->lock_in_conflict, &time_wait); + eltd->timer_set--; + ASSERT(eltd->timer_set >= 0); } else { - lcnt_update_stats(stats, eltd->lock_in_conflict, NULL); + lcnt_update_stats(stats, eltd->lock_in_conflict, NULL); } } @@ -550,27 +550,28 @@ void erts_lcnt_lock_post_x(erts_lcnt_lock_t *lock, char *file, unsigned int line void erts_lcnt_unlock_opt(erts_lcnt_lock_t *lock, Uint16 option) { if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return; - if (!ERTS_LCNT_LOCK_TYPE(lock)) return; + if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return; if (option & ERTS_LCNT_LO_WRITE) ethr_atomic_dec(&lock->w_state); if (option & ERTS_LCNT_LO_READ ) ethr_atomic_dec(&lock->r_state); } void erts_lcnt_unlock(erts_lcnt_lock_t *lock) { -#ifdef DEBUG - erts_aint_t w_state; - erts_aint_t flowstate; -#endif if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return; - if (!ERTS_LCNT_LOCK_TYPE(lock)) return; + if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return; #ifdef DEBUG - /* flowstate */ - flowstate = ethr_atomic_read(&lock->flowstate); - ASSERT(flowstate == 1); - ethr_atomic_dec(&lock->flowstate); - - /* write state */ - w_state = ethr_atomic_read(&lock->w_state); - ASSERT(w_state > 0); + { + erts_aint_t w_state; + erts_aint_t flowstate; + + /* flowstate */ + flowstate = ethr_atomic_read(&lock->flowstate); + ASSERT(flowstate == 1); + ethr_atomic_dec(&lock->flowstate); + + /* write state */ + w_state = ethr_atomic_read(&lock->w_state); + ASSERT(w_state > 0); + } #endif ethr_atomic_dec(&lock->w_state); } @@ -579,35 +580,34 @@ void erts_lcnt_unlock(erts_lcnt_lock_t *lock) { void erts_lcnt_trylock_opt(erts_lcnt_lock_t *lock, int res, Uint16 option) { if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return; - if (!ERTS_LCNT_LOCK_TYPE(lock)) return; + if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return; /* Determine lock_state via res instead of state */ if (res != EBUSY) { - if (option & ERTS_LCNT_LO_WRITE) ethr_atomic_inc(&lock->w_state); - if (option & ERTS_LCNT_LO_READ ) ethr_atomic_inc(&lock->r_state); - lcnt_update_stats(&(lock->stats[0]), 0, NULL); + if (option & ERTS_LCNT_LO_WRITE) ethr_atomic_inc(&lock->w_state); + if (option & ERTS_LCNT_LO_READ ) ethr_atomic_inc(&lock->r_state); + lcnt_update_stats(&(lock->stats[0]), 0, NULL); } else { ethr_atomic_inc(&lock->stats[0].tries); ethr_atomic_inc(&lock->stats[0].colls); } } - + void erts_lcnt_trylock(erts_lcnt_lock_t *lock, int res) { /* Determine lock_state via res instead of state */ -#ifdef DEBUG - erts_aint_t flowstate; -#endif if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return; - if (!ERTS_LCNT_LOCK_TYPE(lock)) return; + if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return; if (res != EBUSY) { - #ifdef DEBUG - flowstate = ethr_atomic_read(&lock->flowstate); - ASSERT(flowstate == 0); - ethr_atomic_inc( &lock->flowstate); + { + erts_aint_t flowstate; + flowstate = ethr_atomic_read(&lock->flowstate); + ASSERT(flowstate == 0); + ethr_atomic_inc( &lock->flowstate); + } #endif - ethr_atomic_inc(&lock->w_state); - lcnt_update_stats(&(lock->stats[0]), 0, NULL); + ethr_atomic_inc(&lock->w_state); + lcnt_update_stats(&(lock->stats[0]), 0, NULL); } else { ethr_atomic_inc(&lock->stats[0].tries); ethr_atomic_inc(&lock->stats[0].colls); @@ -662,13 +662,13 @@ void erts_lcnt_clear_counters(void) { lcnt_lock(); list = erts_lcnt_data->current_locks; - + for (lock = list->head; lock != NULL; lock = lock->next) { - for( i = 0; i < ERTS_LCNT_MAX_LOCK_LOCATIONS; i++) { - stats = &lock->stats[i]; - lcnt_clear_stats(stats); - } - lock->n_stats = 1; + for( i = 0; i < ERTS_LCNT_MAX_LOCK_LOCATIONS; i++) { + stats = &lock->stats[i]; + lcnt_clear_stats(stats); + } + lock->n_stats = 1; } /* empty deleted locks in lock list */ @@ -681,14 +681,14 @@ void erts_lcnt_clear_counters(void) { erts_lcnt_data_t *erts_lcnt_get_data(void) { erts_lcnt_time_t timer_stop; - + lcnt_lock(); - + lcnt_time(&timer_stop); lcnt_time_diff(&(erts_lcnt_data->duration), &timer_stop, &timer_start); - + lcnt_unlock(); - + return erts_lcnt_data; } diff --git a/erts/emulator/beam/erl_lock_count.h b/erts/emulator/beam/erl_lock_count.h index 09fadd7e9e..051f55271d 100644 --- a/erts/emulator/beam/erl_lock_count.h +++ b/erts/emulator/beam/erl_lock_count.h @@ -20,7 +20,7 @@ /* * Description: Statistics for locks. * - * Author: Bj�rn-Egil Dahlberg + * Author: Björn-Egil Dahlberg * Date: 2008-07-03 * Abstract: * Locks statistics internal representation. @@ -95,30 +95,35 @@ #define ERTS_LCNT_LO_WRITE (((Uint16) 1) << 7) #define ERTS_LCNT_LO_READ_WRITE ( ERTS_LCNT_LO_READ \ - | ERTS_LCNT_LO_WRITE ) + | ERTS_LCNT_LO_WRITE ) #define ERTS_LCNT_LT_ALL ( ERTS_LCNT_LT_SPINLOCK \ - | ERTS_LCNT_LT_RWSPINLOCK \ - | ERTS_LCNT_LT_MUTEX \ - | ERTS_LCNT_LT_RWMUTEX \ - | ERTS_LCNT_LT_PROCLOCK ) + | ERTS_LCNT_LT_RWSPINLOCK \ + | ERTS_LCNT_LT_MUTEX \ + | ERTS_LCNT_LT_RWMUTEX \ + | ERTS_LCNT_LT_PROCLOCK ) + +#define ERTS_LCNT_LOCK_TYPE(lock) ((lock)->flag & ERTS_LCNT_LT_ALL) +#define ERTS_LCNT_IS_LOCK_INVALID(lock) (!((lock)->flag & ERTS_LCNT_LT_ALL)) +#define ERTS_LCNT_CLEAR_FLAG(lock) ((lock)->flag = 0) + /* runtime options */ -#define ERTS_LCNT_OPT_SUSPEND (((Uint16) 1) << 0) -#define ERTS_LCNT_OPT_LOCATION (((Uint16) 1) << 1) -#define ERTS_LCNT_OPT_PROCLOCK (((Uint16) 1) << 2) -#define ERTS_LCNT_OPT_COPYSAVE (((Uint16) 1) << 3) -#define ERTS_LCNT_OPT_PORTLOCK (((Uint16) 1) << 4) +#define ERTS_LCNT_OPT_SUSPEND (((Uint16) 1) << 0) +#define ERTS_LCNT_OPT_LOCATION (((Uint16) 1) << 1) +#define ERTS_LCNT_OPT_PROCLOCK (((Uint16) 1) << 2) +#define ERTS_LCNT_OPT_PORTLOCK (((Uint16) 1) << 3) +#define ERTS_LCNT_OPT_COPYSAVE (((Uint16) 1) << 4) typedef struct { unsigned long s; unsigned long ns; } erts_lcnt_time_t; - + extern erts_lcnt_time_t timer_start; typedef struct { - Uint32 ns[ERTS_LCNT_HISTOGRAM_SLOT_SIZE]; /* log2 array of nano seconds occurences */ + Uint32 ns[ERTS_LCNT_HISTOGRAM_SLOT_SIZE]; /* log2 array of nano seconds occurences */ } erts_lcnt_hist_t; typedef struct erts_lcnt_lock_stats_s { @@ -129,10 +134,10 @@ typedef struct erts_lcnt_lock_stats_s { char *file; /* which file the lock was taken */ unsigned int line; /* line number in file */ - + ethr_atomic_t tries; /* n tries to get lock */ ethr_atomic_t colls; /* n collisions of tries to get lock */ - + unsigned long timer_n; /* #times waited for lock */ erts_lcnt_time_t timer; /* total wait time for lock */ erts_lcnt_hist_t hist; @@ -155,7 +160,7 @@ typedef struct erts_lcnt_lock_s { /* statistics */ unsigned int n_stats; erts_lcnt_lock_stats_t stats[ERTS_LCNT_MAX_LOCK_LOCATIONS]; /* first entry is "undefined"*/ - + /* chains for list handling */ /* data is hold by lcnt_lock */ struct erts_lcnt_lock_s *prev; @@ -167,7 +172,7 @@ typedef struct { erts_lcnt_lock_t *tail; unsigned long n; } erts_lcnt_lock_list_t; - + typedef struct { erts_lcnt_time_t duration; /* time since last clear */ erts_lcnt_lock_list_t *current_locks; @@ -205,6 +210,7 @@ void erts_lcnt_list_delete(erts_lcnt_lock_list_t *list, erts_lcnt_lock_t *lock); /* lock operations (global) */ void erts_lcnt_init_lock(erts_lcnt_lock_t *lock, char *name, Uint16 flag); void erts_lcnt_init_lock_x(erts_lcnt_lock_t *lock, char *name, Uint16 flag, Eterm id); +void erts_lcnt_init_lock_empty(erts_lcnt_lock_t *lock); void erts_lcnt_destroy_lock(erts_lcnt_lock_t *lock); void erts_lcnt_lock(erts_lcnt_lock_t *lock); @@ -226,7 +232,5 @@ void erts_lcnt_clear_counters(void); char *erts_lcnt_lock_type(Uint16 type); erts_lcnt_data_t *erts_lcnt_get_data(void); -#define ERTS_LCNT_LOCK_TYPE(lockp) ((lockp)->flag & ERTS_LCNT_LT_ALL) - #endif /* ifdef ERTS_ENABLE_LOCK_COUNT */ #endif /* ifndef ERTS_LOCK_COUNT_H__ */ diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h index 3920fae2d9..66cbb67a0d 100644 --- a/erts/emulator/beam/erl_port.h +++ b/erts/emulator/beam/erl_port.h @@ -266,8 +266,7 @@ erts_prtsd_set(Port *prt, int ix, void *data) #endif -extern erts_smp_atomic_t erts_bytes_out; /* no bytes written out */ -extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */ +Eterm erts_request_io_bytes(Process *c_p); /* port status flags */ diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 43e84a1be1..88c1b5c121 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -5542,6 +5542,9 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num, esdp->thr_id = (Uint32) num; erts_sched_bif_unique_init(esdp); + esdp->io.out = (Uint64) 0; + esdp->io.in = (Uint64) 0; + if (daww_ptr) { init_aux_work_data(&esdp->aux_work_data, esdp, *daww_ptr); #ifdef ERTS_SMP diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 241a82d6c2..a6c3790991 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -659,6 +659,11 @@ struct ErtsSchedulerData_ { ErtsSchedAllocData alloc_data; + struct { + Uint64 out; + Uint64 in; + } io; + Uint64 reductions; ErtsSchedWallTime sched_wall_time; ErtsGCInfo gc_info; diff --git a/erts/emulator/beam/erl_process_lock.c b/erts/emulator/beam/erl_process_lock.c index b28bc498f6..0548c6137c 100644 --- a/erts/emulator/beam/erl_process_lock.c +++ b/erts/emulator/beam/erl_process_lock.c @@ -110,6 +110,9 @@ static struct { erts_pix_lock_t erts_pix_locks[ERTS_NO_OF_PIX_LOCKS]; +#ifdef ERTS_ENABLE_LOCK_COUNT +static void lcnt_enable_proc_lock_count(Process *proc, int enable); +#endif void erts_init_proc_lock(int cpus) @@ -1083,30 +1086,32 @@ erts_proc_lock_fin(Process *p) /* --- Process lock counting ----------------------------------------------- */ #if ERTS_PROC_LOCK_OWN_IMPL && defined(ERTS_ENABLE_LOCK_COUNT) -void erts_lcnt_proc_lock_init(Process *p) { - if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) { - if (p->common.id != ERTS_INVALID_PID) { - erts_lcnt_init_lock_x(&(p->lock.lcnt_main), "proc_main", ERTS_LCNT_LT_PROCLOCK, p->common.id); - erts_lcnt_init_lock_x(&(p->lock.lcnt_msgq), "proc_msgq", ERTS_LCNT_LT_PROCLOCK, p->common.id); - erts_lcnt_init_lock_x(&(p->lock.lcnt_btm), "proc_btm", ERTS_LCNT_LT_PROCLOCK, p->common.id); - erts_lcnt_init_lock_x(&(p->lock.lcnt_link), "proc_link", ERTS_LCNT_LT_PROCLOCK, p->common.id); - erts_lcnt_init_lock_x(&(p->lock.lcnt_status), "proc_status", ERTS_LCNT_LT_PROCLOCK, p->common.id); - } else { - erts_lcnt_init_lock(&(p->lock.lcnt_main), "proc_main", ERTS_LCNT_LT_PROCLOCK); - erts_lcnt_init_lock(&(p->lock.lcnt_msgq), "proc_msgq", ERTS_LCNT_LT_PROCLOCK); - erts_lcnt_init_lock(&(p->lock.lcnt_btm), "proc_btm", ERTS_LCNT_LT_PROCLOCK); - erts_lcnt_init_lock(&(p->lock.lcnt_link), "proc_link", ERTS_LCNT_LT_PROCLOCK); - erts_lcnt_init_lock(&(p->lock.lcnt_status), "proc_status", ERTS_LCNT_LT_PROCLOCK); - } - } else { - sys_memzero(&(p->lock.lcnt_main), sizeof(p->lock.lcnt_main)); - sys_memzero(&(p->lock.lcnt_msgq), sizeof(p->lock.lcnt_msgq)); - sys_memzero(&(p->lock.lcnt_btm), sizeof(p->lock.lcnt_btm)); - sys_memzero(&(p->lock.lcnt_link), sizeof(p->lock.lcnt_link)); - sys_memzero(&(p->lock.lcnt_status), sizeof(p->lock.lcnt_status)); - } + +void erts_lcnt_enable_proc_lock_count(int enable) { + int ix, max = erts_ptab_max(&erts_proc); + Process *proc = NULL; + for (ix = 0; ix < max; ++ix) { + if ((proc = erts_pix2proc(ix)) != NULL) + lcnt_enable_proc_lock_count(proc, enable); + } /* for all processes */ } - + +void erts_lcnt_proc_lock_init(Process *p) { + if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK)) { + erts_lcnt_init_lock_empty(&(p->lock.lcnt_main)); + erts_lcnt_init_lock_empty(&(p->lock.lcnt_msgq)); + erts_lcnt_init_lock_empty(&(p->lock.lcnt_btm)); + erts_lcnt_init_lock_empty(&(p->lock.lcnt_link)); + erts_lcnt_init_lock_empty(&(p->lock.lcnt_status)); + } else { /* now the common case */ + Eterm pid = (p->common.id != ERTS_INVALID_PID) ? p->common.id : NIL; + erts_lcnt_init_lock_x(&(p->lock.lcnt_main), "proc_main", ERTS_LCNT_LT_PROCLOCK, pid); + erts_lcnt_init_lock_x(&(p->lock.lcnt_msgq), "proc_msgq", ERTS_LCNT_LT_PROCLOCK, pid); + erts_lcnt_init_lock_x(&(p->lock.lcnt_btm), "proc_btm", ERTS_LCNT_LT_PROCLOCK, pid); + erts_lcnt_init_lock_x(&(p->lock.lcnt_link), "proc_link", ERTS_LCNT_LT_PROCLOCK, pid); + erts_lcnt_init_lock_x(&(p->lock.lcnt_status),"proc_status",ERTS_LCNT_LT_PROCLOCK, pid); + } /* the lock names should really be aligned to four characters */ +} /* logic reversed */ void erts_lcnt_proc_lock_destroy(Process *p) { erts_lcnt_destroy_lock(&(p->lock.lcnt_main)); @@ -1116,28 +1121,31 @@ void erts_lcnt_proc_lock_destroy(Process *p) { erts_lcnt_destroy_lock(&(p->lock.lcnt_status)); } -void erts_lcnt_proc_lock(erts_proc_lock_t *lock, ErtsProcLocks locks) { - if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) { - if (locks & ERTS_PROC_LOCK_MAIN) { - erts_lcnt_lock(&(lock->lcnt_main)); - } - if (locks & ERTS_PROC_LOCK_MSGQ) { - erts_lcnt_lock(&(lock->lcnt_msgq)); - } - if (locks & ERTS_PROC_LOCK_BTM) { - erts_lcnt_lock(&(lock->lcnt_btm)); - } - if (locks & ERTS_PROC_LOCK_LINK) { - erts_lcnt_lock(&(lock->lcnt_link)); - } - if (locks & ERTS_PROC_LOCK_STATUS) { - erts_lcnt_lock(&(lock->lcnt_status)); +static void lcnt_enable_proc_lock_count(Process *proc, int enable) { + if (enable) { + if (!ERTS_LCNT_LOCK_TYPE(&(proc->lock.lcnt_main))) { + erts_lcnt_proc_lock_init(proc); + } } + else { + if (ERTS_LCNT_LOCK_TYPE(&(proc->lock.lcnt_main))) { + erts_lcnt_proc_lock_destroy(proc); + } } } -void erts_lcnt_proc_lock_post_x(erts_proc_lock_t *lock, ErtsProcLocks locks, char *file, unsigned int line) { - if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) { +void erts_lcnt_proc_lock(erts_proc_lock_t *lock, ErtsProcLocks locks) { + if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK)) return; + if (locks & ERTS_PROC_LOCK_MAIN) { erts_lcnt_lock(&(lock->lcnt_main)); } + if (locks & ERTS_PROC_LOCK_MSGQ) { erts_lcnt_lock(&(lock->lcnt_msgq)); } + if (locks & ERTS_PROC_LOCK_BTM) { erts_lcnt_lock(&(lock->lcnt_btm)); } + if (locks & ERTS_PROC_LOCK_LINK) { erts_lcnt_lock(&(lock->lcnt_link)); } + if (locks & ERTS_PROC_LOCK_STATUS) { erts_lcnt_lock(&(lock->lcnt_status)); } +} + +void erts_lcnt_proc_lock_post_x(erts_proc_lock_t *lock, ErtsProcLocks locks, + char *file, unsigned int line) { + if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK)) return; if (locks & ERTS_PROC_LOCK_MAIN) { erts_lcnt_lock_post_x(&(lock->lcnt_main), file, line); } @@ -1153,90 +1161,34 @@ void erts_lcnt_proc_lock_post_x(erts_proc_lock_t *lock, ErtsProcLocks locks, cha if (locks & ERTS_PROC_LOCK_STATUS) { erts_lcnt_lock_post_x(&(lock->lcnt_status), file, line); } - } } void erts_lcnt_proc_lock_unaquire(erts_proc_lock_t *lock, ErtsProcLocks locks) { - if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) { - if (locks & ERTS_PROC_LOCK_MAIN) { - erts_lcnt_lock_unaquire(&(lock->lcnt_main)); - } - if (locks & ERTS_PROC_LOCK_MSGQ) { - erts_lcnt_lock_unaquire(&(lock->lcnt_msgq)); - } - if (locks & ERTS_PROC_LOCK_BTM) { - erts_lcnt_lock_unaquire(&(lock->lcnt_btm)); - } - if (locks & ERTS_PROC_LOCK_LINK) { - erts_lcnt_lock_unaquire(&(lock->lcnt_link)); - } - if (locks & ERTS_PROC_LOCK_STATUS) { - erts_lcnt_lock_unaquire(&(lock->lcnt_status)); - } - } + if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK)) return; + if (locks & ERTS_PROC_LOCK_MAIN) { erts_lcnt_lock_unaquire(&(lock->lcnt_main)); } + if (locks & ERTS_PROC_LOCK_MSGQ) { erts_lcnt_lock_unaquire(&(lock->lcnt_msgq)); } + if (locks & ERTS_PROC_LOCK_BTM) { erts_lcnt_lock_unaquire(&(lock->lcnt_btm)); } + if (locks & ERTS_PROC_LOCK_LINK) { erts_lcnt_lock_unaquire(&(lock->lcnt_link)); } + if (locks & ERTS_PROC_LOCK_STATUS) { erts_lcnt_lock_unaquire(&(lock->lcnt_status)); } } void erts_lcnt_proc_unlock(erts_proc_lock_t *lock, ErtsProcLocks locks) { - if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) { - if (locks & ERTS_PROC_LOCK_MAIN) { - erts_lcnt_unlock(&(lock->lcnt_main)); - } - if (locks & ERTS_PROC_LOCK_MSGQ) { - erts_lcnt_unlock(&(lock->lcnt_msgq)); - } - if (locks & ERTS_PROC_LOCK_BTM) { - erts_lcnt_unlock(&(lock->lcnt_btm)); - } - if (locks & ERTS_PROC_LOCK_LINK) { - erts_lcnt_unlock(&(lock->lcnt_link)); - } - if (locks & ERTS_PROC_LOCK_STATUS) { - erts_lcnt_unlock(&(lock->lcnt_status)); - } - } + if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK)) return; + if (locks & ERTS_PROC_LOCK_MAIN) { erts_lcnt_unlock(&(lock->lcnt_main)); } + if (locks & ERTS_PROC_LOCK_MSGQ) { erts_lcnt_unlock(&(lock->lcnt_msgq)); } + if (locks & ERTS_PROC_LOCK_BTM) { erts_lcnt_unlock(&(lock->lcnt_btm)); } + if (locks & ERTS_PROC_LOCK_LINK) { erts_lcnt_unlock(&(lock->lcnt_link)); } + if (locks & ERTS_PROC_LOCK_STATUS) { erts_lcnt_unlock(&(lock->lcnt_status)); } } void erts_lcnt_proc_trylock(erts_proc_lock_t *lock, ErtsProcLocks locks, int res) { - if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) { - if (locks & ERTS_PROC_LOCK_MAIN) { - erts_lcnt_trylock(&(lock->lcnt_main), res); - } - if (locks & ERTS_PROC_LOCK_MSGQ) { - erts_lcnt_trylock(&(lock->lcnt_msgq), res); - } - if (locks & ERTS_PROC_LOCK_BTM) { - erts_lcnt_trylock(&(lock->lcnt_btm), res); - } - if (locks & ERTS_PROC_LOCK_LINK) { - erts_lcnt_trylock(&(lock->lcnt_link), res); - } - if (locks & ERTS_PROC_LOCK_STATUS) { - erts_lcnt_trylock(&(lock->lcnt_status), res); - } - } -} - - -void erts_lcnt_enable_proc_lock_count(int enable) -{ - int i, max = erts_ptab_max(&erts_proc); - - for (i = 0; i < max; ++i) { - Process* p = erts_pix2proc(i); - if (p) { - if (enable) { - if (!ERTS_LCNT_LOCK_TYPE(&(p->lock.lcnt_main))) { - erts_lcnt_proc_lock_init(p); - } - } else { - if (ERTS_LCNT_LOCK_TYPE(&(p->lock.lcnt_main))) { - erts_lcnt_proc_lock_destroy(p); - } - } - } - } -} - -#endif /* ifdef ERTS_ENABLE_LOCK_COUNT */ + if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK)) return; + if (locks & ERTS_PROC_LOCK_MAIN) { erts_lcnt_trylock(&(lock->lcnt_main), res); } + if (locks & ERTS_PROC_LOCK_MSGQ) { erts_lcnt_trylock(&(lock->lcnt_msgq), res); } + if (locks & ERTS_PROC_LOCK_BTM) { erts_lcnt_trylock(&(lock->lcnt_btm), res); } + if (locks & ERTS_PROC_LOCK_LINK) { erts_lcnt_trylock(&(lock->lcnt_link), res); } + if (locks & ERTS_PROC_LOCK_STATUS) { erts_lcnt_trylock(&(lock->lcnt_status), res); } +} /* reversed logic */ +#endif /* ERTS_ENABLE_LOCK_COUNT */ /* --- Process lock checking ----------------------------------------------- */ diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c index 23f208c2eb..75d80b1a58 100644 --- a/erts/emulator/beam/io.c +++ b/erts/emulator/beam/io.c @@ -65,8 +65,6 @@ static erts_smp_tsd_key_t driver_list_last_error_key; /* Save last DDLL error o per thread basis (for BC interfaces) */ ErtsPTab erts_port erts_align_attribute(ERTS_CACHE_LINE_SIZE); /* The port table */ -erts_smp_atomic_t erts_bytes_out; /* No bytes sent out of the system */ -erts_smp_atomic_t erts_bytes_in; /* No bytes gotten into the system */ const ErlDrvTermData driver_term_nil = (ErlDrvTermData)NIL; @@ -80,6 +78,9 @@ int erts_port_synchronous_ops = 0; int erts_port_schedule_all_ops = 0; int erts_port_parallelism = 0; +static erts_atomic64_t bytes_in; +static erts_atomic64_t bytes_out; + static void deliver_result(Eterm sender, Eterm pid, Eterm res); static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *); static void terminate_port(Port *p); @@ -1534,12 +1535,10 @@ erts_schedule_proc2port_signal(Process *c_p, } static ERTS_INLINE void -send_badsig(Port *prt) -{ +send_badsig(Port *prt) { ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND; Process* rp; Eterm connected = ERTS_PORT_GET_CONNECTED(prt); - ERTS_SMP_CHK_NO_PROC_LOCKS; ERTS_LC_ASSERT(erts_get_scheduler_id()); @@ -1559,15 +1558,13 @@ send_badsig(Port *prt) 0); if (rp_locks) erts_smp_proc_unlock(rp, rp_locks); - } -} + } /* exit sent */ +} /* send_badsig */ static void -badsig_received(int bang_op, - Port *prt, +badsig_received(int bang_op, Port *prt, erts_aint32_t state, - int bad_output_value) -{ + int bad_output_value) { /* * if (bang_op) * we are part of a "Prt ! Something" operation @@ -1583,12 +1580,12 @@ badsig_received(int bang_op, } if (bang_op) send_badsig(prt); - } -} + } /* not invalid */ +} /* behaved accordingly */ static int -port_badsig(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) -{ +port_badsig(Port *prt, erts_aint32_t state, int op, + ErtsProc2PortSigData *sigdp) { if (op == ERTS_PROC2PORT_SIG_EXEC) badsig_received(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BANG_OP, prt, @@ -1597,16 +1594,14 @@ port_badsig(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp) if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY) port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg); return ERTS_PORT_REDS_BADSIG; -} - - -/* - * bad_port_signal() will +} /* port_badsig */ +/* bad_port_signal() will * - preserve signal order of signals. * - send a 'badsig' exit signal to connected process if 'from' is an * internal pid and the port is alive when the bad signal reaches * it. */ + static ErtsPortOpResult bad_port_signal(Process *c_p, int flags, @@ -1681,6 +1676,7 @@ call_driver_outputv(int bang_op, if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt)) send_badsig(prt); else { + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ErlDrvSizeT size = evp->size; ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt) @@ -1698,7 +1694,10 @@ call_driver_outputv(int bang_op, prt->caller = NIL; prt->bytes_out += size; - erts_smp_atomic_add_nob(&erts_bytes_out, size); + if (esdp) + esdp->io.out += (Uint64) size; + else + erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size); } } @@ -1778,7 +1777,7 @@ call_driver_output(int bang_op, if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt)) send_badsig(prt); else { - + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt) || ERTS_IS_CRASH_DUMPING); @@ -1794,7 +1793,10 @@ call_driver_output(int bang_op, prt->caller = NIL; prt->bytes_out += size; - erts_smp_atomic_add_nob(&erts_bytes_out, size); + if (esdp) + esdp->io.out += (Uint64) size; + else + erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size); } } @@ -2741,6 +2743,9 @@ void erts_init_io(int port_tab_size, drv_list_rwmtx_opts.type = ERTS_SMP_RWMTX_TYPE_EXTREMELY_FREQUENT_READ; drv_list_rwmtx_opts.lived = ERTS_SMP_RWMTX_LONG_LIVED; + erts_atomic64_init_nob(&bytes_in, 0); + erts_atomic64_init_nob(&bytes_out, 0); + common_element_size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port)); common_element_size += ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErtsPortTaskBusyPortQ)); common_element_size += 10; /* name */ @@ -2782,9 +2787,6 @@ void erts_init_io(int port_tab_size, legacy_port_tab, 1); - erts_smp_atomic_init_nob(&erts_bytes_out, 0); - erts_smp_atomic_init_nob(&erts_bytes_in, 0); - sys_init_io(); erts_smp_tsd_set(driver_list_lock_status_key, (void *) 1); @@ -2804,7 +2806,6 @@ void erts_init_io(int port_tab_size, } #if defined(ERTS_ENABLE_LOCK_COUNT) && defined(ERTS_SMP) - static ERTS_INLINE void lcnt_enable_drv_lock_count(erts_driver_t *dp, int enable) { if (dp->lock) { @@ -2844,25 +2845,26 @@ static ERTS_INLINE void lcnt_enable_port_lock_count(Port *prt, int enable) } } -void erts_lcnt_enable_io_lock_count(int enable) -{ +void erts_lcnt_enable_io_lock_count(int enable) { erts_driver_t *dp; - int i, max = erts_ptab_max(&erts_port); + int ix, max = erts_ptab_max(&erts_port); + Port *prt; - for (i = 0; i < max; i++) { - Port *prt = erts_pix2port(i); - if (prt) + for (ix = 0; ix < max; ix++) { + if ((prt = erts_pix2port(ix)) != NULL) { lcnt_enable_port_lock_count(prt, enable); - } + } + } /* for all ports */ lcnt_enable_drv_lock_count(&vanilla_driver, enable); lcnt_enable_drv_lock_count(&spawn_driver, enable); lcnt_enable_drv_lock_count(&fd_driver, enable); - for (dp = driver_list; dp; dp = dp->next) + /* enable lock counting in all drivers */ + for (dp = driver_list; dp; dp = dp->next) { lcnt_enable_drv_lock_count(dp, enable); -} -#endif - + } +} /* enable/disable lock counting of ports */ +#endif /* defined(ERTS_ENABLE_LOCK_COUNT) && defined(ERTS_SMP) */ /* * Buffering of data when using line oriented I/O on ports */ @@ -4574,6 +4576,102 @@ erts_port_info(Process* c_p, } typedef struct { + Uint sched_id; + Eterm pid; + Uint32 refn[ERTS_REF_NUMBERS]; + erts_smp_atomic32_t refc; +} ErtsIOBytesReq; + +static void +reply_io_bytes(void *vreq) +{ + ErtsIOBytesReq *req = (ErtsIOBytesReq *) vreq; + Process *rp; + + rp = erts_proc_lookup(req->pid); + if (rp) { + ErlOffHeap *ohp = NULL; + ErlHeapFragment *bp = NULL; + ErtsProcLocks rp_locks; + Eterm ref, msg, ein, eout, *hp; + Uint64 in, out; + Uint hsz; + ErtsSchedulerData *esdp = erts_get_scheduler_data(); + Uint sched_id = esdp->no; + in = esdp->io.in; + out = esdp->io.out; + if (req->sched_id != sched_id) + rp_locks = 0; + else { + in += (Uint64) erts_atomic64_read_nob(&bytes_in); + out += (Uint64) erts_atomic64_read_nob(&bytes_out); + rp_locks = ERTS_PROC_LOCK_MAIN; + } + + hsz = 5 /* 4-tuple */ + REF_THING_SIZE; + + erts_bld_uint64(NULL, &hsz, in); + erts_bld_uint64(NULL, &hsz, out); + + hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks); + + ref = make_internal_ref(hp); + write_ref_thing(hp, req->refn[0], req->refn[1], req->refn[2]); + hp += REF_THING_SIZE; + + ein = erts_bld_uint64(&hp, NULL, in); + eout = erts_bld_uint64(&hp, NULL, out); + + msg = TUPLE4(hp, ref, make_small(sched_id), ein, eout); + erts_queue_message(rp, &rp_locks, bp, msg, NIL); + + if (req->sched_id == sched_id) + rp_locks &= ~ERTS_PROC_LOCK_MAIN; + if (rp_locks) + erts_smp_proc_unlock(rp, rp_locks); + } + + if (erts_smp_atomic32_dec_read_nob(&req->refc) == 0) + erts_free(ERTS_ALC_T_IOB_REQ, req); +} + +Eterm +erts_request_io_bytes(Process *c_p) +{ + Uint *hp; + Eterm ref; + Uint32 *refn; + ErtsSchedulerData *esdp = ERTS_PROC_GET_SCHDATA(c_p); + ErtsIOBytesReq *req = erts_alloc(ERTS_ALC_T_IOB_REQ, + sizeof(ErtsIOBytesReq)); + + hp = HAlloc(c_p, REF_THING_SIZE); + ref = erts_sched_make_ref_in_buffer(esdp, hp); + refn = internal_ref_numbers(ref); + + req->sched_id = esdp->no; + req->pid = c_p->common.id; + req->refn[0] = refn[0]; + req->refn[1] = refn[1]; + req->refn[2] = refn[2]; + erts_smp_atomic32_init_nob(&req->refc, + (erts_aint32_t) erts_no_schedulers); + +#ifdef ERTS_SMP + if (erts_no_schedulers > 1) + erts_schedule_multi_misc_aux_work(1, + erts_no_schedulers, + reply_io_bytes, + (void *) req); +#endif + + reply_io_bytes((void *) req); + + return ref; +} + + +typedef struct { int to; void *arg; } prt_one_lnk_data; @@ -5111,6 +5209,7 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) struct b2t_states__ b2t; int scheduler; int is_heap_need_limited = 1; + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_UNDEF(mess,NIL); ERTS_UNDEF(scheduler,1); @@ -5418,7 +5517,10 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) Uint size = ptr[1]; Uint offset = ptr[2]; - erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) size); + if (esdp) + esdp->io.in += (Uint64) size; + else + erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) size); if (size <= ERL_ONHEAP_BIN_LIMIT) { ErlHeapBin* hbp = (ErlHeapBin *) erts_produce_heap(&factory, @@ -5452,7 +5554,10 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) byte *bufp = (byte *) ptr[0]; Uint size = (Uint) ptr[1]; - erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) size); + if (esdp) + esdp->io.in += (Uint64) size; + else + erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) size); if (size <= ERL_ONHEAP_BIN_LIMIT) { ErlHeapBin* hbp = (ErlHeapBin *) erts_produce_heap(&factory, @@ -5489,7 +5594,10 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len) } case ERL_DRV_STRING: /* char*, length */ - erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) ptr[1]); + if (esdp) + esdp->io.in += (Uint64) ptr[1]; + else + erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) ptr[1]); erts_reserve_heap(&factory, 2*ptr[1]); mess = buf_to_intlist(&factory.hp, (char*)ptr[0], ptr[1], NIL); ptr += 2; @@ -5765,6 +5873,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, { erts_aint32_t state; Port* prt = erts_drvport2port_state(ix, &state); + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -5775,7 +5884,10 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, return 0; prt->bytes_in += (hlen + len); - erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len)); + if (esdp) + esdp->io.in += (Uint64) (hlen + len); + else + erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len)); if (state & ERTS_PORT_SFLG_DISTRIBUTION) { return erts_net_message(prt, prt->dist_entry, @@ -5800,6 +5912,7 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, { erts_aint32_t state; Port* prt = erts_drvport2port_state(ix, &state); + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -5811,7 +5924,10 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, return 0; prt->bytes_in += (hlen + len); - erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len)); + if (esdp) + esdp->io.in += (Uint64) (hlen + len); + else + erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len)); if (state & ERTS_PORT_SFLG_DISTRIBUTION) { if (len == 0) return erts_net_message(prt, @@ -5851,6 +5967,7 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, ErlDrvBinary** binv; Port* prt; erts_aint32_t state; + ErtsSchedulerData *esdp = erts_get_scheduler_data(); ERTS_SMP_CHK_NO_PROC_LOCKS; @@ -5889,7 +6006,10 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen, /* XXX handle distribution !!! */ prt->bytes_in += (hlen + size); - erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + size)); + if (esdp) + esdp->io.in += (Uint64) (hlen + size); + else + erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + size)); deliver_vec_message(prt, ERTS_PORT_GET_CONNECTED(prt), hbuf, hlen, binv, iov, n, size); return 0; diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h index cd53069872..78eb0bfe2b 100644 --- a/erts/emulator/beam/sys.h +++ b/erts/emulator/beam/sys.h @@ -20,6 +20,22 @@ #ifndef __SYS_H__ #define __SYS_H__ +#ifdef ERTS_INLINE +# ifndef ERTS_CAN_INLINE +# define ERTS_CAN_INLINE 1 +# endif +#else +# if defined(__GNUC__) +# define ERTS_CAN_INLINE 1 +# define ERTS_INLINE __inline__ +# elif defined(__WIN32__) +# define ERTS_CAN_INLINE 1 +# define ERTS_INLINE __inline +# else +# define ERTS_CAN_INLINE 0 +# define ERTS_INLINE +# endif +#endif #if defined(DEBUG) || defined(ERTS_ENABLE_LOCK_CHECK) # undef ERTS_CAN_INLINE @@ -94,23 +110,6 @@ typedef int ErtsSysFdType; typedef ERTS_SYS_FD_TYPE ErtsSysFdType; #endif -#ifdef ERTS_INLINE -# ifndef ERTS_CAN_INLINE -# define ERTS_CAN_INLINE 1 -# endif -#else -# if defined(__GNUC__) -# define ERTS_CAN_INLINE 1 -# define ERTS_INLINE __inline__ -# elif defined(__WIN32__) -# define ERTS_CAN_INLINE 1 -# define ERTS_INLINE __inline -# else -# define ERTS_CAN_INLINE 0 -# define ERTS_INLINE -# endif -#endif - #if !defined(__GNUC__) # define ERTS_AT_LEAST_GCC_VSN__(MAJ, MIN, PL) 0 #elif !defined(__GNUC_MINOR__) diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c index 10ef20fc82..119d6c097d 100644 --- a/erts/emulator/drivers/common/inet_drv.c +++ b/erts/emulator/drivers/common/inet_drv.c @@ -835,6 +835,10 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define TCP_ADDF_PENDING_SHUT_RDWR 32 /* Call shutdown(sock, SHUT_RDWR) when queue empties */ #define TCP_ADDF_PENDING_SHUTDOWN \ (TCP_ADDF_PENDING_SHUT_WR | TCP_ADDF_PENDING_SHUT_RDWR) +#define TCP_ADDF_SHOW_ECONNRESET 64 /* Tell user about incoming RST */ +#define TCP_ADDF_DELAYED_ECONNRESET 128 /* An ECONNRESET error occured on send or shutdown */ +#define TCP_ADDF_SHUTDOWN_WR_DONE 256 /* A shutdown(sock, SHUT_WR) or SHUT_RDWR was made */ +#define TCP_ADDF_LINGER_ZERO 512 /* Discard driver queue on port close */ /* *_REQ_* replies */ #define INET_REP_ERROR 0 @@ -879,6 +883,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n) #define INET_LOPT_MSGQ_HIWTRMRK 36 /* set local msgq high watermark */ #define INET_LOPT_MSGQ_LOWTRMRK 37 /* set local msgq low watermark */ #define INET_LOPT_NETNS 38 /* Network namespace pathname */ +#define INET_LOPT_TCP_SHOW_ECONNRESET 39 /* tell user about incoming RST */ /* SCTP options: a separate range, from 100: */ #define SCTP_OPT_RTOINFO 100 #define SCTP_OPT_ASSOCINFO 101 @@ -6138,7 +6143,12 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) desc->active_count = 0; if ((desc->stype == SOCK_STREAM) && (desc->active != INET_PASSIVE) && (desc->state == INET_STATE_CLOSED)) { - tcp_closed_message((tcp_descriptor *) desc); + tcp_descriptor *tdesc = (tcp_descriptor *) desc; + if (tdesc->tcp_add_flags & TCP_ADDF_DELAYED_ECONNRESET) { + tdesc->tcp_add_flags &= ~TCP_ADDF_DELAYED_ECONNRESET; + tcp_error_message(tdesc, ECONNRESET); + } + tcp_closed_message(tdesc); if (desc->exitf) { driver_exit(desc->port, 0); return 0; /* Give up on this socket, descriptor lost */ @@ -6255,6 +6265,16 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) continue; #endif + case INET_LOPT_TCP_SHOW_ECONNRESET: + if (desc->sprotocol == IPPROTO_TCP) { + tcp_descriptor* tdesc = (tcp_descriptor*) desc; + if (ival) + tdesc->tcp_add_flags |= TCP_ADDF_SHOW_ECONNRESET; + else + tdesc->tcp_add_flags &= ~TCP_ADDF_SHOW_ECONNRESET; + } + continue; + case INET_OPT_REUSEADDR: #ifdef __WIN32__ continue; /* Bjorn says */ @@ -6299,6 +6319,13 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len) arg_sz = sizeof(li_val); DEBUGF(("inet_set_opts(%ld): s=%d, SO_LINGER=%d,%d", (long)desc->port, desc->s, li_val.l_onoff,li_val.l_linger)); + if (desc->sprotocol == IPPROTO_TCP) { + tcp_descriptor* tdesc = (tcp_descriptor*) desc; + if (li_val.l_onoff && li_val.l_linger == 0) + tdesc->tcp_add_flags |= TCP_ADDF_LINGER_ZERO; + else + tdesc->tcp_add_flags &= ~TCP_ADDF_LINGER_ZERO; + } break; case INET_OPT_PRIORITY: @@ -7234,6 +7261,17 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc, continue; #endif + case INET_LOPT_TCP_SHOW_ECONNRESET: + if (desc->sprotocol == IPPROTO_TCP) { + tcp_descriptor* tdesc = (tcp_descriptor*) desc; + *ptr++ = opt; + ival = !!(tdesc->tcp_add_flags & TCP_ADDF_SHOW_ECONNRESET); + put_int32(ival, ptr); + } else { + TRUNCATE_TO(0,ptr); + } + continue; + case INET_OPT_PRIORITY: #ifdef SO_PRIORITY type = SO_PRIORITY; @@ -9077,6 +9115,11 @@ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s, copy_desc->low = desc->low; copy_desc->send_timeout = desc->send_timeout; copy_desc->send_timeout_close = desc->send_timeout_close; + + if (desc->tcp_add_flags & TCP_ADDF_SHOW_ECONNRESET) + copy_desc->tcp_add_flags |= TCP_ADDF_SHOW_ECONNRESET; + else + copy_desc->tcp_add_flags &= ~TCP_ADDF_SHOW_ECONNRESET; /* The new port will be linked and connected to the original caller */ port = driver_create_port(port, owner, "tcp_inet", (ErlDrvData) copy_desc); @@ -9438,7 +9481,11 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, if (desc->tcp_add_flags & TCP_ADDF_DELAYED_CLOSE_RECV) { desc->tcp_add_flags &= ~(TCP_ADDF_DELAYED_CLOSE_RECV| TCP_ADDF_DELAYED_CLOSE_SEND); - return ctl_reply(INET_REP_ERROR, "closed", 6, rbuf, rsize); + if (desc->tcp_add_flags & TCP_ADDF_DELAYED_ECONNRESET) { + desc->tcp_add_flags &= ~TCP_ADDF_DELAYED_ECONNRESET; + return ctl_reply(INET_REP_ERROR, "econnreset", 10, rbuf, rsize); + } else + return ctl_reply(INET_REP_ERROR, "closed", 6, rbuf, rsize); } return ctl_error(ENOTCONN, rbuf, rsize); } @@ -9499,6 +9546,8 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd, return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); } if (IS_SOCKET_ERROR(sock_shutdown(INETP(desc)->s, how))) { + if (how != TCP_SHUT_RD) + desc->tcp_add_flags |= TCP_ADDF_SHUTDOWN_WR_DONE; return ctl_error(sock_errno(), rbuf, rsize); } else { return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize); @@ -9633,7 +9682,13 @@ static void tcp_inet_commandv(ErlDrvData e, ErlIOVec* ev) if (!IS_CONNECTED(INETP(desc))) { if (desc->tcp_add_flags & TCP_ADDF_DELAYED_CLOSE_SEND) { desc->tcp_add_flags &= ~TCP_ADDF_DELAYED_CLOSE_SEND; - inet_reply_error_am(INETP(desc), am_closed); + if (desc->tcp_add_flags & TCP_ADDF_DELAYED_ECONNRESET) { + /* Don't clear flag. Leave it enabled for the next receive + * operation. + */ + inet_reply_error(INETP(desc), ECONNRESET); + } else + inet_reply_error_am(INETP(desc), am_closed); } else inet_reply_error(INETP(desc), ENOTCONN); @@ -9652,6 +9707,8 @@ static void tcp_inet_flush(ErlDrvData e) /* Discard send queue to avoid hanging port (OTP-7615) */ tcp_clear_output(desc); } + if (desc->tcp_add_flags & TCP_ADDF_LINGER_ZERO) + tcp_clear_output(desc); } static void tcp_inet_process_exit(ErlDrvData e, ErlDrvMonitor *monitorp) @@ -10014,7 +10071,10 @@ static int tcp_recv(tcp_descriptor* desc, int request_len) int err = sock_errno(); if (err == ECONNRESET) { DEBUGF((" => detected close (connreset)\r\n")); - return tcp_recv_closed(desc); + if (desc->tcp_add_flags & TCP_ADDF_SHOW_ECONNRESET) + return tcp_recv_error(desc, err); + else + return tcp_recv_closed(desc); } if (err == ERRNO_BLOCK) { DEBUGF((" => would block\r\n")); @@ -10226,7 +10286,19 @@ static void tcp_inet_event(ErlDrvData e, ErlDrvEvent event) if (netEv.lNetworkEvents & FD_CLOSE) { /* error in err = netEv.iErrorCode[FD_CLOSE_BIT] */ DEBUGF(("Detected close in %s, line %d\r\n", __FILE__, __LINE__)); - tcp_recv_closed(desc); + if (desc->tcp_add_flags & TCP_ADDF_SHOW_ECONNRESET) { + err = netEv.iErrorCode[FD_CLOSE_BIT]; + if (err == ECONNRESET) + tcp_recv_error(desc, err); + else if (err == ECONNABORTED && IS_CONNECTED(INETP(desc))) { + /* translate this error to ECONNRESET */ + tcp_recv_error(desc, ECONNRESET); + } + else + tcp_recv_closed(desc); + } + else + tcp_recv_closed(desc); } DEBUGF(("tcp_inet_event(%ld) }\r\n", (long)desc->inet.port)); return; @@ -10535,6 +10607,9 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event) static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err) { + int show_econnreset = (err == ECONNRESET + && desc->tcp_add_flags & TCP_ADDF_SHOW_ECONNRESET); + /* * If the port is busy, we must do some clean-up before proceeding. */ @@ -10550,14 +10625,21 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err) /* * We used to handle "expected errors" differently from unexpected ones. - * Now we handle all errors in the same way. We just have to distinguish - * between passive and active sockets. + * Now we handle all errors in the same way (unless the show_econnreset + * socket option is enabled). We just have to distinguish between passive + * and active sockets. */ DEBUGF(("driver_failure_eof(%ld) in %s, line %d\r\n", (long)desc->inet.port, __FILE__, __LINE__)); if (desc->inet.active) { - tcp_closed_message(desc); - inet_reply_error_am(INETP(desc), am_closed); + if (show_econnreset) { + tcp_error_message(desc, err); + tcp_closed_message(desc); + inet_reply_error(INETP(desc), err); + } else { + tcp_closed_message(desc); + inet_reply_error_am(INETP(desc), am_closed); + } if (desc->inet.exitf) driver_exit(desc->inet.port, 0); else @@ -10569,7 +10651,10 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err) erl_inet_close(INETP(desc)); if (desc->inet.caller) { - inet_reply_error_am(INETP(desc), am_closed); + if (show_econnreset) + inet_reply_error(INETP(desc), err); + else + inet_reply_error_am(INETP(desc), am_closed); } else { /* No blocking send op to reply to right now. @@ -10586,12 +10671,46 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err) * in the receive operation. */ desc->tcp_add_flags |= TCP_ADDF_DELAYED_CLOSE_RECV; + + if (show_econnreset) { + /* Return {error, econnreset} instead of {error, closed} + * on send or receive operations. + */ + desc->tcp_add_flags |= TCP_ADDF_DELAYED_ECONNRESET; + } } return -1; } static int tcp_send_error(tcp_descriptor* desc, int err) { + /* EPIPE errors usually occur in one of three ways: + * 1. We write to a socket when we've already shutdown() the write side. On + * Windows the error returned for this is ESHUTDOWN rather than EPIPE. + * 2. The TCP peer sends us an RST through no fault of our own (perhaps + * by aborting the connection using SO_LINGER) and we then attempt + * to write to the socket. On Linux and Windows we would actually + * receive an ECONNRESET error for this, but on the BSDs, Darwin, + * Illumos and presumably Solaris, it's an EPIPE. + * 3. We cause the TCP peer to send us an RST by writing to a socket + * after we receive a FIN from them. Our first write will be + * successful, but if the they have closed the connection (rather + * than just shutting down the write side of it) this will cause their + * OS to send us an RST. Then, when we attempt to write to the socket + * a second time, we will get an EPIPE error. On Windows we get an + * ECONNABORTED. + * + * What we are going to do here is to treat all EPIPE messages that aren't + * of type 1 as ECONNRESET errors. This will allow users who have the + * show_econnreset socket option enabled to receive {error, econnreset} on + * both send and recv operations to indicate that an RST has been received. + */ +#ifdef __WIN_32__ + if (err == ECONNABORTED) + err = ECONNRESET; +#endif + if (err == EPIPE && !(desc->tcp_add_flags & TCP_ADDF_SHUTDOWN_WR_DONE)) + err = ECONNRESET; return tcp_send_or_shutdown_error(desc, err); } @@ -10811,6 +10930,8 @@ static void tcp_shutdown_async(tcp_descriptor* desc) TCP_SHUT_WR : TCP_SHUT_RDWR; if (IS_SOCKET_ERROR(sock_shutdown(INETP(desc)->s, how))) tcp_shutdown_error(desc, sock_errno()); + else + desc->tcp_add_flags |= TCP_ADDF_SHUTDOWN_WR_DONE; } #ifdef __OSE__ diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam Binary files differindex 0e0811af3f..dc8c711e1a 100644 --- a/erts/preloaded/ebin/erts_internal.beam +++ b/erts/preloaded/ebin/erts_internal.beam diff --git a/erts/preloaded/ebin/prim_inet.beam b/erts/preloaded/ebin/prim_inet.beam Binary files differindex 6729f06b79..5a188be3ba 100644 --- a/erts/preloaded/ebin/prim_inet.beam +++ b/erts/preloaded/ebin/prim_inet.beam diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index 65a1f1ed3a..cf8edefd7d 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -40,7 +40,7 @@ -export([flush_monitor_messages/3]). --export([await_result/1]). +-export([await_result/1, gather_io_bytes/2]). -export([time_unit/0]). @@ -67,6 +67,23 @@ await_result(Ref) when is_reference(Ref) -> end. %% +%% statistics(io) end up in gather_io_bytes/2 +%% + +gather_io_bytes(Ref, No) when is_reference(Ref), + is_integer(No), + No > 0 -> + gather_io_bytes(Ref, No, 0, 0). + +gather_io_bytes(_Ref, 0, InAcc, OutAcc) -> + {{input, InAcc}, {output, OutAcc}}; +gather_io_bytes(Ref, No, InAcc, OutAcc) -> + receive + {Ref, _SchedId, In, Out} -> + gather_io_bytes(Ref, No-1, InAcc + In, OutAcc + Out) + end. + +%% %% Statically linked port NIFs %% diff --git a/erts/preloaded/src/prim_inet.erl b/erts/preloaded/src/prim_inet.erl index 622e1be869..5e0b38aa68 100644 --- a/erts/preloaded/src/prim_inet.erl +++ b/erts/preloaded/src/prim_inet.erl @@ -146,11 +146,16 @@ shutdown_1(S, How) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% close(S) when is_port(S) -> - case subscribe(S, [subs_empty_out_q]) of - {ok, [{subs_empty_out_q,N}]} when N > 0 -> - close_pend_loop(S, N); %% wait for pending output to be sent + case getopt(S, linger) of + {ok,{true,0}} -> + close_port(S); _ -> - close_port(S) + case subscribe(S, [subs_empty_out_q]) of + {ok, [{subs_empty_out_q,N}]} when N > 0 -> + close_pend_loop(S, N); %% wait for pending output to be sent + _ -> + close_port(S) + end end. close_pend_loop(S, N) -> @@ -1140,6 +1145,7 @@ enc_opt(delay_send) -> ?INET_LOPT_TCP_DELAY_SEND; enc_opt(packet_size) -> ?INET_LOPT_PACKET_SIZE; enc_opt(read_packets) -> ?INET_LOPT_READ_PACKETS; enc_opt(netns) -> ?INET_LOPT_NETNS; +enc_opt(show_econnreset) -> ?INET_LOPT_TCP_SHOW_ECONNRESET; enc_opt(raw) -> ?INET_OPT_RAW; % Names of SCTP opts: enc_opt(sctp_rtoinfo) -> ?SCTP_OPT_RTOINFO; @@ -1197,6 +1203,7 @@ dec_opt(?INET_LOPT_TCP_DELAY_SEND) -> delay_send; dec_opt(?INET_LOPT_PACKET_SIZE) -> packet_size; dec_opt(?INET_LOPT_READ_PACKETS) -> read_packets; dec_opt(?INET_LOPT_NETNS) -> netns; +dec_opt(?INET_LOPT_TCP_SHOW_ECONNRESET) -> show_econnreset; dec_opt(?INET_OPT_RAW) -> raw; dec_opt(I) when is_integer(I) -> undefined. @@ -1296,6 +1303,7 @@ type_opt_1(delay_send) -> bool; type_opt_1(packet_size) -> uint; type_opt_1(read_packets) -> uint; type_opt_1(netns) -> binary; +type_opt_1(show_econnreset) -> bool; %% %% SCTP options (to be set). If the type is a record type, the corresponding %% record signature is returned, otherwise, an "elementary" type tag |