aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
Diffstat (limited to 'erts')
-rw-r--r--erts/doc/src/erl_nif.xml2
-rw-r--r--erts/emulator/beam/atom.names1
-rw-r--r--erts/emulator/beam/dist.c5
-rw-r--r--erts/emulator/beam/erl_alloc.types1
-rw-r--r--erts/emulator/beam/erl_bif_info.c103
-rw-r--r--erts/emulator/beam/erl_lock_count.c348
-rw-r--r--erts/emulator/beam/erl_lock_count.h42
-rw-r--r--erts/emulator/beam/erl_port.h3
-rw-r--r--erts/emulator/beam/erl_process.c3
-rw-r--r--erts/emulator/beam/erl_process.h5
-rw-r--r--erts/emulator/beam/erl_process_lock.c186
-rw-r--r--erts/emulator/beam/io.c208
-rw-r--r--erts/emulator/beam/sys.h33
-rw-r--r--erts/emulator/drivers/common/inet_drv.c141
-rw-r--r--erts/preloaded/ebin/erts_internal.beambin5380 -> 5964 bytes
-rw-r--r--erts/preloaded/ebin/prim_inet.beambin73092 -> 72748 bytes
-rw-r--r--erts/preloaded/src/erts_internal.erl19
-rw-r--r--erts/preloaded/src/prim_inet.erl16
18 files changed, 658 insertions, 458 deletions
diff --git a/erts/doc/src/erl_nif.xml b/erts/doc/src/erl_nif.xml
index f64381c99d..412c0e02ac 100644
--- a/erts/doc/src/erl_nif.xml
+++ b/erts/doc/src/erl_nif.xml
@@ -1177,7 +1177,7 @@ typedef enum {
<code type="none">
ERL_NIF_TERM key, value;
ErlNifMapIterator iter;
-enif_map_iterator_create(env, my_map, ERL_NIF_MAP_ITERATOR_FIRST);
+enif_map_iterator_create(env, my_map, &amp;iter, ERL_NIF_MAP_ITERATOR_FIRST);
while (enif_map_iterator_get_pair(env, &amp;iter, &amp;key, &amp;value)) {
do_something(key,value);
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index 74b42c647e..0cf21b4635 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -265,6 +265,7 @@ atom get_seq_token
atom get_tcw
atom getenv
atom gather_gc_info_result
+atom gather_io_bytes
atom gather_sched_wall_time_result
atom getting_linked
atom getting_unlinked
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index ae46174a14..1354cee267 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -2088,6 +2088,7 @@ erts_dist_command(Port *prt, int reds_limit)
DistEntry *dep = prt->dist_entry;
Uint (*send)(Port *prt, ErtsDistOutputBuf *obuf);
erts_aint32_t sched_flags;
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt));
@@ -2142,12 +2143,12 @@ erts_dist_command(Port *prt, int reds_limit)
ErtsDistOutputBuf *fob;
size = (*send)(prt, foq.first);
+ esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
erts_fprintf(stderr, ">> ");
bw(foq.first->extp, size);
#endif
reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
- erts_smp_atomic_add_nob(&erts_bytes_out, size);
fob = foq.first;
obufsize += size_obuf(fob);
foq.first = foq.first->next;
@@ -2227,12 +2228,12 @@ erts_dist_command(Port *prt, int reds_limit)
ASSERT(&oq.first->data[0] <= oq.first->extp
&& oq.first->extp < oq.first->ext_endp);
size = (*send)(prt, oq.first);
+ esdp->io.out += (Uint64) size;
#ifdef ERTS_RAW_DIST_MSG_DBG
erts_fprintf(stderr, ">> ");
bw(oq.first->extp, size);
#endif
reds += ERTS_PORT_REDS_DIST_CMD_DATA(size);
- erts_smp_atomic_add_nob(&erts_bytes_out, size);
fob = oq.first;
obufsize += size_obuf(fob);
oq.first = oq.first->next;
diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types
index 2721e13250..92c397ffd3 100644
--- a/erts/emulator/beam/erl_alloc.types
+++ b/erts/emulator/beam/erl_alloc.types
@@ -274,6 +274,7 @@ type BUSY_CALLER SHORT_LIVED SYSTEM busy_caller
type PROC_SYS_TSK SHORT_LIVED PROCESSES proc_sys_task
type PROC_SYS_TSK_QS SHORT_LIVED PROCESSES proc_sys_task_queues
type NEW_TIME_OFFSET SHORT_LIVED SYSTEM new_time_offset
+type IOB_REQ SHORT_LIVED SYSTEM io_bytes_request
+if threads_no_smp
# Need thread safe allocs, but std_alloc and fix_alloc are not;
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 397c68e199..7eb31fb80e 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -60,6 +60,7 @@
static Export* alloc_info_trap = NULL;
static Export* alloc_sizes_trap = NULL;
+static Export* gather_io_bytes_trap = NULL;
static Export *gather_sched_wall_time_res_trap;
static Export *gather_gc_info_res_trap;
@@ -3220,7 +3221,6 @@ BIF_RETTYPE process_display_2(BIF_ALIST_2)
BIF_RET(am_true);
}
-
/* this is a general call which return some possibly useful information */
BIF_RETTYPE statistics_1(BIF_ALIST_1)
@@ -3293,23 +3293,8 @@ BIF_RETTYPE statistics_1(BIF_ALIST_1)
res = TUPLE2(hp, b1, b2);
BIF_RET(res);
} else if (BIF_ARG_1 == am_io) {
- Eterm r1, r2;
- Eterm in, out;
- Uint hsz = 9;
- Uint bytes_in = (Uint) erts_smp_atomic_read_nob(&erts_bytes_in);
- Uint bytes_out = (Uint) erts_smp_atomic_read_nob(&erts_bytes_out);
-
- (void) erts_bld_uint(NULL, &hsz, bytes_in);
- (void) erts_bld_uint(NULL, &hsz, bytes_out);
- hp = HAlloc(BIF_P, hsz);
- in = erts_bld_uint(&hp, NULL, bytes_in);
- out = erts_bld_uint(&hp, NULL, bytes_out);
-
- r1 = TUPLE2(hp, am_input, in);
- hp += 3;
- r2 = TUPLE2(hp, am_output, out);
- hp += 3;
- BIF_RET(TUPLE2(hp, r1, r2));
+ Eterm ref = erts_request_io_bytes(BIF_P);
+ BIF_TRAP2(gather_io_bytes_trap, BIF_P, ref, make_small(erts_no_schedulers));
}
else if (ERTS_IS_ATOM_STR("run_queues", BIF_ARG_1)) {
Eterm res, *hp, **hpp;
@@ -4296,59 +4281,41 @@ BIF_RETTYPE erts_debug_lock_counters_1(BIF_ALIST_1)
BIF_RET(am_ok);
} else if (is_tuple(BIF_ARG_1)) {
- Eterm* tp = tuple_val(BIF_ARG_1);
+ Eterm* ptr = tuple_val(BIF_ARG_1);
+
+ if ((arityval(ptr[0]) == 2) && (ptr[2] == am_false || ptr[2] == am_true)) {
+ int lock_opt = 0, enable = (ptr[2] == am_true) ? 1 : 0;
+ if (ERTS_IS_ATOM_STR("copy_save", ptr[1])) {
+ lock_opt = ERTS_LCNT_OPT_COPYSAVE;
+ } else if (ERTS_IS_ATOM_STR("process_locks", ptr[1])) {
+ lock_opt = ERTS_LCNT_OPT_PROCLOCK;
+ } else if (ERTS_IS_ATOM_STR("port_locks", ptr[1])) {
+ lock_opt = ERTS_LCNT_OPT_PORTLOCK;
+ } else if (ERTS_IS_ATOM_STR("suspend", ptr[1])) {
+ lock_opt = ERTS_LCNT_OPT_SUSPEND;
+ } else if (ERTS_IS_ATOM_STR("location", ptr[1])) {
+ lock_opt = ERTS_LCNT_OPT_LOCATION;
+ } else {
+ BIF_ERROR(BIF_P, BADARG);
+ }
- switch (arityval(tp[0])) {
- case 2: {
- int opt = 0;
- int val = 0;
- if (ERTS_IS_ATOM_STR("copy_save", tp[1])) {
- opt = ERTS_LCNT_OPT_COPYSAVE;
- } else if (ERTS_IS_ATOM_STR("process_locks", tp[1])) {
- opt = ERTS_LCNT_OPT_PROCLOCK;
- } else if (ERTS_IS_ATOM_STR("port_locks", tp[1])) {
- opt = ERTS_LCNT_OPT_PORTLOCK;
- } else if (ERTS_IS_ATOM_STR("suspend", tp[1])) {
- opt = ERTS_LCNT_OPT_SUSPEND;
- } else if (ERTS_IS_ATOM_STR("location", tp[1])) {
- opt = ERTS_LCNT_OPT_LOCATION;
- } else {
- BIF_ERROR(BIF_P, BADARG);
- }
- if (tp[2] == am_true) {
- val = 1;
- } else if (tp[2] == am_false) {
- val = 0;
- } else {
- BIF_ERROR(BIF_P, BADARG);
- }
+ erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
+ erts_smp_thr_progress_block();
- erts_smp_proc_unlock(BIF_P, ERTS_PROC_LOCK_MAIN);
- erts_smp_thr_progress_block();
-
- if (val) {
- res = erts_lcnt_set_rt_opt(opt) ? am_true : am_false;
- } else {
- res = erts_lcnt_clear_rt_opt(opt) ? am_true : am_false;
- }
+ if (enable) res = erts_lcnt_set_rt_opt(lock_opt) ? am_true : am_false;
+ else res = erts_lcnt_clear_rt_opt(lock_opt) ? am_true : am_false;
+
#ifdef ERTS_SMP
- if (res != tp[2]) {
- if (opt == ERTS_LCNT_OPT_PORTLOCK) {
- erts_lcnt_enable_io_lock_count(val);
- } else if (opt == ERTS_LCNT_OPT_PROCLOCK) {
- erts_lcnt_enable_proc_lock_count(val);
- }
- }
+ if (res != ptr[2] && lock_opt == ERTS_LCNT_OPT_PORTLOCK) {
+ erts_lcnt_enable_io_lock_count(enable);
+ } else if (res != ptr[2] && lock_opt == ERTS_LCNT_OPT_PROCLOCK) {
+ erts_lcnt_enable_proc_lock_count(enable);
+ }
#endif
- erts_smp_thr_progress_unblock();
- erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
- BIF_RET(res);
- break;
- }
-
- default:
- break;
- }
+ erts_smp_thr_progress_unblock();
+ erts_smp_proc_lock(BIF_P, ERTS_PROC_LOCK_MAIN);
+ BIF_RET(res);
+ }
}
#endif
@@ -4388,6 +4355,8 @@ erts_bif_info_init(void)
= erts_export_put(am_erlang, am_gather_sched_wall_time_result, 1);
gather_gc_info_res_trap
= erts_export_put(am_erlang, am_gather_gc_info_result, 1);
+ gather_io_bytes_trap
+ = erts_export_put(am_erts_internal, am_gather_io_bytes, 2);
process_info_init();
os_info_init();
}
diff --git a/erts/emulator/beam/erl_lock_count.c b/erts/emulator/beam/erl_lock_count.c
index c6d8f4df95..c4956d8569 100644
--- a/erts/emulator/beam/erl_lock_count.c
+++ b/erts/emulator/beam/erl_lock_count.c
@@ -20,7 +20,7 @@
/*
* Description: Statistics for locks.
*
- * Author: Bj�rn-Egil Dahlberg
+ * Author: Björn-Egil Dahlberg
* Date: 2008-07-03
*/
@@ -49,7 +49,7 @@ const char *str_undefined = "undefined";
static ethr_tsd_key lcnt_thr_data_key;
static int lcnt_n_thr;
-static erts_lcnt_thread_data_t *lcnt_thread_data[4096];
+static erts_lcnt_thread_data_t *lcnt_thread_data[2048];
/* local functions */
@@ -83,12 +83,12 @@ static ERTS_INLINE int lcnt_log2(Uint64 v) {
static char* lcnt_lock_type(Uint16 flag) {
switch(flag & ERTS_LCNT_LT_ALL) {
- case ERTS_LCNT_LT_SPINLOCK: return "spinlock";
- case ERTS_LCNT_LT_RWSPINLOCK: return "rw_spinlock";
- case ERTS_LCNT_LT_MUTEX: return "mutex";
- case ERTS_LCNT_LT_RWMUTEX: return "rw_mutex";
- case ERTS_LCNT_LT_PROCLOCK: return "proclock";
- default: return "";
+ case ERTS_LCNT_LT_SPINLOCK: return "spinlock";
+ case ERTS_LCNT_LT_RWSPINLOCK: return "rw_spinlock";
+ case ERTS_LCNT_LT_MUTEX: return "mutex";
+ case ERTS_LCNT_LT_RWMUTEX: return "rw_mutex";
+ case ERTS_LCNT_LT_PROCLOCK: return "proclock";
+ default: return "";
}
}
@@ -116,15 +116,15 @@ static void lcnt_time(erts_lcnt_time_t *time) {
static void lcnt_time_diff(erts_lcnt_time_t *d, erts_lcnt_time_t *t1, erts_lcnt_time_t *t0) {
long ds;
long dns;
-
+
ds = t1->s - t0->s;
dns = t1->ns - t0->ns;
-
+
/* the difference should not be able to get bigger than 1 sec in ns*/
-
+
if (dns < 0) {
- ds -= 1;
- dns += 1000000000LL;
+ ds -= 1;
+ dns += 1000000000LL;
}
ASSERT(ds >= 0);
@@ -145,7 +145,7 @@ static void lcnt_time_add(erts_lcnt_time_t *t, erts_lcnt_time_t *d) {
static erts_lcnt_thread_data_t *lcnt_thread_data_alloc(void) {
erts_lcnt_thread_data_t *eltd;
-
+
eltd = (erts_lcnt_thread_data_t*)malloc(sizeof(erts_lcnt_thread_data_t));
if (!eltd) {
ERTS_INTERNAL_ERROR("Lock counter failed to allocate memory!");
@@ -164,7 +164,6 @@ static erts_lcnt_thread_data_t *lcnt_get_thread_data(void) {
return (erts_lcnt_thread_data_t *)ethr_tsd_get(lcnt_thr_data_key);
}
-
/* debug */
#if 0
@@ -183,15 +182,15 @@ static void print_lock_x(erts_lcnt_lock_t *lock, Uint16 flag, char *action) {
type = lcnt_lock_type(lock->flag);
r_state = ethr_atomic_read(&lock->r_state);
w_state = ethr_atomic_read(&lock->w_state);
-
+
if (lock->flag & flag) {
erts_fprintf(stderr,"%10s [%24s] [r/w state %4ld/%4ld] %2s id %T\r\n",
- action,
- lock->name,
- r_state,
- w_state,
- type,
- lock->id);
+ action,
+ lock->name,
+ r_state,
+ w_state,
+ type,
+ lock->id);
}
}
#endif
@@ -201,18 +200,18 @@ static erts_lcnt_lock_stats_t *lcnt_get_lock_stats(erts_lcnt_lock_t *lock, char
erts_lcnt_lock_stats_t *stats = NULL;
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_LOCATION) {
- for (i = 0; i < lock->n_stats; i++) {
- if ((lock->stats[i].file == file) && (lock->stats[i].line == line)) {
- return &(lock->stats[i]);
- }
- }
- if (lock->n_stats < ERTS_LCNT_MAX_LOCK_LOCATIONS) {
- stats = &lock->stats[lock->n_stats];
- lock->n_stats++;
- stats->file = file;
- stats->line = line;
- return stats;
- }
+ for (i = 0; i < lock->n_stats; i++) {
+ if ((lock->stats[i].file == file) && (lock->stats[i].line == line)) {
+ return &(lock->stats[i]);
+ }
+ }
+ if (lock->n_stats < ERTS_LCNT_MAX_LOCK_LOCATIONS) {
+ stats = &lock->stats[lock->n_stats];
+ lock->n_stats++;
+ stats->file = file;
+ stats->line = line;
+ return stats;
+ }
}
return &lock->stats[0];
}
@@ -222,53 +221,48 @@ static void lcnt_update_stats_hist(erts_lcnt_hist_t *hist, erts_lcnt_time_t *tim
unsigned long r;
if (time_wait->s > 0 || time_wait->ns > ERTS_LCNT_HISTOGRAM_MAX_NS) {
- idx = ERTS_LCNT_HISTOGRAM_SLOT_SIZE - 1;
+ idx = ERTS_LCNT_HISTOGRAM_SLOT_SIZE - 1;
} else {
- r = time_wait->ns >> ERTS_LCNT_HISTOGRAM_RSHIFT;
- if (r) idx = lcnt_log2(r);
- else idx = 0;
+ r = time_wait->ns >> ERTS_LCNT_HISTOGRAM_RSHIFT;
+ if (r) idx = lcnt_log2(r);
+ else idx = 0;
}
hist->ns[idx]++;
}
static void lcnt_update_stats(erts_lcnt_lock_stats_t *stats, int lock_in_conflict,
- erts_lcnt_time_t *time_wait) {
-
+ erts_lcnt_time_t *time_wait) {
+
ethr_atomic_inc(&stats->tries);
if (lock_in_conflict)
- ethr_atomic_inc(&stats->colls);
+ ethr_atomic_inc(&stats->colls);
if (time_wait) {
- lcnt_time_add(&(stats->timer), time_wait);
- stats->timer_n++;
- lcnt_update_stats_hist(&stats->hist,time_wait);
+ lcnt_time_add(&(stats->timer), time_wait);
+ stats->timer_n++;
+ lcnt_update_stats_hist(&stats->hist,time_wait);
}
}
-/*
- * interface
- */
+/* interface */
void erts_lcnt_init() {
erts_lcnt_thread_data_t *eltd = NULL;
-
+
/* init lock */
if (ethr_mutex_init(&lcnt_data_lock) != 0) abort();
/* init tsd */
lcnt_n_thr = 0;
-
ethr_tsd_key_create(&lcnt_thr_data_key,"lcnt_data");
lcnt_lock();
- erts_lcnt_rt_options = ERTS_LCNT_OPT_PROCLOCK | ERTS_LCNT_OPT_LOCATION;
-
+ erts_lcnt_rt_options = ERTS_LCNT_OPT_LOCATION | ERTS_LCNT_OPT_PROCLOCK;
eltd = lcnt_thread_data_alloc();
-
ethr_tsd_set(lcnt_thr_data_key, eltd);
-
+
/* init lcnt structure */
erts_lcnt_data = (erts_lcnt_data_t*)malloc(sizeof(erts_lcnt_data_t));
if (!erts_lcnt_data) {
@@ -293,7 +287,7 @@ void erts_lcnt_late_init() {
erts_lcnt_lock_list_t *erts_lcnt_list_init(void) {
erts_lcnt_lock_list_t *list;
-
+
list = (erts_lcnt_lock_list_t*)malloc(sizeof(erts_lcnt_lock_list_t));
if (!list) {
ERTS_INTERNAL_ERROR("Lock counter failed to allocate memory!");
@@ -307,14 +301,14 @@ erts_lcnt_lock_list_t *erts_lcnt_list_init(void) {
/* only do this on the list with the deleted locks! */
void erts_lcnt_list_clear(erts_lcnt_lock_list_t *list) {
erts_lcnt_lock_t *lock = NULL,
- *next = NULL;
+ *next = NULL;
lock = list->head;
-
+
while(lock != NULL) {
- next = lock->next;
- free(lock);
- lock = next;
+ next = lock->next;
+ free(lock);
+ lock = next;
}
list->head = NULL;
@@ -327,26 +321,25 @@ void erts_lcnt_list_insert(erts_lcnt_lock_list_t *list, erts_lcnt_lock_t *lock)
tail = list->tail;
if (tail) {
- tail->next = lock;
- lock->prev = tail;
+ tail->next = lock;
+ lock->prev = tail;
} else {
- list->head = lock;
- lock->prev = NULL;
- ASSERT(!lock->next);
+ list->head = lock;
+ lock->prev = NULL;
+ ASSERT(!lock->next);
}
lock->next = NULL;
list->tail = lock;
-
+
list->n++;
}
void erts_lcnt_list_delete(erts_lcnt_lock_list_t *list, erts_lcnt_lock_t *lock) {
-
if (lock->next) lock->next->prev = lock->prev;
if (lock->prev) lock->prev->next = lock->next;
if (list->head == lock) list->head = lock->next;
if (list->tail == lock) list->tail = lock->prev;
-
+
lock->prev = NULL;
lock->next = NULL;
list->n--;
@@ -364,12 +357,9 @@ void erts_lcnt_init_lock(erts_lcnt_lock_t *lock, char *name, Uint16 flag ) {
void erts_lcnt_init_lock_x(erts_lcnt_lock_t *lock, char *name, Uint16 flag, Eterm id) {
int i;
- if (!name) {
- lock->flag = 0;
- return;
- }
+ if (name == NULL) { ERTS_LCNT_CLEAR_FLAG(lock); return; }
lcnt_lock();
-
+
lock->next = NULL;
lock->prev = NULL;
lock->flag = flag;
@@ -378,46 +368,55 @@ void erts_lcnt_init_lock_x(erts_lcnt_lock_t *lock, char *name, Uint16 flag, Eter
ethr_atomic_init(&lock->r_state, 0);
ethr_atomic_init(&lock->w_state, 0);
-
#ifdef DEBUG
ethr_atomic_init(&lock->flowstate, 0);
#endif
-
+
lock->n_stats = 1;
for (i = 0; i < ERTS_LCNT_MAX_LOCK_LOCATIONS; i++) {
- lcnt_clear_stats(&lock->stats[i]);
+ lcnt_clear_stats(&lock->stats[i]);
}
erts_lcnt_list_insert(erts_lcnt_data->current_locks, lock);
lcnt_unlock();
}
-
+/* init empty, instead of zero struct */
+void erts_lcnt_init_lock_empty(erts_lcnt_lock_t *lock) {
+ lock->next = NULL;
+ lock->prev = NULL;
+ lock->flag = 0;
+ lock->name = NULL;
+ lock->id = NIL;
+ ethr_atomic_init(&lock->r_state, 0);
+ ethr_atomic_init(&lock->w_state, 0);
+#ifdef DEBUG
+ ethr_atomic_init(&lock->flowstate, 0);
+#endif
+ lock->n_stats = 0;
+ sys_memzero(lock->stats, sizeof(lock->stats));
+}
+/* destroy lock */
void erts_lcnt_destroy_lock(erts_lcnt_lock_t *lock) {
- erts_lcnt_lock_t *deleted_lock;
-
- if (!ERTS_LCNT_LOCK_TYPE(lock)) return;
-
+ if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return;
lcnt_lock();
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_COPYSAVE) {
- /* copy structure and insert the copy */
-
- deleted_lock = (erts_lcnt_lock_t*)malloc(sizeof(erts_lcnt_lock_t));
+ erts_lcnt_lock_t *deleted_lock;
+ /* copy structure and insert the copy */
+ deleted_lock = (erts_lcnt_lock_t*)malloc(sizeof(erts_lcnt_lock_t));
if (!deleted_lock) {
ERTS_INTERNAL_ERROR("Lock counter failed to allocate memory!");
}
- memcpy(deleted_lock, lock, sizeof(erts_lcnt_lock_t));
-
- deleted_lock->next = NULL;
- deleted_lock->prev = NULL;
-
- erts_lcnt_list_insert(erts_lcnt_data->deleted_locks, deleted_lock);
+ memcpy(deleted_lock, lock, sizeof(erts_lcnt_lock_t));
+ deleted_lock->next = NULL;
+ deleted_lock->prev = NULL;
+ erts_lcnt_list_insert(erts_lcnt_data->deleted_locks, deleted_lock);
}
/* delete original */
erts_lcnt_list_delete(erts_lcnt_data->current_locks, lock);
- lock->flag = 0;
-
+ ERTS_LCNT_CLEAR_FLAG(lock);
+
lcnt_unlock();
}
@@ -426,16 +425,15 @@ void erts_lcnt_destroy_lock(erts_lcnt_lock_t *lock) {
void erts_lcnt_lock_opt(erts_lcnt_lock_t *lock, Uint16 option) {
erts_aint_t r_state = 0, w_state = 0;
erts_lcnt_thread_data_t *eltd;
-
+
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return;
- if (!ERTS_LCNT_LOCK_TYPE(lock)) return;
+ if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return;
eltd = lcnt_get_thread_data();
-
ASSERT(eltd);
-
+
w_state = ethr_atomic_read(&lock->w_state);
-
+
if (option & ERTS_LCNT_LO_WRITE) {
r_state = ethr_atomic_read(&lock->r_state);
ethr_atomic_inc( &lock->w_state);
@@ -443,48 +441,47 @@ void erts_lcnt_lock_opt(erts_lcnt_lock_t *lock, Uint16 option) {
if (option & ERTS_LCNT_LO_READ) {
ethr_atomic_inc( &lock->r_state);
}
-
+
/* we cannot acquire w_lock if either w or r are taken */
/* we cannot acquire r_lock if w_lock is taken */
-
+
if ((w_state > 0) || (r_state > 0)) {
- eltd->lock_in_conflict = 1;
- if (eltd->timer_set == 0) {
- lcnt_time(&eltd->timer);
- }
- eltd->timer_set++;
+ eltd->lock_in_conflict = 1;
+ if (eltd->timer_set == 0) {
+ lcnt_time(&eltd->timer);
+ }
+ eltd->timer_set++;
} else {
- eltd->lock_in_conflict = 0;
+ eltd->lock_in_conflict = 0;
}
}
void erts_lcnt_lock(erts_lcnt_lock_t *lock) {
erts_aint_t w_state;
erts_lcnt_thread_data_t *eltd;
-
+
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return;
- if (!ERTS_LCNT_LOCK_TYPE(lock)) return;
+ if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return;
w_state = ethr_atomic_read(&lock->w_state);
ethr_atomic_inc(&lock->w_state);
-
eltd = lcnt_get_thread_data();
ASSERT(eltd);
if (w_state > 0) {
- eltd->lock_in_conflict = 1;
- /* only set the timer if nobody else has it
- * This should only happen when proc_locks aquires several locks
- * 'atomicly'. All other locks will block the thread if w_state > 0
- * i.e. locked.
- */
- if (eltd->timer_set == 0) {
- lcnt_time(&eltd->timer);
- }
- eltd->timer_set++;
+ eltd->lock_in_conflict = 1;
+ /* only set the timer if nobody else has it
+ * This should only happen when proc_locks aquires several locks
+ * 'atomicly'. All other locks will block the thread if w_state > 0
+ * i.e. locked.
+ */
+ if (eltd->timer_set == 0) {
+ lcnt_time(&eltd->timer);
+ }
+ eltd->timer_set++;
} else {
- eltd->lock_in_conflict = 0;
+ eltd->lock_in_conflict = 0;
}
}
@@ -493,16 +490,19 @@ void erts_lcnt_lock(erts_lcnt_lock_t *lock) {
void erts_lcnt_lock_unaquire(erts_lcnt_lock_t *lock) {
/* should check if this thread was "waiting" */
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return;
- if (!ERTS_LCNT_LOCK_TYPE(lock)) return;
+ if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return;
ethr_atomic_dec(&lock->w_state);
}
-/* erts_lcnt_lock_post
- * used when we get a lock (i.e. directly after a lock operation)
+/*
+ * erts_lcnt_lock_post
+ *
+ * Used when we get a lock (i.e. directly after a lock operation)
* if the timer was set then we had to wait for the lock
* lock_post will calculate the wait time.
*/
+
void erts_lcnt_lock_post(erts_lcnt_lock_t *lock) {
erts_lcnt_lock_post_x(lock, (char*)str_undefined, 0);
}
@@ -517,31 +517,31 @@ void erts_lcnt_lock_post_x(erts_lcnt_lock_t *lock, char *file, unsigned int line
#endif
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return;
- if (!ERTS_LCNT_LOCK_TYPE(lock)) return;
-
+ if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return;
+
#ifdef DEBUG
if (!(lock->flag & (ERTS_LCNT_LT_RWMUTEX | ERTS_LCNT_LT_RWSPINLOCK))) {
- flowstate = ethr_atomic_read(&lock->flowstate);
- ASSERT(flowstate == 0);
- ethr_atomic_inc(&lock->flowstate);
+ flowstate = ethr_atomic_read(&lock->flowstate);
+ ASSERT(flowstate == 0);
+ ethr_atomic_inc(&lock->flowstate);
}
#endif
-
+
eltd = lcnt_get_thread_data();
-
+
ASSERT(eltd);
/* if lock was in conflict, time it */
stats = lcnt_get_lock_stats(lock, file, line);
if (eltd->timer_set) {
- lcnt_time(&timer);
-
- lcnt_time_diff(&time_wait, &timer, &(eltd->timer));
- lcnt_update_stats(stats, eltd->lock_in_conflict, &time_wait);
- eltd->timer_set--;
- ASSERT(eltd->timer_set >= 0);
+ lcnt_time(&timer);
+
+ lcnt_time_diff(&time_wait, &timer, &(eltd->timer));
+ lcnt_update_stats(stats, eltd->lock_in_conflict, &time_wait);
+ eltd->timer_set--;
+ ASSERT(eltd->timer_set >= 0);
} else {
- lcnt_update_stats(stats, eltd->lock_in_conflict, NULL);
+ lcnt_update_stats(stats, eltd->lock_in_conflict, NULL);
}
}
@@ -550,27 +550,28 @@ void erts_lcnt_lock_post_x(erts_lcnt_lock_t *lock, char *file, unsigned int line
void erts_lcnt_unlock_opt(erts_lcnt_lock_t *lock, Uint16 option) {
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return;
- if (!ERTS_LCNT_LOCK_TYPE(lock)) return;
+ if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return;
if (option & ERTS_LCNT_LO_WRITE) ethr_atomic_dec(&lock->w_state);
if (option & ERTS_LCNT_LO_READ ) ethr_atomic_dec(&lock->r_state);
}
void erts_lcnt_unlock(erts_lcnt_lock_t *lock) {
-#ifdef DEBUG
- erts_aint_t w_state;
- erts_aint_t flowstate;
-#endif
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return;
- if (!ERTS_LCNT_LOCK_TYPE(lock)) return;
+ if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return;
#ifdef DEBUG
- /* flowstate */
- flowstate = ethr_atomic_read(&lock->flowstate);
- ASSERT(flowstate == 1);
- ethr_atomic_dec(&lock->flowstate);
-
- /* write state */
- w_state = ethr_atomic_read(&lock->w_state);
- ASSERT(w_state > 0);
+ {
+ erts_aint_t w_state;
+ erts_aint_t flowstate;
+
+ /* flowstate */
+ flowstate = ethr_atomic_read(&lock->flowstate);
+ ASSERT(flowstate == 1);
+ ethr_atomic_dec(&lock->flowstate);
+
+ /* write state */
+ w_state = ethr_atomic_read(&lock->w_state);
+ ASSERT(w_state > 0);
+ }
#endif
ethr_atomic_dec(&lock->w_state);
}
@@ -579,35 +580,34 @@ void erts_lcnt_unlock(erts_lcnt_lock_t *lock) {
void erts_lcnt_trylock_opt(erts_lcnt_lock_t *lock, int res, Uint16 option) {
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return;
- if (!ERTS_LCNT_LOCK_TYPE(lock)) return;
+ if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return;
/* Determine lock_state via res instead of state */
if (res != EBUSY) {
- if (option & ERTS_LCNT_LO_WRITE) ethr_atomic_inc(&lock->w_state);
- if (option & ERTS_LCNT_LO_READ ) ethr_atomic_inc(&lock->r_state);
- lcnt_update_stats(&(lock->stats[0]), 0, NULL);
+ if (option & ERTS_LCNT_LO_WRITE) ethr_atomic_inc(&lock->w_state);
+ if (option & ERTS_LCNT_LO_READ ) ethr_atomic_inc(&lock->r_state);
+ lcnt_update_stats(&(lock->stats[0]), 0, NULL);
} else {
ethr_atomic_inc(&lock->stats[0].tries);
ethr_atomic_inc(&lock->stats[0].colls);
}
}
-
+
void erts_lcnt_trylock(erts_lcnt_lock_t *lock, int res) {
/* Determine lock_state via res instead of state */
-#ifdef DEBUG
- erts_aint_t flowstate;
-#endif
if (erts_lcnt_rt_options & ERTS_LCNT_OPT_SUSPEND) return;
- if (!ERTS_LCNT_LOCK_TYPE(lock)) return;
+ if (ERTS_LCNT_IS_LOCK_INVALID(lock)) return;
if (res != EBUSY) {
-
#ifdef DEBUG
- flowstate = ethr_atomic_read(&lock->flowstate);
- ASSERT(flowstate == 0);
- ethr_atomic_inc( &lock->flowstate);
+ {
+ erts_aint_t flowstate;
+ flowstate = ethr_atomic_read(&lock->flowstate);
+ ASSERT(flowstate == 0);
+ ethr_atomic_inc( &lock->flowstate);
+ }
#endif
- ethr_atomic_inc(&lock->w_state);
- lcnt_update_stats(&(lock->stats[0]), 0, NULL);
+ ethr_atomic_inc(&lock->w_state);
+ lcnt_update_stats(&(lock->stats[0]), 0, NULL);
} else {
ethr_atomic_inc(&lock->stats[0].tries);
ethr_atomic_inc(&lock->stats[0].colls);
@@ -662,13 +662,13 @@ void erts_lcnt_clear_counters(void) {
lcnt_lock();
list = erts_lcnt_data->current_locks;
-
+
for (lock = list->head; lock != NULL; lock = lock->next) {
- for( i = 0; i < ERTS_LCNT_MAX_LOCK_LOCATIONS; i++) {
- stats = &lock->stats[i];
- lcnt_clear_stats(stats);
- }
- lock->n_stats = 1;
+ for( i = 0; i < ERTS_LCNT_MAX_LOCK_LOCATIONS; i++) {
+ stats = &lock->stats[i];
+ lcnt_clear_stats(stats);
+ }
+ lock->n_stats = 1;
}
/* empty deleted locks in lock list */
@@ -681,14 +681,14 @@ void erts_lcnt_clear_counters(void) {
erts_lcnt_data_t *erts_lcnt_get_data(void) {
erts_lcnt_time_t timer_stop;
-
+
lcnt_lock();
-
+
lcnt_time(&timer_stop);
lcnt_time_diff(&(erts_lcnt_data->duration), &timer_stop, &timer_start);
-
+
lcnt_unlock();
-
+
return erts_lcnt_data;
}
diff --git a/erts/emulator/beam/erl_lock_count.h b/erts/emulator/beam/erl_lock_count.h
index 09fadd7e9e..051f55271d 100644
--- a/erts/emulator/beam/erl_lock_count.h
+++ b/erts/emulator/beam/erl_lock_count.h
@@ -20,7 +20,7 @@
/*
* Description: Statistics for locks.
*
- * Author: Bj�rn-Egil Dahlberg
+ * Author: Björn-Egil Dahlberg
* Date: 2008-07-03
* Abstract:
* Locks statistics internal representation.
@@ -95,30 +95,35 @@
#define ERTS_LCNT_LO_WRITE (((Uint16) 1) << 7)
#define ERTS_LCNT_LO_READ_WRITE ( ERTS_LCNT_LO_READ \
- | ERTS_LCNT_LO_WRITE )
+ | ERTS_LCNT_LO_WRITE )
#define ERTS_LCNT_LT_ALL ( ERTS_LCNT_LT_SPINLOCK \
- | ERTS_LCNT_LT_RWSPINLOCK \
- | ERTS_LCNT_LT_MUTEX \
- | ERTS_LCNT_LT_RWMUTEX \
- | ERTS_LCNT_LT_PROCLOCK )
+ | ERTS_LCNT_LT_RWSPINLOCK \
+ | ERTS_LCNT_LT_MUTEX \
+ | ERTS_LCNT_LT_RWMUTEX \
+ | ERTS_LCNT_LT_PROCLOCK )
+
+#define ERTS_LCNT_LOCK_TYPE(lock) ((lock)->flag & ERTS_LCNT_LT_ALL)
+#define ERTS_LCNT_IS_LOCK_INVALID(lock) (!((lock)->flag & ERTS_LCNT_LT_ALL))
+#define ERTS_LCNT_CLEAR_FLAG(lock) ((lock)->flag = 0)
+
/* runtime options */
-#define ERTS_LCNT_OPT_SUSPEND (((Uint16) 1) << 0)
-#define ERTS_LCNT_OPT_LOCATION (((Uint16) 1) << 1)
-#define ERTS_LCNT_OPT_PROCLOCK (((Uint16) 1) << 2)
-#define ERTS_LCNT_OPT_COPYSAVE (((Uint16) 1) << 3)
-#define ERTS_LCNT_OPT_PORTLOCK (((Uint16) 1) << 4)
+#define ERTS_LCNT_OPT_SUSPEND (((Uint16) 1) << 0)
+#define ERTS_LCNT_OPT_LOCATION (((Uint16) 1) << 1)
+#define ERTS_LCNT_OPT_PROCLOCK (((Uint16) 1) << 2)
+#define ERTS_LCNT_OPT_PORTLOCK (((Uint16) 1) << 3)
+#define ERTS_LCNT_OPT_COPYSAVE (((Uint16) 1) << 4)
typedef struct {
unsigned long s;
unsigned long ns;
} erts_lcnt_time_t;
-
+
extern erts_lcnt_time_t timer_start;
typedef struct {
- Uint32 ns[ERTS_LCNT_HISTOGRAM_SLOT_SIZE]; /* log2 array of nano seconds occurences */
+ Uint32 ns[ERTS_LCNT_HISTOGRAM_SLOT_SIZE]; /* log2 array of nano seconds occurences */
} erts_lcnt_hist_t;
typedef struct erts_lcnt_lock_stats_s {
@@ -129,10 +134,10 @@ typedef struct erts_lcnt_lock_stats_s {
char *file; /* which file the lock was taken */
unsigned int line; /* line number in file */
-
+
ethr_atomic_t tries; /* n tries to get lock */
ethr_atomic_t colls; /* n collisions of tries to get lock */
-
+
unsigned long timer_n; /* #times waited for lock */
erts_lcnt_time_t timer; /* total wait time for lock */
erts_lcnt_hist_t hist;
@@ -155,7 +160,7 @@ typedef struct erts_lcnt_lock_s {
/* statistics */
unsigned int n_stats;
erts_lcnt_lock_stats_t stats[ERTS_LCNT_MAX_LOCK_LOCATIONS]; /* first entry is "undefined"*/
-
+
/* chains for list handling */
/* data is hold by lcnt_lock */
struct erts_lcnt_lock_s *prev;
@@ -167,7 +172,7 @@ typedef struct {
erts_lcnt_lock_t *tail;
unsigned long n;
} erts_lcnt_lock_list_t;
-
+
typedef struct {
erts_lcnt_time_t duration; /* time since last clear */
erts_lcnt_lock_list_t *current_locks;
@@ -205,6 +210,7 @@ void erts_lcnt_list_delete(erts_lcnt_lock_list_t *list, erts_lcnt_lock_t *lock);
/* lock operations (global) */
void erts_lcnt_init_lock(erts_lcnt_lock_t *lock, char *name, Uint16 flag);
void erts_lcnt_init_lock_x(erts_lcnt_lock_t *lock, char *name, Uint16 flag, Eterm id);
+void erts_lcnt_init_lock_empty(erts_lcnt_lock_t *lock);
void erts_lcnt_destroy_lock(erts_lcnt_lock_t *lock);
void erts_lcnt_lock(erts_lcnt_lock_t *lock);
@@ -226,7 +232,5 @@ void erts_lcnt_clear_counters(void);
char *erts_lcnt_lock_type(Uint16 type);
erts_lcnt_data_t *erts_lcnt_get_data(void);
-#define ERTS_LCNT_LOCK_TYPE(lockp) ((lockp)->flag & ERTS_LCNT_LT_ALL)
-
#endif /* ifdef ERTS_ENABLE_LOCK_COUNT */
#endif /* ifndef ERTS_LOCK_COUNT_H__ */
diff --git a/erts/emulator/beam/erl_port.h b/erts/emulator/beam/erl_port.h
index 3920fae2d9..66cbb67a0d 100644
--- a/erts/emulator/beam/erl_port.h
+++ b/erts/emulator/beam/erl_port.h
@@ -266,8 +266,7 @@ erts_prtsd_set(Port *prt, int ix, void *data)
#endif
-extern erts_smp_atomic_t erts_bytes_out; /* no bytes written out */
-extern erts_smp_atomic_t erts_bytes_in; /* no bytes sent into the system */
+Eterm erts_request_io_bytes(Process *c_p);
/* port status flags */
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 43e84a1be1..88c1b5c121 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -5542,6 +5542,9 @@ init_scheduler_data(ErtsSchedulerData* esdp, int num,
esdp->thr_id = (Uint32) num;
erts_sched_bif_unique_init(esdp);
+ esdp->io.out = (Uint64) 0;
+ esdp->io.in = (Uint64) 0;
+
if (daww_ptr) {
init_aux_work_data(&esdp->aux_work_data, esdp, *daww_ptr);
#ifdef ERTS_SMP
diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h
index 241a82d6c2..a6c3790991 100644
--- a/erts/emulator/beam/erl_process.h
+++ b/erts/emulator/beam/erl_process.h
@@ -659,6 +659,11 @@ struct ErtsSchedulerData_ {
ErtsSchedAllocData alloc_data;
+ struct {
+ Uint64 out;
+ Uint64 in;
+ } io;
+
Uint64 reductions;
ErtsSchedWallTime sched_wall_time;
ErtsGCInfo gc_info;
diff --git a/erts/emulator/beam/erl_process_lock.c b/erts/emulator/beam/erl_process_lock.c
index b28bc498f6..0548c6137c 100644
--- a/erts/emulator/beam/erl_process_lock.c
+++ b/erts/emulator/beam/erl_process_lock.c
@@ -110,6 +110,9 @@ static struct {
erts_pix_lock_t erts_pix_locks[ERTS_NO_OF_PIX_LOCKS];
+#ifdef ERTS_ENABLE_LOCK_COUNT
+static void lcnt_enable_proc_lock_count(Process *proc, int enable);
+#endif
void
erts_init_proc_lock(int cpus)
@@ -1083,30 +1086,32 @@ erts_proc_lock_fin(Process *p)
/* --- Process lock counting ----------------------------------------------- */
#if ERTS_PROC_LOCK_OWN_IMPL && defined(ERTS_ENABLE_LOCK_COUNT)
-void erts_lcnt_proc_lock_init(Process *p) {
- if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) {
- if (p->common.id != ERTS_INVALID_PID) {
- erts_lcnt_init_lock_x(&(p->lock.lcnt_main), "proc_main", ERTS_LCNT_LT_PROCLOCK, p->common.id);
- erts_lcnt_init_lock_x(&(p->lock.lcnt_msgq), "proc_msgq", ERTS_LCNT_LT_PROCLOCK, p->common.id);
- erts_lcnt_init_lock_x(&(p->lock.lcnt_btm), "proc_btm", ERTS_LCNT_LT_PROCLOCK, p->common.id);
- erts_lcnt_init_lock_x(&(p->lock.lcnt_link), "proc_link", ERTS_LCNT_LT_PROCLOCK, p->common.id);
- erts_lcnt_init_lock_x(&(p->lock.lcnt_status), "proc_status", ERTS_LCNT_LT_PROCLOCK, p->common.id);
- } else {
- erts_lcnt_init_lock(&(p->lock.lcnt_main), "proc_main", ERTS_LCNT_LT_PROCLOCK);
- erts_lcnt_init_lock(&(p->lock.lcnt_msgq), "proc_msgq", ERTS_LCNT_LT_PROCLOCK);
- erts_lcnt_init_lock(&(p->lock.lcnt_btm), "proc_btm", ERTS_LCNT_LT_PROCLOCK);
- erts_lcnt_init_lock(&(p->lock.lcnt_link), "proc_link", ERTS_LCNT_LT_PROCLOCK);
- erts_lcnt_init_lock(&(p->lock.lcnt_status), "proc_status", ERTS_LCNT_LT_PROCLOCK);
- }
- } else {
- sys_memzero(&(p->lock.lcnt_main), sizeof(p->lock.lcnt_main));
- sys_memzero(&(p->lock.lcnt_msgq), sizeof(p->lock.lcnt_msgq));
- sys_memzero(&(p->lock.lcnt_btm), sizeof(p->lock.lcnt_btm));
- sys_memzero(&(p->lock.lcnt_link), sizeof(p->lock.lcnt_link));
- sys_memzero(&(p->lock.lcnt_status), sizeof(p->lock.lcnt_status));
- }
+
+void erts_lcnt_enable_proc_lock_count(int enable) {
+ int ix, max = erts_ptab_max(&erts_proc);
+ Process *proc = NULL;
+ for (ix = 0; ix < max; ++ix) {
+ if ((proc = erts_pix2proc(ix)) != NULL)
+ lcnt_enable_proc_lock_count(proc, enable);
+ } /* for all processes */
}
-
+
+void erts_lcnt_proc_lock_init(Process *p) {
+ if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK)) {
+ erts_lcnt_init_lock_empty(&(p->lock.lcnt_main));
+ erts_lcnt_init_lock_empty(&(p->lock.lcnt_msgq));
+ erts_lcnt_init_lock_empty(&(p->lock.lcnt_btm));
+ erts_lcnt_init_lock_empty(&(p->lock.lcnt_link));
+ erts_lcnt_init_lock_empty(&(p->lock.lcnt_status));
+ } else { /* now the common case */
+ Eterm pid = (p->common.id != ERTS_INVALID_PID) ? p->common.id : NIL;
+ erts_lcnt_init_lock_x(&(p->lock.lcnt_main), "proc_main", ERTS_LCNT_LT_PROCLOCK, pid);
+ erts_lcnt_init_lock_x(&(p->lock.lcnt_msgq), "proc_msgq", ERTS_LCNT_LT_PROCLOCK, pid);
+ erts_lcnt_init_lock_x(&(p->lock.lcnt_btm), "proc_btm", ERTS_LCNT_LT_PROCLOCK, pid);
+ erts_lcnt_init_lock_x(&(p->lock.lcnt_link), "proc_link", ERTS_LCNT_LT_PROCLOCK, pid);
+ erts_lcnt_init_lock_x(&(p->lock.lcnt_status),"proc_status",ERTS_LCNT_LT_PROCLOCK, pid);
+ } /* the lock names should really be aligned to four characters */
+} /* logic reversed */
void erts_lcnt_proc_lock_destroy(Process *p) {
erts_lcnt_destroy_lock(&(p->lock.lcnt_main));
@@ -1116,28 +1121,31 @@ void erts_lcnt_proc_lock_destroy(Process *p) {
erts_lcnt_destroy_lock(&(p->lock.lcnt_status));
}
-void erts_lcnt_proc_lock(erts_proc_lock_t *lock, ErtsProcLocks locks) {
- if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) {
- if (locks & ERTS_PROC_LOCK_MAIN) {
- erts_lcnt_lock(&(lock->lcnt_main));
- }
- if (locks & ERTS_PROC_LOCK_MSGQ) {
- erts_lcnt_lock(&(lock->lcnt_msgq));
- }
- if (locks & ERTS_PROC_LOCK_BTM) {
- erts_lcnt_lock(&(lock->lcnt_btm));
- }
- if (locks & ERTS_PROC_LOCK_LINK) {
- erts_lcnt_lock(&(lock->lcnt_link));
- }
- if (locks & ERTS_PROC_LOCK_STATUS) {
- erts_lcnt_lock(&(lock->lcnt_status));
+static void lcnt_enable_proc_lock_count(Process *proc, int enable) {
+ if (enable) {
+ if (!ERTS_LCNT_LOCK_TYPE(&(proc->lock.lcnt_main))) {
+ erts_lcnt_proc_lock_init(proc);
+ }
}
+ else {
+ if (ERTS_LCNT_LOCK_TYPE(&(proc->lock.lcnt_main))) {
+ erts_lcnt_proc_lock_destroy(proc);
+ }
}
}
-void erts_lcnt_proc_lock_post_x(erts_proc_lock_t *lock, ErtsProcLocks locks, char *file, unsigned int line) {
- if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) {
+void erts_lcnt_proc_lock(erts_proc_lock_t *lock, ErtsProcLocks locks) {
+ if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK)) return;
+ if (locks & ERTS_PROC_LOCK_MAIN) { erts_lcnt_lock(&(lock->lcnt_main)); }
+ if (locks & ERTS_PROC_LOCK_MSGQ) { erts_lcnt_lock(&(lock->lcnt_msgq)); }
+ if (locks & ERTS_PROC_LOCK_BTM) { erts_lcnt_lock(&(lock->lcnt_btm)); }
+ if (locks & ERTS_PROC_LOCK_LINK) { erts_lcnt_lock(&(lock->lcnt_link)); }
+ if (locks & ERTS_PROC_LOCK_STATUS) { erts_lcnt_lock(&(lock->lcnt_status)); }
+}
+
+void erts_lcnt_proc_lock_post_x(erts_proc_lock_t *lock, ErtsProcLocks locks,
+ char *file, unsigned int line) {
+ if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK)) return;
if (locks & ERTS_PROC_LOCK_MAIN) {
erts_lcnt_lock_post_x(&(lock->lcnt_main), file, line);
}
@@ -1153,90 +1161,34 @@ void erts_lcnt_proc_lock_post_x(erts_proc_lock_t *lock, ErtsProcLocks locks, cha
if (locks & ERTS_PROC_LOCK_STATUS) {
erts_lcnt_lock_post_x(&(lock->lcnt_status), file, line);
}
- }
}
void erts_lcnt_proc_lock_unaquire(erts_proc_lock_t *lock, ErtsProcLocks locks) {
- if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) {
- if (locks & ERTS_PROC_LOCK_MAIN) {
- erts_lcnt_lock_unaquire(&(lock->lcnt_main));
- }
- if (locks & ERTS_PROC_LOCK_MSGQ) {
- erts_lcnt_lock_unaquire(&(lock->lcnt_msgq));
- }
- if (locks & ERTS_PROC_LOCK_BTM) {
- erts_lcnt_lock_unaquire(&(lock->lcnt_btm));
- }
- if (locks & ERTS_PROC_LOCK_LINK) {
- erts_lcnt_lock_unaquire(&(lock->lcnt_link));
- }
- if (locks & ERTS_PROC_LOCK_STATUS) {
- erts_lcnt_lock_unaquire(&(lock->lcnt_status));
- }
- }
+ if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK)) return;
+ if (locks & ERTS_PROC_LOCK_MAIN) { erts_lcnt_lock_unaquire(&(lock->lcnt_main)); }
+ if (locks & ERTS_PROC_LOCK_MSGQ) { erts_lcnt_lock_unaquire(&(lock->lcnt_msgq)); }
+ if (locks & ERTS_PROC_LOCK_BTM) { erts_lcnt_lock_unaquire(&(lock->lcnt_btm)); }
+ if (locks & ERTS_PROC_LOCK_LINK) { erts_lcnt_lock_unaquire(&(lock->lcnt_link)); }
+ if (locks & ERTS_PROC_LOCK_STATUS) { erts_lcnt_lock_unaquire(&(lock->lcnt_status)); }
}
void erts_lcnt_proc_unlock(erts_proc_lock_t *lock, ErtsProcLocks locks) {
- if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) {
- if (locks & ERTS_PROC_LOCK_MAIN) {
- erts_lcnt_unlock(&(lock->lcnt_main));
- }
- if (locks & ERTS_PROC_LOCK_MSGQ) {
- erts_lcnt_unlock(&(lock->lcnt_msgq));
- }
- if (locks & ERTS_PROC_LOCK_BTM) {
- erts_lcnt_unlock(&(lock->lcnt_btm));
- }
- if (locks & ERTS_PROC_LOCK_LINK) {
- erts_lcnt_unlock(&(lock->lcnt_link));
- }
- if (locks & ERTS_PROC_LOCK_STATUS) {
- erts_lcnt_unlock(&(lock->lcnt_status));
- }
- }
+ if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK)) return;
+ if (locks & ERTS_PROC_LOCK_MAIN) { erts_lcnt_unlock(&(lock->lcnt_main)); }
+ if (locks & ERTS_PROC_LOCK_MSGQ) { erts_lcnt_unlock(&(lock->lcnt_msgq)); }
+ if (locks & ERTS_PROC_LOCK_BTM) { erts_lcnt_unlock(&(lock->lcnt_btm)); }
+ if (locks & ERTS_PROC_LOCK_LINK) { erts_lcnt_unlock(&(lock->lcnt_link)); }
+ if (locks & ERTS_PROC_LOCK_STATUS) { erts_lcnt_unlock(&(lock->lcnt_status)); }
}
void erts_lcnt_proc_trylock(erts_proc_lock_t *lock, ErtsProcLocks locks, int res) {
- if (erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK) {
- if (locks & ERTS_PROC_LOCK_MAIN) {
- erts_lcnt_trylock(&(lock->lcnt_main), res);
- }
- if (locks & ERTS_PROC_LOCK_MSGQ) {
- erts_lcnt_trylock(&(lock->lcnt_msgq), res);
- }
- if (locks & ERTS_PROC_LOCK_BTM) {
- erts_lcnt_trylock(&(lock->lcnt_btm), res);
- }
- if (locks & ERTS_PROC_LOCK_LINK) {
- erts_lcnt_trylock(&(lock->lcnt_link), res);
- }
- if (locks & ERTS_PROC_LOCK_STATUS) {
- erts_lcnt_trylock(&(lock->lcnt_status), res);
- }
- }
-}
-
-
-void erts_lcnt_enable_proc_lock_count(int enable)
-{
- int i, max = erts_ptab_max(&erts_proc);
-
- for (i = 0; i < max; ++i) {
- Process* p = erts_pix2proc(i);
- if (p) {
- if (enable) {
- if (!ERTS_LCNT_LOCK_TYPE(&(p->lock.lcnt_main))) {
- erts_lcnt_proc_lock_init(p);
- }
- } else {
- if (ERTS_LCNT_LOCK_TYPE(&(p->lock.lcnt_main))) {
- erts_lcnt_proc_lock_destroy(p);
- }
- }
- }
- }
-}
-
-#endif /* ifdef ERTS_ENABLE_LOCK_COUNT */
+ if (!(erts_lcnt_rt_options & ERTS_LCNT_OPT_PROCLOCK)) return;
+ if (locks & ERTS_PROC_LOCK_MAIN) { erts_lcnt_trylock(&(lock->lcnt_main), res); }
+ if (locks & ERTS_PROC_LOCK_MSGQ) { erts_lcnt_trylock(&(lock->lcnt_msgq), res); }
+ if (locks & ERTS_PROC_LOCK_BTM) { erts_lcnt_trylock(&(lock->lcnt_btm), res); }
+ if (locks & ERTS_PROC_LOCK_LINK) { erts_lcnt_trylock(&(lock->lcnt_link), res); }
+ if (locks & ERTS_PROC_LOCK_STATUS) { erts_lcnt_trylock(&(lock->lcnt_status), res); }
+} /* reversed logic */
+#endif /* ERTS_ENABLE_LOCK_COUNT */
/* --- Process lock checking ----------------------------------------------- */
diff --git a/erts/emulator/beam/io.c b/erts/emulator/beam/io.c
index 23f208c2eb..75d80b1a58 100644
--- a/erts/emulator/beam/io.c
+++ b/erts/emulator/beam/io.c
@@ -65,8 +65,6 @@ static erts_smp_tsd_key_t driver_list_last_error_key; /* Save last DDLL error o
per thread basis (for BC interfaces) */
ErtsPTab erts_port erts_align_attribute(ERTS_CACHE_LINE_SIZE); /* The port table */
-erts_smp_atomic_t erts_bytes_out; /* No bytes sent out of the system */
-erts_smp_atomic_t erts_bytes_in; /* No bytes gotten into the system */
const ErlDrvTermData driver_term_nil = (ErlDrvTermData)NIL;
@@ -80,6 +78,9 @@ int erts_port_synchronous_ops = 0;
int erts_port_schedule_all_ops = 0;
int erts_port_parallelism = 0;
+static erts_atomic64_t bytes_in;
+static erts_atomic64_t bytes_out;
+
static void deliver_result(Eterm sender, Eterm pid, Eterm res);
static int init_driver(erts_driver_t *, ErlDrvEntry *, DE_Handle *);
static void terminate_port(Port *p);
@@ -1534,12 +1535,10 @@ erts_schedule_proc2port_signal(Process *c_p,
}
static ERTS_INLINE void
-send_badsig(Port *prt)
-{
+send_badsig(Port *prt) {
ErtsProcLocks rp_locks = ERTS_PROC_LOCKS_XSIG_SEND;
Process* rp;
Eterm connected = ERTS_PORT_GET_CONNECTED(prt);
-
ERTS_SMP_CHK_NO_PROC_LOCKS;
ERTS_LC_ASSERT(erts_get_scheduler_id());
@@ -1559,15 +1558,13 @@ send_badsig(Port *prt)
0);
if (rp_locks)
erts_smp_proc_unlock(rp, rp_locks);
- }
-}
+ } /* exit sent */
+} /* send_badsig */
static void
-badsig_received(int bang_op,
- Port *prt,
+badsig_received(int bang_op, Port *prt,
erts_aint32_t state,
- int bad_output_value)
-{
+ int bad_output_value) {
/*
* if (bang_op)
* we are part of a "Prt ! Something" operation
@@ -1583,12 +1580,12 @@ badsig_received(int bang_op,
}
if (bang_op)
send_badsig(prt);
- }
-}
+ } /* not invalid */
+} /* behaved accordingly */
static int
-port_badsig(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
-{
+port_badsig(Port *prt, erts_aint32_t state, int op,
+ ErtsProc2PortSigData *sigdp) {
if (op == ERTS_PROC2PORT_SIG_EXEC)
badsig_received(sigdp->flags & ERTS_P2P_SIG_DATA_FLG_BANG_OP,
prt,
@@ -1597,16 +1594,14 @@ port_badsig(Port *prt, erts_aint32_t state, int op, ErtsProc2PortSigData *sigdp)
if (sigdp->flags & ERTS_P2P_SIG_DATA_FLG_REPLY)
port_sched_op_reply(sigdp->caller, sigdp->ref, am_badarg);
return ERTS_PORT_REDS_BADSIG;
-}
-
-
-/*
- * bad_port_signal() will
+} /* port_badsig */
+/* bad_port_signal() will
* - preserve signal order of signals.
* - send a 'badsig' exit signal to connected process if 'from' is an
* internal pid and the port is alive when the bad signal reaches
* it.
*/
+
static ErtsPortOpResult
bad_port_signal(Process *c_p,
int flags,
@@ -1681,6 +1676,7 @@ call_driver_outputv(int bang_op,
if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt))
send_badsig(prt);
else {
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ErlDrvSizeT size = evp->size;
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
@@ -1698,7 +1694,10 @@ call_driver_outputv(int bang_op,
prt->caller = NIL;
prt->bytes_out += size;
- erts_smp_atomic_add_nob(&erts_bytes_out, size);
+ if (esdp)
+ esdp->io.out += (Uint64) size;
+ else
+ erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size);
}
}
@@ -1778,7 +1777,7 @@ call_driver_output(int bang_op,
if (bang_op && from != ERTS_PORT_GET_CONNECTED(prt))
send_badsig(prt);
else {
-
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_SMP_LC_ASSERT(erts_lc_is_port_locked(prt)
|| ERTS_IS_CRASH_DUMPING);
@@ -1794,7 +1793,10 @@ call_driver_output(int bang_op,
prt->caller = NIL;
prt->bytes_out += size;
- erts_smp_atomic_add_nob(&erts_bytes_out, size);
+ if (esdp)
+ esdp->io.out += (Uint64) size;
+ else
+ erts_atomic64_add_nob(&bytes_out, (erts_aint64_t) size);
}
}
@@ -2741,6 +2743,9 @@ void erts_init_io(int port_tab_size,
drv_list_rwmtx_opts.type = ERTS_SMP_RWMTX_TYPE_EXTREMELY_FREQUENT_READ;
drv_list_rwmtx_opts.lived = ERTS_SMP_RWMTX_LONG_LIVED;
+ erts_atomic64_init_nob(&bytes_in, 0);
+ erts_atomic64_init_nob(&bytes_out, 0);
+
common_element_size = ERTS_ALC_DATA_ALIGN_SIZE(sizeof(Port));
common_element_size += ERTS_ALC_DATA_ALIGN_SIZE(sizeof(ErtsPortTaskBusyPortQ));
common_element_size += 10; /* name */
@@ -2782,9 +2787,6 @@ void erts_init_io(int port_tab_size,
legacy_port_tab,
1);
- erts_smp_atomic_init_nob(&erts_bytes_out, 0);
- erts_smp_atomic_init_nob(&erts_bytes_in, 0);
-
sys_init_io();
erts_smp_tsd_set(driver_list_lock_status_key, (void *) 1);
@@ -2804,7 +2806,6 @@ void erts_init_io(int port_tab_size,
}
#if defined(ERTS_ENABLE_LOCK_COUNT) && defined(ERTS_SMP)
-
static ERTS_INLINE void lcnt_enable_drv_lock_count(erts_driver_t *dp, int enable)
{
if (dp->lock) {
@@ -2844,25 +2845,26 @@ static ERTS_INLINE void lcnt_enable_port_lock_count(Port *prt, int enable)
}
}
-void erts_lcnt_enable_io_lock_count(int enable)
-{
+void erts_lcnt_enable_io_lock_count(int enable) {
erts_driver_t *dp;
- int i, max = erts_ptab_max(&erts_port);
+ int ix, max = erts_ptab_max(&erts_port);
+ Port *prt;
- for (i = 0; i < max; i++) {
- Port *prt = erts_pix2port(i);
- if (prt)
+ for (ix = 0; ix < max; ix++) {
+ if ((prt = erts_pix2port(ix)) != NULL) {
lcnt_enable_port_lock_count(prt, enable);
- }
+ }
+ } /* for all ports */
lcnt_enable_drv_lock_count(&vanilla_driver, enable);
lcnt_enable_drv_lock_count(&spawn_driver, enable);
lcnt_enable_drv_lock_count(&fd_driver, enable);
- for (dp = driver_list; dp; dp = dp->next)
+ /* enable lock counting in all drivers */
+ for (dp = driver_list; dp; dp = dp->next) {
lcnt_enable_drv_lock_count(dp, enable);
-}
-#endif
-
+ }
+} /* enable/disable lock counting of ports */
+#endif /* defined(ERTS_ENABLE_LOCK_COUNT) && defined(ERTS_SMP) */
/*
* Buffering of data when using line oriented I/O on ports
*/
@@ -4574,6 +4576,102 @@ erts_port_info(Process* c_p,
}
typedef struct {
+ Uint sched_id;
+ Eterm pid;
+ Uint32 refn[ERTS_REF_NUMBERS];
+ erts_smp_atomic32_t refc;
+} ErtsIOBytesReq;
+
+static void
+reply_io_bytes(void *vreq)
+{
+ ErtsIOBytesReq *req = (ErtsIOBytesReq *) vreq;
+ Process *rp;
+
+ rp = erts_proc_lookup(req->pid);
+ if (rp) {
+ ErlOffHeap *ohp = NULL;
+ ErlHeapFragment *bp = NULL;
+ ErtsProcLocks rp_locks;
+ Eterm ref, msg, ein, eout, *hp;
+ Uint64 in, out;
+ Uint hsz;
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
+ Uint sched_id = esdp->no;
+ in = esdp->io.in;
+ out = esdp->io.out;
+ if (req->sched_id != sched_id)
+ rp_locks = 0;
+ else {
+ in += (Uint64) erts_atomic64_read_nob(&bytes_in);
+ out += (Uint64) erts_atomic64_read_nob(&bytes_out);
+ rp_locks = ERTS_PROC_LOCK_MAIN;
+ }
+
+ hsz = 5 /* 4-tuple */ + REF_THING_SIZE;
+
+ erts_bld_uint64(NULL, &hsz, in);
+ erts_bld_uint64(NULL, &hsz, out);
+
+ hp = erts_alloc_message_heap(hsz, &bp, &ohp, rp, &rp_locks);
+
+ ref = make_internal_ref(hp);
+ write_ref_thing(hp, req->refn[0], req->refn[1], req->refn[2]);
+ hp += REF_THING_SIZE;
+
+ ein = erts_bld_uint64(&hp, NULL, in);
+ eout = erts_bld_uint64(&hp, NULL, out);
+
+ msg = TUPLE4(hp, ref, make_small(sched_id), ein, eout);
+ erts_queue_message(rp, &rp_locks, bp, msg, NIL);
+
+ if (req->sched_id == sched_id)
+ rp_locks &= ~ERTS_PROC_LOCK_MAIN;
+ if (rp_locks)
+ erts_smp_proc_unlock(rp, rp_locks);
+ }
+
+ if (erts_smp_atomic32_dec_read_nob(&req->refc) == 0)
+ erts_free(ERTS_ALC_T_IOB_REQ, req);
+}
+
+Eterm
+erts_request_io_bytes(Process *c_p)
+{
+ Uint *hp;
+ Eterm ref;
+ Uint32 *refn;
+ ErtsSchedulerData *esdp = ERTS_PROC_GET_SCHDATA(c_p);
+ ErtsIOBytesReq *req = erts_alloc(ERTS_ALC_T_IOB_REQ,
+ sizeof(ErtsIOBytesReq));
+
+ hp = HAlloc(c_p, REF_THING_SIZE);
+ ref = erts_sched_make_ref_in_buffer(esdp, hp);
+ refn = internal_ref_numbers(ref);
+
+ req->sched_id = esdp->no;
+ req->pid = c_p->common.id;
+ req->refn[0] = refn[0];
+ req->refn[1] = refn[1];
+ req->refn[2] = refn[2];
+ erts_smp_atomic32_init_nob(&req->refc,
+ (erts_aint32_t) erts_no_schedulers);
+
+#ifdef ERTS_SMP
+ if (erts_no_schedulers > 1)
+ erts_schedule_multi_misc_aux_work(1,
+ erts_no_schedulers,
+ reply_io_bytes,
+ (void *) req);
+#endif
+
+ reply_io_bytes((void *) req);
+
+ return ref;
+}
+
+
+typedef struct {
int to;
void *arg;
} prt_one_lnk_data;
@@ -5111,6 +5209,7 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
struct b2t_states__ b2t;
int scheduler;
int is_heap_need_limited = 1;
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_UNDEF(mess,NIL);
ERTS_UNDEF(scheduler,1);
@@ -5418,7 +5517,10 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
Uint size = ptr[1];
Uint offset = ptr[2];
- erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) size);
+ if (esdp)
+ esdp->io.in += (Uint64) size;
+ else
+ erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) size);
if (size <= ERL_ONHEAP_BIN_LIMIT) {
ErlHeapBin* hbp = (ErlHeapBin *) erts_produce_heap(&factory,
@@ -5452,7 +5554,10 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
byte *bufp = (byte *) ptr[0];
Uint size = (Uint) ptr[1];
- erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) size);
+ if (esdp)
+ esdp->io.in += (Uint64) size;
+ else
+ erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) size);
if (size <= ERL_ONHEAP_BIN_LIMIT) {
ErlHeapBin* hbp = (ErlHeapBin *) erts_produce_heap(&factory,
@@ -5489,7 +5594,10 @@ driver_deliver_term(Eterm to, ErlDrvTermData* data, int len)
}
case ERL_DRV_STRING: /* char*, length */
- erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) ptr[1]);
+ if (esdp)
+ esdp->io.in += (Uint64) ptr[1];
+ else
+ erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) ptr[1]);
erts_reserve_heap(&factory, 2*ptr[1]);
mess = buf_to_intlist(&factory.hp, (char*)ptr[0], ptr[1], NIL);
ptr += 2;
@@ -5765,6 +5873,7 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
{
erts_aint32_t state;
Port* prt = erts_drvport2port_state(ix, &state);
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -5775,7 +5884,10 @@ int driver_output_binary(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
return 0;
prt->bytes_in += (hlen + len);
- erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len));
+ if (esdp)
+ esdp->io.in += (Uint64) (hlen + len);
+ else
+ erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len));
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
return erts_net_message(prt,
prt->dist_entry,
@@ -5800,6 +5912,7 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
{
erts_aint32_t state;
Port* prt = erts_drvport2port_state(ix, &state);
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -5811,7 +5924,10 @@ int driver_output2(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
return 0;
prt->bytes_in += (hlen + len);
- erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + len));
+ if (esdp)
+ esdp->io.in += (Uint64) (hlen + len);
+ else
+ erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + len));
if (state & ERTS_PORT_SFLG_DISTRIBUTION) {
if (len == 0)
return erts_net_message(prt,
@@ -5851,6 +5967,7 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
ErlDrvBinary** binv;
Port* prt;
erts_aint32_t state;
+ ErtsSchedulerData *esdp = erts_get_scheduler_data();
ERTS_SMP_CHK_NO_PROC_LOCKS;
@@ -5889,7 +6006,10 @@ int driver_outputv(ErlDrvPort ix, char* hbuf, ErlDrvSizeT hlen,
/* XXX handle distribution !!! */
prt->bytes_in += (hlen + size);
- erts_smp_atomic_add_nob(&erts_bytes_in, (erts_aint_t) (hlen + size));
+ if (esdp)
+ esdp->io.in += (Uint64) (hlen + size);
+ else
+ erts_atomic64_add_nob(&bytes_in, (erts_aint64_t) (hlen + size));
deliver_vec_message(prt, ERTS_PORT_GET_CONNECTED(prt), hbuf, hlen,
binv, iov, n, size);
return 0;
diff --git a/erts/emulator/beam/sys.h b/erts/emulator/beam/sys.h
index cd53069872..78eb0bfe2b 100644
--- a/erts/emulator/beam/sys.h
+++ b/erts/emulator/beam/sys.h
@@ -20,6 +20,22 @@
#ifndef __SYS_H__
#define __SYS_H__
+#ifdef ERTS_INLINE
+# ifndef ERTS_CAN_INLINE
+# define ERTS_CAN_INLINE 1
+# endif
+#else
+# if defined(__GNUC__)
+# define ERTS_CAN_INLINE 1
+# define ERTS_INLINE __inline__
+# elif defined(__WIN32__)
+# define ERTS_CAN_INLINE 1
+# define ERTS_INLINE __inline
+# else
+# define ERTS_CAN_INLINE 0
+# define ERTS_INLINE
+# endif
+#endif
#if defined(DEBUG) || defined(ERTS_ENABLE_LOCK_CHECK)
# undef ERTS_CAN_INLINE
@@ -94,23 +110,6 @@ typedef int ErtsSysFdType;
typedef ERTS_SYS_FD_TYPE ErtsSysFdType;
#endif
-#ifdef ERTS_INLINE
-# ifndef ERTS_CAN_INLINE
-# define ERTS_CAN_INLINE 1
-# endif
-#else
-# if defined(__GNUC__)
-# define ERTS_CAN_INLINE 1
-# define ERTS_INLINE __inline__
-# elif defined(__WIN32__)
-# define ERTS_CAN_INLINE 1
-# define ERTS_INLINE __inline
-# else
-# define ERTS_CAN_INLINE 0
-# define ERTS_INLINE
-# endif
-#endif
-
#if !defined(__GNUC__)
# define ERTS_AT_LEAST_GCC_VSN__(MAJ, MIN, PL) 0
#elif !defined(__GNUC_MINOR__)
diff --git a/erts/emulator/drivers/common/inet_drv.c b/erts/emulator/drivers/common/inet_drv.c
index 10ef20fc82..119d6c097d 100644
--- a/erts/emulator/drivers/common/inet_drv.c
+++ b/erts/emulator/drivers/common/inet_drv.c
@@ -835,6 +835,10 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define TCP_ADDF_PENDING_SHUT_RDWR 32 /* Call shutdown(sock, SHUT_RDWR) when queue empties */
#define TCP_ADDF_PENDING_SHUTDOWN \
(TCP_ADDF_PENDING_SHUT_WR | TCP_ADDF_PENDING_SHUT_RDWR)
+#define TCP_ADDF_SHOW_ECONNRESET 64 /* Tell user about incoming RST */
+#define TCP_ADDF_DELAYED_ECONNRESET 128 /* An ECONNRESET error occured on send or shutdown */
+#define TCP_ADDF_SHUTDOWN_WR_DONE 256 /* A shutdown(sock, SHUT_WR) or SHUT_RDWR was made */
+#define TCP_ADDF_LINGER_ZERO 512 /* Discard driver queue on port close */
/* *_REQ_* replies */
#define INET_REP_ERROR 0
@@ -879,6 +883,7 @@ static int my_strncasecmp(const char *s1, const char *s2, size_t n)
#define INET_LOPT_MSGQ_HIWTRMRK 36 /* set local msgq high watermark */
#define INET_LOPT_MSGQ_LOWTRMRK 37 /* set local msgq low watermark */
#define INET_LOPT_NETNS 38 /* Network namespace pathname */
+#define INET_LOPT_TCP_SHOW_ECONNRESET 39 /* tell user about incoming RST */
/* SCTP options: a separate range, from 100: */
#define SCTP_OPT_RTOINFO 100
#define SCTP_OPT_ASSOCINFO 101
@@ -6138,7 +6143,12 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
desc->active_count = 0;
if ((desc->stype == SOCK_STREAM) && (desc->active != INET_PASSIVE) &&
(desc->state == INET_STATE_CLOSED)) {
- tcp_closed_message((tcp_descriptor *) desc);
+ tcp_descriptor *tdesc = (tcp_descriptor *) desc;
+ if (tdesc->tcp_add_flags & TCP_ADDF_DELAYED_ECONNRESET) {
+ tdesc->tcp_add_flags &= ~TCP_ADDF_DELAYED_ECONNRESET;
+ tcp_error_message(tdesc, ECONNRESET);
+ }
+ tcp_closed_message(tdesc);
if (desc->exitf) {
driver_exit(desc->port, 0);
return 0; /* Give up on this socket, descriptor lost */
@@ -6255,6 +6265,16 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
continue;
#endif
+ case INET_LOPT_TCP_SHOW_ECONNRESET:
+ if (desc->sprotocol == IPPROTO_TCP) {
+ tcp_descriptor* tdesc = (tcp_descriptor*) desc;
+ if (ival)
+ tdesc->tcp_add_flags |= TCP_ADDF_SHOW_ECONNRESET;
+ else
+ tdesc->tcp_add_flags &= ~TCP_ADDF_SHOW_ECONNRESET;
+ }
+ continue;
+
case INET_OPT_REUSEADDR:
#ifdef __WIN32__
continue; /* Bjorn says */
@@ -6299,6 +6319,13 @@ static int inet_set_opts(inet_descriptor* desc, char* ptr, int len)
arg_sz = sizeof(li_val);
DEBUGF(("inet_set_opts(%ld): s=%d, SO_LINGER=%d,%d",
(long)desc->port, desc->s, li_val.l_onoff,li_val.l_linger));
+ if (desc->sprotocol == IPPROTO_TCP) {
+ tcp_descriptor* tdesc = (tcp_descriptor*) desc;
+ if (li_val.l_onoff && li_val.l_linger == 0)
+ tdesc->tcp_add_flags |= TCP_ADDF_LINGER_ZERO;
+ else
+ tdesc->tcp_add_flags &= ~TCP_ADDF_LINGER_ZERO;
+ }
break;
case INET_OPT_PRIORITY:
@@ -7234,6 +7261,17 @@ static ErlDrvSSizeT inet_fill_opts(inet_descriptor* desc,
continue;
#endif
+ case INET_LOPT_TCP_SHOW_ECONNRESET:
+ if (desc->sprotocol == IPPROTO_TCP) {
+ tcp_descriptor* tdesc = (tcp_descriptor*) desc;
+ *ptr++ = opt;
+ ival = !!(tdesc->tcp_add_flags & TCP_ADDF_SHOW_ECONNRESET);
+ put_int32(ival, ptr);
+ } else {
+ TRUNCATE_TO(0,ptr);
+ }
+ continue;
+
case INET_OPT_PRIORITY:
#ifdef SO_PRIORITY
type = SO_PRIORITY;
@@ -9077,6 +9115,11 @@ static tcp_descriptor* tcp_inet_copy(tcp_descriptor* desc,SOCKET s,
copy_desc->low = desc->low;
copy_desc->send_timeout = desc->send_timeout;
copy_desc->send_timeout_close = desc->send_timeout_close;
+
+ if (desc->tcp_add_flags & TCP_ADDF_SHOW_ECONNRESET)
+ copy_desc->tcp_add_flags |= TCP_ADDF_SHOW_ECONNRESET;
+ else
+ copy_desc->tcp_add_flags &= ~TCP_ADDF_SHOW_ECONNRESET;
/* The new port will be linked and connected to the original caller */
port = driver_create_port(port, owner, "tcp_inet", (ErlDrvData) copy_desc);
@@ -9438,7 +9481,11 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd,
if (desc->tcp_add_flags & TCP_ADDF_DELAYED_CLOSE_RECV) {
desc->tcp_add_flags &= ~(TCP_ADDF_DELAYED_CLOSE_RECV|
TCP_ADDF_DELAYED_CLOSE_SEND);
- return ctl_reply(INET_REP_ERROR, "closed", 6, rbuf, rsize);
+ if (desc->tcp_add_flags & TCP_ADDF_DELAYED_ECONNRESET) {
+ desc->tcp_add_flags &= ~TCP_ADDF_DELAYED_ECONNRESET;
+ return ctl_reply(INET_REP_ERROR, "econnreset", 10, rbuf, rsize);
+ } else
+ return ctl_reply(INET_REP_ERROR, "closed", 6, rbuf, rsize);
}
return ctl_error(ENOTCONN, rbuf, rsize);
}
@@ -9499,6 +9546,8 @@ static ErlDrvSSizeT tcp_inet_ctl(ErlDrvData e, unsigned int cmd,
return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
}
if (IS_SOCKET_ERROR(sock_shutdown(INETP(desc)->s, how))) {
+ if (how != TCP_SHUT_RD)
+ desc->tcp_add_flags |= TCP_ADDF_SHUTDOWN_WR_DONE;
return ctl_error(sock_errno(), rbuf, rsize);
} else {
return ctl_reply(INET_REP_OK, NULL, 0, rbuf, rsize);
@@ -9633,7 +9682,13 @@ static void tcp_inet_commandv(ErlDrvData e, ErlIOVec* ev)
if (!IS_CONNECTED(INETP(desc))) {
if (desc->tcp_add_flags & TCP_ADDF_DELAYED_CLOSE_SEND) {
desc->tcp_add_flags &= ~TCP_ADDF_DELAYED_CLOSE_SEND;
- inet_reply_error_am(INETP(desc), am_closed);
+ if (desc->tcp_add_flags & TCP_ADDF_DELAYED_ECONNRESET) {
+ /* Don't clear flag. Leave it enabled for the next receive
+ * operation.
+ */
+ inet_reply_error(INETP(desc), ECONNRESET);
+ } else
+ inet_reply_error_am(INETP(desc), am_closed);
}
else
inet_reply_error(INETP(desc), ENOTCONN);
@@ -9652,6 +9707,8 @@ static void tcp_inet_flush(ErlDrvData e)
/* Discard send queue to avoid hanging port (OTP-7615) */
tcp_clear_output(desc);
}
+ if (desc->tcp_add_flags & TCP_ADDF_LINGER_ZERO)
+ tcp_clear_output(desc);
}
static void tcp_inet_process_exit(ErlDrvData e, ErlDrvMonitor *monitorp)
@@ -10014,7 +10071,10 @@ static int tcp_recv(tcp_descriptor* desc, int request_len)
int err = sock_errno();
if (err == ECONNRESET) {
DEBUGF((" => detected close (connreset)\r\n"));
- return tcp_recv_closed(desc);
+ if (desc->tcp_add_flags & TCP_ADDF_SHOW_ECONNRESET)
+ return tcp_recv_error(desc, err);
+ else
+ return tcp_recv_closed(desc);
}
if (err == ERRNO_BLOCK) {
DEBUGF((" => would block\r\n"));
@@ -10226,7 +10286,19 @@ static void tcp_inet_event(ErlDrvData e, ErlDrvEvent event)
if (netEv.lNetworkEvents & FD_CLOSE) {
/* error in err = netEv.iErrorCode[FD_CLOSE_BIT] */
DEBUGF(("Detected close in %s, line %d\r\n", __FILE__, __LINE__));
- tcp_recv_closed(desc);
+ if (desc->tcp_add_flags & TCP_ADDF_SHOW_ECONNRESET) {
+ err = netEv.iErrorCode[FD_CLOSE_BIT];
+ if (err == ECONNRESET)
+ tcp_recv_error(desc, err);
+ else if (err == ECONNABORTED && IS_CONNECTED(INETP(desc))) {
+ /* translate this error to ECONNRESET */
+ tcp_recv_error(desc, ECONNRESET);
+ }
+ else
+ tcp_recv_closed(desc);
+ }
+ else
+ tcp_recv_closed(desc);
}
DEBUGF(("tcp_inet_event(%ld) }\r\n", (long)desc->inet.port));
return;
@@ -10535,6 +10607,9 @@ static int tcp_inet_input(tcp_descriptor* desc, HANDLE event)
static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err)
{
+ int show_econnreset = (err == ECONNRESET
+ && desc->tcp_add_flags & TCP_ADDF_SHOW_ECONNRESET);
+
/*
* If the port is busy, we must do some clean-up before proceeding.
*/
@@ -10550,14 +10625,21 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err)
/*
* We used to handle "expected errors" differently from unexpected ones.
- * Now we handle all errors in the same way. We just have to distinguish
- * between passive and active sockets.
+ * Now we handle all errors in the same way (unless the show_econnreset
+ * socket option is enabled). We just have to distinguish between passive
+ * and active sockets.
*/
DEBUGF(("driver_failure_eof(%ld) in %s, line %d\r\n",
(long)desc->inet.port, __FILE__, __LINE__));
if (desc->inet.active) {
- tcp_closed_message(desc);
- inet_reply_error_am(INETP(desc), am_closed);
+ if (show_econnreset) {
+ tcp_error_message(desc, err);
+ tcp_closed_message(desc);
+ inet_reply_error(INETP(desc), err);
+ } else {
+ tcp_closed_message(desc);
+ inet_reply_error_am(INETP(desc), am_closed);
+ }
if (desc->inet.exitf)
driver_exit(desc->inet.port, 0);
else
@@ -10569,7 +10651,10 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err)
erl_inet_close(INETP(desc));
if (desc->inet.caller) {
- inet_reply_error_am(INETP(desc), am_closed);
+ if (show_econnreset)
+ inet_reply_error(INETP(desc), err);
+ else
+ inet_reply_error_am(INETP(desc), am_closed);
}
else {
/* No blocking send op to reply to right now.
@@ -10586,12 +10671,46 @@ static int tcp_send_or_shutdown_error(tcp_descriptor* desc, int err)
* in the receive operation.
*/
desc->tcp_add_flags |= TCP_ADDF_DELAYED_CLOSE_RECV;
+
+ if (show_econnreset) {
+ /* Return {error, econnreset} instead of {error, closed}
+ * on send or receive operations.
+ */
+ desc->tcp_add_flags |= TCP_ADDF_DELAYED_ECONNRESET;
+ }
}
return -1;
}
static int tcp_send_error(tcp_descriptor* desc, int err)
{
+ /* EPIPE errors usually occur in one of three ways:
+ * 1. We write to a socket when we've already shutdown() the write side. On
+ * Windows the error returned for this is ESHUTDOWN rather than EPIPE.
+ * 2. The TCP peer sends us an RST through no fault of our own (perhaps
+ * by aborting the connection using SO_LINGER) and we then attempt
+ * to write to the socket. On Linux and Windows we would actually
+ * receive an ECONNRESET error for this, but on the BSDs, Darwin,
+ * Illumos and presumably Solaris, it's an EPIPE.
+ * 3. We cause the TCP peer to send us an RST by writing to a socket
+ * after we receive a FIN from them. Our first write will be
+ * successful, but if the they have closed the connection (rather
+ * than just shutting down the write side of it) this will cause their
+ * OS to send us an RST. Then, when we attempt to write to the socket
+ * a second time, we will get an EPIPE error. On Windows we get an
+ * ECONNABORTED.
+ *
+ * What we are going to do here is to treat all EPIPE messages that aren't
+ * of type 1 as ECONNRESET errors. This will allow users who have the
+ * show_econnreset socket option enabled to receive {error, econnreset} on
+ * both send and recv operations to indicate that an RST has been received.
+ */
+#ifdef __WIN_32__
+ if (err == ECONNABORTED)
+ err = ECONNRESET;
+#endif
+ if (err == EPIPE && !(desc->tcp_add_flags & TCP_ADDF_SHUTDOWN_WR_DONE))
+ err = ECONNRESET;
return tcp_send_or_shutdown_error(desc, err);
}
@@ -10811,6 +10930,8 @@ static void tcp_shutdown_async(tcp_descriptor* desc)
TCP_SHUT_WR : TCP_SHUT_RDWR;
if (IS_SOCKET_ERROR(sock_shutdown(INETP(desc)->s, how)))
tcp_shutdown_error(desc, sock_errno());
+ else
+ desc->tcp_add_flags |= TCP_ADDF_SHUTDOWN_WR_DONE;
}
#ifdef __OSE__
diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam
index 0e0811af3f..dc8c711e1a 100644
--- a/erts/preloaded/ebin/erts_internal.beam
+++ b/erts/preloaded/ebin/erts_internal.beam
Binary files differ
diff --git a/erts/preloaded/ebin/prim_inet.beam b/erts/preloaded/ebin/prim_inet.beam
index 6729f06b79..5a188be3ba 100644
--- a/erts/preloaded/ebin/prim_inet.beam
+++ b/erts/preloaded/ebin/prim_inet.beam
Binary files differ
diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl
index 65a1f1ed3a..cf8edefd7d 100644
--- a/erts/preloaded/src/erts_internal.erl
+++ b/erts/preloaded/src/erts_internal.erl
@@ -40,7 +40,7 @@
-export([flush_monitor_messages/3]).
--export([await_result/1]).
+-export([await_result/1, gather_io_bytes/2]).
-export([time_unit/0]).
@@ -67,6 +67,23 @@ await_result(Ref) when is_reference(Ref) ->
end.
%%
+%% statistics(io) end up in gather_io_bytes/2
+%%
+
+gather_io_bytes(Ref, No) when is_reference(Ref),
+ is_integer(No),
+ No > 0 ->
+ gather_io_bytes(Ref, No, 0, 0).
+
+gather_io_bytes(_Ref, 0, InAcc, OutAcc) ->
+ {{input, InAcc}, {output, OutAcc}};
+gather_io_bytes(Ref, No, InAcc, OutAcc) ->
+ receive
+ {Ref, _SchedId, In, Out} ->
+ gather_io_bytes(Ref, No-1, InAcc + In, OutAcc + Out)
+ end.
+
+%%
%% Statically linked port NIFs
%%
diff --git a/erts/preloaded/src/prim_inet.erl b/erts/preloaded/src/prim_inet.erl
index 622e1be869..5e0b38aa68 100644
--- a/erts/preloaded/src/prim_inet.erl
+++ b/erts/preloaded/src/prim_inet.erl
@@ -146,11 +146,16 @@ shutdown_1(S, How) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
close(S) when is_port(S) ->
- case subscribe(S, [subs_empty_out_q]) of
- {ok, [{subs_empty_out_q,N}]} when N > 0 ->
- close_pend_loop(S, N); %% wait for pending output to be sent
+ case getopt(S, linger) of
+ {ok,{true,0}} ->
+ close_port(S);
_ ->
- close_port(S)
+ case subscribe(S, [subs_empty_out_q]) of
+ {ok, [{subs_empty_out_q,N}]} when N > 0 ->
+ close_pend_loop(S, N); %% wait for pending output to be sent
+ _ ->
+ close_port(S)
+ end
end.
close_pend_loop(S, N) ->
@@ -1140,6 +1145,7 @@ enc_opt(delay_send) -> ?INET_LOPT_TCP_DELAY_SEND;
enc_opt(packet_size) -> ?INET_LOPT_PACKET_SIZE;
enc_opt(read_packets) -> ?INET_LOPT_READ_PACKETS;
enc_opt(netns) -> ?INET_LOPT_NETNS;
+enc_opt(show_econnreset) -> ?INET_LOPT_TCP_SHOW_ECONNRESET;
enc_opt(raw) -> ?INET_OPT_RAW;
% Names of SCTP opts:
enc_opt(sctp_rtoinfo) -> ?SCTP_OPT_RTOINFO;
@@ -1197,6 +1203,7 @@ dec_opt(?INET_LOPT_TCP_DELAY_SEND) -> delay_send;
dec_opt(?INET_LOPT_PACKET_SIZE) -> packet_size;
dec_opt(?INET_LOPT_READ_PACKETS) -> read_packets;
dec_opt(?INET_LOPT_NETNS) -> netns;
+dec_opt(?INET_LOPT_TCP_SHOW_ECONNRESET) -> show_econnreset;
dec_opt(?INET_OPT_RAW) -> raw;
dec_opt(I) when is_integer(I) -> undefined.
@@ -1296,6 +1303,7 @@ type_opt_1(delay_send) -> bool;
type_opt_1(packet_size) -> uint;
type_opt_1(read_packets) -> uint;
type_opt_1(netns) -> binary;
+type_opt_1(show_econnreset) -> bool;
%%
%% SCTP options (to be set). If the type is a record type, the corresponding
%% record signature is returned, otherwise, an "elementary" type tag