From 6b1921d767de5cd1a980234f83b36dbfa13d9fc7 Mon Sep 17 00:00:00 2001 From: Rickard Green Date: Fri, 13 Feb 2015 00:25:57 +0100 Subject: Erlang based BIF timer implementation for scalability --- erts/emulator/beam/atom.names | 1 + erts/emulator/beam/bif.c | 35 ++++- erts/emulator/beam/bif.tab | 5 - erts/emulator/beam/erl_alloc.types | 2 + erts/emulator/beam/erl_bif_info.c | 21 ++- erts/emulator/beam/erl_bif_timer.c | 157 ++++++++++++++++++++- erts/emulator/beam/erl_bif_timer.h | 1 + erts/emulator/beam/erl_db_hash.c | 51 ++++++- erts/emulator/beam/erl_gc.c | 71 ++++++---- erts/emulator/beam/erl_init.c | 14 +- erts/emulator/beam/erl_message.c | 2 +- erts/emulator/beam/erl_message.h | 14 +- erts/emulator/beam/erl_process.c | 36 ++++- erts/emulator/beam/erl_process.h | 22 ++- erts/emulator/beam/erl_time_sup.c | 1 + erts/emulator/beam/global.h | 8 +- erts/emulator/test/timer_bif_SUITE.erl | 18 ++- erts/preloaded/ebin/erlang.beam | Bin 100456 -> 105832 bytes erts/preloaded/ebin/erts_internal.beam | Bin 4688 -> 12368 bytes erts/preloaded/ebin/init.beam | Bin 48800 -> 49744 bytes erts/preloaded/src/erlang.erl | 191 +++++++++++++++++++++++-- erts/preloaded/src/erts_internal.erl | 251 +++++++++++++++++++++++++++++++++ erts/preloaded/src/init.erl | 32 ++++- 23 files changed, 848 insertions(+), 85 deletions(-) diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index ced35be265..e2760c18b4 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -109,6 +109,7 @@ atom bag atom band atom big atom bif_return_trap +atom bif_timer_server atom binary atom binary_bin_to_list_trap atom binary_copy_trap diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index ec5122292e..a48f65d3a2 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -615,14 +615,12 @@ erts_queue_monitor_message(Process *p, } static BIF_RETTYPE -local_pid_monitor(Process *p, Eterm target) +local_pid_monitor(Process *p, Eterm target, Eterm mon_ref, int bool) { BIF_RETTYPE ret; - Eterm mon_ref; Process *rp; ErtsProcLocks p_locks = ERTS_PROC_LOCK_MAIN|ERTS_PROC_LOCK_LINK; - mon_ref = erts_make_ref(p); ERTS_BIF_PREP_RET(ret, mon_ref); if (target == p->common.id) { return ret; @@ -635,12 +633,18 @@ local_pid_monitor(Process *p, Eterm target) if (!rp) { erts_smp_proc_unlock(p, ERTS_PROC_LOCK_LINK); p_locks &= ~ERTS_PROC_LOCK_LINK; - erts_queue_monitor_message(p, &p_locks, - mon_ref, am_process, target, am_noproc); + if (bool) + ret = am_false; + else + erts_queue_monitor_message(p, &p_locks, + mon_ref, am_process, target, am_noproc); } else { ASSERT(rp != p); + if (bool) + ret = am_true; + erts_add_monitor(&ERTS_P_MONITORS(p), MON_ORIGIN, mon_ref, target, NIL); erts_add_monitor(&ERTS_P_MONITORS(rp), MON_TARGET, mon_ref, p->common.id, NIL); @@ -785,7 +789,7 @@ BIF_RETTYPE monitor_2(BIF_ALIST_2) if (is_internal_pid(target)) { local_pid: - ret = local_pid_monitor(BIF_P, target); + ret = local_pid_monitor(BIF_P, target, erts_make_ref(BIF_P), 0); } else if (is_external_pid(target)) { dep = external_pid_dist_entry(target); if (dep == erts_this_dist_entry) @@ -828,6 +832,25 @@ BIF_RETTYPE monitor_2(BIF_ALIST_2) return ret; } +BIF_RETTYPE erts_internal_monitor_process_2(BIF_ALIST_2) +{ + if (is_not_internal_pid(BIF_ARG_1)) { + if (is_external_pid(BIF_ARG_1) + && (external_pid_dist_entry(BIF_ARG_1) + == erts_this_dist_entry)) { + BIF_RET(am_false); + } + goto badarg; + } + + if (is_not_internal_ref(BIF_ARG_2)) + goto badarg; + + BIF_RET(local_pid_monitor(BIF_P, BIF_ARG_1, BIF_ARG_2, 1)); + +badarg: + BIF_ERROR(BIF_P, BADARG); +} /**********************************************************************/ /* this is a combination of the spawn and link BIFs */ diff --git a/erts/emulator/beam/bif.tab b/erts/emulator/beam/bif.tab index db8feb681b..8ac1fba733 100644 --- a/erts/emulator/beam/bif.tab +++ b/erts/emulator/beam/bif.tab @@ -217,11 +217,6 @@ bif math:sqrt/1 bif math:atan2/2 bif math:pow/2 -bif erlang:start_timer/3 -bif erlang:send_after/3 -bif erlang:cancel_timer/1 -bif erlang:read_timer/1 - bif erlang:make_tuple/2 bif erlang:append_element/2 bif erlang:make_tuple/3 diff --git a/erts/emulator/beam/erl_alloc.types b/erts/emulator/beam/erl_alloc.types index 4cd4ad100c..43279715b8 100644 --- a/erts/emulator/beam/erl_alloc.types +++ b/erts/emulator/beam/erl_alloc.types @@ -365,6 +365,7 @@ type AINFO_REQ STANDARD_LOW SYSTEM alloc_info_request type SCHED_WTIME_REQ STANDARD_LOW SYSTEM sched_wall_time_request type GC_INFO_REQ STANDARD_LOW SYSTEM gc_info_request type PORT_DATA_HEAP STANDARD_LOW SYSTEM port_data_heap +type BIF_TIMER_DATA LONG_LIVED_LOW SYSTEM bif_timer_data +else # "fullword" @@ -385,6 +386,7 @@ type AINFO_REQ SHORT_LIVED SYSTEM alloc_info_request type SCHED_WTIME_REQ SHORT_LIVED SYSTEM sched_wall_time_request type GC_INFO_REQ SHORT_LIVED SYSTEM gc_info_request type PORT_DATA_HEAP STANDARD SYSTEM port_data_heap +type BIF_TIMER_DATA LONG_LIVED SYSTEM bif_timer_data +endif diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 80d49c7ce2..70062b2305 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -538,6 +538,7 @@ pi_locks(Eterm info) switch (info) { case am_status: case am_priority: + case am_trap_exit: return ERTS_PROC_LOCK_STATUS; case am_links: case am_monitors: @@ -590,7 +591,7 @@ static Eterm pi_args[] = { am_min_bin_vheap_size, am_current_location, am_current_stacktrace, -}; +}; #define ERTS_PI_ARGS ((int) (sizeof(pi_args)/sizeof(Eterm))) @@ -3701,6 +3702,24 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) BIF_ERROR(BIF_P, BADARG); } +BIF_RETTYPE erts_internal_is_system_process_1(BIF_ALIST_1) +{ + if (is_internal_pid(BIF_ARG_1)) { + Process *rp = erts_proc_lookup(BIF_ARG_1); + if (rp && (rp->static_flags & ERTS_STC_FLG_SYSTEM_PROC)) + BIF_RET(am_true); + BIF_RET(am_false); + } + + if (is_external_pid(BIF_ARG_1) + && external_pid_dist_entry(BIF_ARG_1) == erts_this_dist_entry) { + BIF_RET(am_false); + } + + BIF_ERROR(BIF_P, BADARG); +} + + static erts_smp_atomic_t hipe_test_reschedule_flag; diff --git a/erts/emulator/beam/erl_bif_timer.c b/erts/emulator/beam/erl_bif_timer.c index c9b02b48f5..8b444f2b01 100644 --- a/erts/emulator/beam/erl_bif_timer.c +++ b/erts/emulator/beam/erl_bif_timer.c @@ -490,8 +490,9 @@ setup_bif_timer(Uint32 xflags, return ref; } +BIF_RETTYPE old_send_after_3(BIF_ALIST_3); /* send_after(Time, Pid, Message) -> Ref */ -BIF_RETTYPE send_after_3(BIF_ALIST_3) +BIF_RETTYPE old_send_after_3(BIF_ALIST_3) { Eterm res; @@ -511,8 +512,9 @@ BIF_RETTYPE send_after_3(BIF_ALIST_3) } } +BIF_RETTYPE old_start_timer_3(BIF_ALIST_3); /* start_timer(Time, Pid, Message) -> Ref */ -BIF_RETTYPE start_timer_3(BIF_ALIST_3) +BIF_RETTYPE old_start_timer_3(BIF_ALIST_3) { Eterm res; @@ -532,8 +534,9 @@ BIF_RETTYPE start_timer_3(BIF_ALIST_3) } } +BIF_RETTYPE old_cancel_timer_1(BIF_ALIST_1); /* cancel_timer(Ref) -> false | RemainingTime */ -BIF_RETTYPE cancel_timer_1(BIF_ALIST_1) +BIF_RETTYPE old_cancel_timer_1(BIF_ALIST_1) { Eterm res; ErtsBifTimer *btm; @@ -570,8 +573,9 @@ BIF_RETTYPE cancel_timer_1(BIF_ALIST_1) BIF_RET(res); } +BIF_RETTYPE old_read_timer_1(BIF_ALIST_1); /* read_timer(Ref) -> false | RemainingTime */ -BIF_RETTYPE read_timer_1(BIF_ALIST_1) +BIF_RETTYPE old_read_timer_1(BIF_ALIST_1) { Eterm res; ErtsBifTimer *btm; @@ -653,7 +657,7 @@ erts_cancel_bif_timers(Process *p, ErtsProcLocks plocks) erts_smp_btm_rwunlock(); } -void erts_bif_timer_init(void) +static void erts_old_bif_timer_init(void) { int i; no_bif_timers = 0; @@ -704,3 +708,146 @@ erts_bif_timer_foreach(void (*func)(Eterm, Eterm, ErlHeapFragment *, void *), } } } + +typedef struct { + Uint ref_heap[REF_THING_SIZE]; + Eterm pid[1]; +} ErtsBifTimerServers; + +static ErtsBifTimerServers *bif_timer_servers; + +void erts_bif_timer_init(void) +{ + erts_old_bif_timer_init(); +} + +void +erts_bif_timer_start_servers(Eterm parent) +{ + Process *parent_proc; + Eterm *hp, btr_ref, arg_list_end; + ErlSpawnOpts so; + int i; + + bif_timer_servers = erts_alloc(ERTS_ALC_T_BIF_TIMER_DATA, + (sizeof(ErtsBifTimerServers) + + (sizeof(Eterm)*(erts_no_schedulers-1)))); + + so.flags = SPO_USE_ARGS|SPO_SYSTEM_PROC|SPO_PREFER_SCHED|SPO_OFF_HEAP_MSGS; + so.min_heap_size = H_MIN_SIZE; + so.min_vheap_size = BIN_VH_MIN_SIZE; + so.priority = PRIORITY_MAX; + so.max_gen_gcs = (Uint16) erts_smp_atomic32_read_nob(&erts_max_gen_gcs); + + /* + * Parent is "init" and schedulers have not yet been started, so it + * *should* be alive and well... + */ + ASSERT(is_internal_pid(parent)); + parent_proc = (Process *) erts_ptab_pix2intptr_ddrb(&erts_proc, + internal_pid_index(parent)); + ASSERT(parent_proc); + ASSERT(parent_proc->common.id == parent); + ASSERT(!ERTS_PROC_IS_EXITING(parent_proc)); + + erts_smp_proc_lock(parent_proc, ERTS_PROC_LOCK_MAIN); + + hp = HAlloc(parent_proc, 2*erts_no_schedulers + 2 + REF_THING_SIZE); + + btr_ref = erts_make_ref_in_buffer(hp); + hp += REF_THING_SIZE; + + arg_list_end = CONS(hp, btr_ref, NIL); + hp += 2; + + for (i = 0; i < erts_no_schedulers; i++) { + int sched = i+1; + Eterm arg_list = CONS(hp, make_small(i+1), arg_list_end); + hp += 2; + + so.scheduler = sched; /* Preferred scheduler */ + + bif_timer_servers->pid[i] = erl_create_process(parent_proc, + am_erts_internal, + am_bif_timer_server, + arg_list, + &so); + } + + erts_smp_proc_unlock(parent_proc, ERTS_PROC_LOCK_MAIN); + + hp = internal_ref_val(btr_ref); + for (i = 0; i < REF_THING_SIZE; i++) + bif_timer_servers->ref_heap[i] = hp[i]; +} + +BIF_RETTYPE +erts_internal_get_bif_timer_servers_0(BIF_ALIST_0) +{ + int i; + Eterm *hp, res = NIL; + + hp = HAlloc(BIF_P, erts_no_schedulers*2); + for (i = erts_no_schedulers-1; i >= 0; i--) { + res = CONS(hp, bif_timer_servers->pid[i], res); + hp += 2; + } + BIF_RET(res); +} + +BIF_RETTYPE +erts_internal_access_bif_timer_1(BIF_ALIST_1) +{ + int ix; + Uint32 *rdp; + Eterm ref, pid, *hp, res; + + if (is_not_internal_ref(BIF_ARG_1)) { + if (is_not_ref(BIF_ARG_1)) + BIF_ERROR(BIF_P, BADARG); + BIF_RET(am_undefined); + } + + rdp = internal_ref_numbers(BIF_ARG_1); + ix = (int) erts_get_ref_numbers_thr_id(rdp); + if (ix < 1 || erts_no_schedulers < ix) + BIF_RET(am_undefined); + + pid = bif_timer_servers->pid[ix-1]; + ASSERT(is_internal_pid(pid)); + + hp = HAlloc(BIF_P, 3 /* 2-tuple */ + REF_THING_SIZE); + for (ix = 0; ix < REF_THING_SIZE; ix++) + hp[ix] = bif_timer_servers->ref_heap[ix]; + ref = make_internal_ref(&hp[0]); + hp += REF_THING_SIZE; + + res = TUPLE2(hp, ref, pid); + BIF_RET(res); +} + +BIF_RETTYPE +erts_internal_create_bif_timer_0(BIF_ALIST_0) +{ + ErtsSchedulerData *esdp = ERTS_PROC_GET_SCHDATA(BIF_P); + Eterm *hp, btr_ref, t_ref, pid, res; + int ix; + + hp = HAlloc(BIF_P, 4 /* 3-tuple */ + 2*REF_THING_SIZE); + for (ix = 0; ix < REF_THING_SIZE; ix++) + hp[ix] = bif_timer_servers->ref_heap[ix]; + btr_ref = make_internal_ref(&hp[0]); + hp += REF_THING_SIZE; + + t_ref = erts_sched_make_ref_in_buffer(esdp, hp); + hp += REF_THING_SIZE; + + ASSERT(erts_get_ref_numbers_thr_id(internal_ref_numbers(t_ref)) + == (Uint32) esdp->no); + + pid = bif_timer_servers->pid[((int) esdp->no) - 1]; + + res = TUPLE3(hp, btr_ref, pid, t_ref); + + BIF_RET(res); +} diff --git a/erts/emulator/beam/erl_bif_timer.h b/erts/emulator/beam/erl_bif_timer.h index 1197c176f5..c2f5dfd3c3 100644 --- a/erts/emulator/beam/erl_bif_timer.h +++ b/erts/emulator/beam/erl_bif_timer.h @@ -33,4 +33,5 @@ void erts_cancel_bif_timers(Process *p, ErtsProcLocks plocks); void erts_bif_timer_init(void); void erts_bif_timer_foreach(void (*func)(Eterm,Eterm,ErlHeapFragment *,void *), void *arg); +void erts_bif_timer_start_servers(Eterm); #endif diff --git a/erts/emulator/beam/erl_db_hash.c b/erts/emulator/beam/erl_db_hash.c index 06dac8f161..063808eb79 100644 --- a/erts/emulator/beam/erl_db_hash.c +++ b/erts/emulator/beam/erl_db_hash.c @@ -171,10 +171,53 @@ static ERTS_INLINE void add_fixed_deletion(DbTableHash* tb, int ix) #define MAX_HASH 0xEFFFFFFFUL #define INVALID_HASH 0xFFFFFFFFUL -/* optimised version of make_hash (normal case? atomic key) */ -#define MAKE_HASH(term) \ - ((is_atom(term) ? (atom_tab(atom_val(term))->slot.bucket.hvalue) : \ - make_hash2(term)) % MAX_HASH) +static ERTS_INLINE HashValue +MAKE_HASH(Eterm term) +{ + if (is_atom(term)) { + /* + * optimised version of make_hash, although poor hashvalue + * (normal case? atomic key) + */ + return atom_tab(atom_val(term))->slot.bucket.hvalue; + } + if (is_ref(term)) { + /* + * make_hash2() produce poor hash values + * for refs. + */ + int no; + Uint32 *ref; + HashValue hval; + if (is_internal_ref(term)) { + no = (int) internal_ref_no_of_numbers(term); + ref = internal_ref_numbers(term); + } + else { + no = (int) external_ref_no_of_numbers(term); + ref = external_ref_numbers(term); + } + switch (no) { + case 3: + ref_limit: + if (!ref[2]) + no = 2; + case 2: + if (!ref[1]) + no = 1; + case 1: + break; + default: + no = 3; + goto ref_limit; + } + hval = (HashValue) block_hash((byte *) ref, + no * sizeof(Uint32), + 0x08d12e65); + return hval % MAX_HASH; + } + return make_hash2(term) % MAX_HASH; +} #ifdef ERTS_SMP # define DB_HASH_LOCK_MASK (DB_HASH_LOCK_CNT-1) diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c index fea9b16e90..9a05e5b23a 100644 --- a/erts/emulator/beam/erl_gc.c +++ b/erts/emulator/beam/erl_gc.c @@ -95,10 +95,10 @@ typedef struct { static Uint setup_rootset(Process*, Eterm*, int, Rootset*); static void cleanup_rootset(Rootset *rootset); -static Uint combined_message_size(Process* p); +static Uint combined_message_size(Process* p, int off_heap_msgs); static void remove_message_buffers(Process* p); -static int major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl); -static int minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl); +static int major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl, int off_heap_msgs); +static int minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl, int off_heap_msgs); static void do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj); static Eterm* sweep_rootset(Rootset *rootset, Eterm* htop, char* src, Uint src_size); static Eterm* sweep_one_area(Eterm* n_hp, Eterm* n_htop, char* src, Uint src_size); @@ -401,7 +401,9 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) { Uint reclaimed_now = 0; int done = 0; + int off_heap_msgs; Uint ms1, s1, us1; + erts_aint32_t state; ErtsSchedulerData *esdp; #ifdef USE_VM_PROBES DTRACE_CHARBUF(pidbuf, DTRACE_TERM_BUF_SIZE); @@ -418,7 +420,8 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) trace_gc(p, am_gc_start); } - erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC); + state = erts_smp_atomic32_read_bor_nob(&p->state, ERTS_PSFLG_GC); + off_heap_msgs = state & ERTS_PSFLG_OFF_HEAP_MSGS; if (erts_system_monitor_long_gc != 0) { get_now(&ms1, &s1, &us1); } @@ -444,11 +447,11 @@ erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) while (!done) { if ((FLAGS(p) & F_NEED_FULLSWEEP) != 0) { DTRACE2(gc_major_start, pidbuf, need); - done = major_collection(p, need, objv, nobj, &reclaimed_now); + done = major_collection(p, need, objv, nobj, &reclaimed_now, off_heap_msgs); DTRACE2(gc_major_end, pidbuf, reclaimed_now); } else { DTRACE2(gc_minor_start, pidbuf, need); - done = minor_collection(p, need, objv, nobj, &reclaimed_now); + done = minor_collection(p, need, objv, nobj, &reclaimed_now, off_heap_msgs); DTRACE2(gc_minor_end, pidbuf, reclaimed_now); } } @@ -831,7 +834,7 @@ erts_garbage_collect_literals(Process* p, Eterm* literals, } static int -minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) +minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl, int off_heap_msgs) { Uint mature = HIGH_WATER(p) - HEAP_START(p); @@ -870,20 +873,22 @@ minor_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) Uint size_after; Uint need_after; Uint stack_size = STACK_SZ_ON_HEAP(p); - Uint fragments = MBUF_SIZE(p) + combined_message_size(p); + Uint fragments = MBUF_SIZE(p) + combined_message_size(p, off_heap_msgs); Uint size_before = fragments + (HEAP_TOP(p) - HEAP_START(p)); Uint new_sz = next_heap_size(p, HEAP_SIZE(p) + fragments, 0); do_minor(p, new_sz, objv, nobj); - /* - * Copy newly received message onto the end of the new heap. - */ - ErtsGcQuickSanityCheck(p); - for (msgp = p->msg.first; msgp; msgp = msgp->next) { - if (msgp->data.attached) { - erts_move_msg_attached_data_to_heap(&p->htop, &p->off_heap, msgp); - ErtsGcQuickSanityCheck(p); + if (!off_heap_msgs) { + /* + * Copy newly received message onto the end of the new heap. + */ + ErtsGcQuickSanityCheck(p); + for (msgp = p->msg.first; msgp; msgp = msgp->next) { + if (msgp->data.attached) { + erts_move_msg_attached_data_to_heap(&p->htop, &p->off_heap, msgp); + ErtsGcQuickSanityCheck(p); + } } } ErtsGcQuickSanityCheck(p); @@ -1209,7 +1214,7 @@ do_minor(Process *p, Uint new_sz, Eterm* objv, int nobj) */ static int -major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) +major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl, int off_heap_msgs) { Rootset rootset; Roots* roots; @@ -1222,8 +1227,7 @@ major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) Uint oh_size = (char *) OLD_HTOP(p) - oh; Uint n; Uint new_sz; - Uint fragments = MBUF_SIZE(p) + combined_message_size(p); - ErlMessage *msgp; + Uint fragments = MBUF_SIZE(p) + combined_message_size(p, off_heap_msgs); size_before = fragments + (HEAP_TOP(p) - HEAP_START(p)); @@ -1433,13 +1437,16 @@ major_collection(Process* p, int need, Eterm* objv, int nobj, Uint *recl) ErtsGcQuickSanityCheck(p); - /* - * Copy newly received message onto the end of the new heap. - */ - for (msgp = p->msg.first; msgp; msgp = msgp->next) { - if (msgp->data.attached) { - erts_move_msg_attached_data_to_heap(&p->htop, &p->off_heap, msgp); - ErtsGcQuickSanityCheck(p); + if (!off_heap_msgs) { + ErlMessage *msgp; + /* + * Copy newly received message onto the end of the new heap. + */ + for (msgp = p->msg.first; msgp; msgp = msgp->next) { + if (msgp->data.attached) { + erts_move_msg_attached_data_to_heap(&p->htop, &p->off_heap, msgp); + ErtsGcQuickSanityCheck(p); + } } } @@ -1500,15 +1507,17 @@ adjust_after_fullsweep(Process *p, Uint size_before, int need, Eterm *objv, int * mbuf list. */ static Uint -combined_message_size(Process* p) +combined_message_size(Process* p, int off_heap_msgs) { - Uint sz = 0; + Uint sz; ErlMessage *msgp; - for (msgp = p->msg.first; msgp; msgp = msgp->next) { - if (msgp->data.attached) { + if (off_heap_msgs) + return 0; + + for (sz = 0, msgp = p->msg.first; msgp; msgp = msgp->next) { + if (msgp->data.attached) sz += erts_msg_attached_data_size(msgp); - } } return sz; } diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index 0e128c9b99..be2c5ced9e 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -389,12 +389,13 @@ erl_init(int ncpu, erl_nif_init(); } -static void +static Eterm erl_first_process_otp(char* modname, void* code, unsigned size, int argc, char** argv) { int i; Eterm start_mod; Eterm args; + Eterm res; Eterm* hp; Process parent; ErlSpawnOpts so; @@ -424,10 +425,11 @@ erl_first_process_otp(char* modname, void* code, unsigned size, int argc, char** hp += 2; args = CONS(hp, env, args); - so.flags = 0; - (void) erl_create_process(&parent, start_mod, am_start, args, &so); + so.flags = SPO_SYSTEM_PROC; + res = erl_create_process(&parent, start_mod, am_start, args, &so); erts_smp_proc_unlock(&parent, ERTS_PROC_LOCK_MAIN); erts_cleanup_empty_process(&parent); + return res; } Eterm @@ -2086,7 +2088,11 @@ erl_start(int argc, char **argv) erts_initialized = 1; - erl_first_process_otp("otp_ring0", NULL, 0, boot_argc, boot_argv); + { + Eterm init = erl_first_process_otp("otp_ring0", NULL, 0, + boot_argc, boot_argv); + erts_bif_timer_start_servers(init); + } #ifdef ERTS_SMP erts_start_schedulers(); diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 8870fac7d9..ce91acb6a4 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -994,7 +994,7 @@ erts_send_message(Process* sender, #endif ); BM_SWAP_TIMER(send,system); - } else if (sender == receiver) { + } else if (sender == receiver && !(sender->flags & F_OFF_HEAP_MSGS)) { /* Drop message if receiver has a pending exit ... */ #ifdef ERTS_SMP ErtsProcLocks need_locks = (~(*receiver_locks) diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index 0f3bb8d281..a2a7193ea9 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -198,15 +198,25 @@ do { \ if ((M)->data.attached) { \ Uint need__ = erts_msg_attached_data_size((M)); \ if ((ST) - (HT) >= need__) { \ - Uint *htop__ = (HT); \ + Uint *htop__; \ + move__attached__msg__data____: \ + htop__ = (HT); \ erts_move_msg_attached_data_to_heap(&htop__, &MSO((P)), (M));\ ASSERT(htop__ - (HT) <= need__); \ (HT) = htop__; \ } \ else { \ + int off_heap_msgs__ = (int) (P)->flags & F_OFF_HEAP_MSGS; \ + if (!off_heap_msgs__) \ + need__ = 0; \ { SWPO ; } \ - (FC) -= erts_garbage_collect((P), 0, NULL, 0); \ + (FC) -= erts_garbage_collect((P), need__, NULL, 0); \ { SWPI ; } \ + if (off_heap_msgs__) { \ + ASSERT((M)->data.attached); \ + ASSERT((ST) - (HT) >= need__); \ + goto move__attached__msg__data____; \ + } \ } \ ASSERT(!(M)->data.attached); \ } \ diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 9dcaf2fdca..6e562e16c8 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -5844,6 +5844,13 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Proces int check_emigration_need; #endif +#ifdef ERTS_SMP + if ((p->static_flags & ERTS_STC_FLG_PREFER_SCHED) + && p->preferred_run_queue != RUNQ_READ_RQ(&p->run_queue)) { + RUNQ_SET_RQ(&p->run_queue, p->preferred_run_queue); + } +#endif + a = state; while (1) { @@ -5882,6 +5889,7 @@ schedule_out_process(ErtsRunQueue *c_rq, erts_aint32_t state, Process *p, Proces free_proxy_proc(proxy); erts_smp_runq_lock(c_rq); + return 0; #ifdef ERTS_DIRTY_SCHEDULERS @@ -10511,7 +10519,10 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). int ix = so->scheduler-1; ASSERT(0 <= ix && ix < erts_no_run_queues); rq = ERTS_RUNQ_IX(ix); - state |= ERTS_PSFLG_BOUND; + if (!(so->flags & SPO_PREFER_SCHED)) { + /* Unsupported feature... */ + state |= ERTS_PSFLG_BOUND; + } } prio = (erts_aint32_t) so->priority; } @@ -10519,6 +10530,9 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). state |= (((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_ACT_PRIO_OFFSET) | ((prio & ERTS_PSFLGS_PRIO_MASK) << ERTS_PSFLGS_USR_PRIO_OFFSET)); + if (so->flags & SPO_OFF_HEAP_MSGS) + state |= ERTS_PSFLG_OFF_HEAP_MSGS; + if (!rq) rq = erts_get_runq_proc(parent); @@ -10542,11 +10556,25 @@ erl_create_process(Process* parent, /* Parent of process (default group leader). heap_need = arg_size; p->flags = erts_default_process_flags; + if (so->flags & SPO_OFF_HEAP_MSGS) + p->flags |= F_OFF_HEAP_MSGS; +#ifdef ERTS_SMP + p->preferred_run_queue = NULL; +#endif + p->static_flags = 0; + if (so->flags & SPO_SYSTEM_PROC) + p->static_flags |= ERTS_STC_FLG_SYSTEM_PROC; if (so->flags & SPO_USE_ARGS) { p->min_heap_size = so->min_heap_size; p->min_vheap_size = so->min_vheap_size; p->max_gen_gcs = so->max_gen_gcs; + if (so->flags & SPO_PREFER_SCHED) { +#ifdef ERTS_SMP + p->preferred_run_queue = rq; +#endif + p->static_flags |= ERTS_STC_FLG_PREFER_SCHED; + } } else { p->min_heap_size = H_MIN_SIZE; p->min_vheap_size = BIN_VH_MIN_SIZE; @@ -10867,6 +10895,8 @@ void erts_init_empty_process(Process *p) p->parent = NIL; p->approx_started = 0; + p->static_flags = 0; + p->common.u.alive.started_interval = 0; #ifdef HIPE @@ -10892,6 +10922,7 @@ void erts_init_empty_process(Process *p) p->pending_suspenders = NULL; p->pending_exit.reason = THE_NON_VALUE; p->pending_exit.bp = NULL; + p->preferred_run_queue = NULL; erts_proc_lock_init(p); erts_smp_proc_unlock(p, ERTS_PROC_LOCKS_ALL); RUNQ_SET_RQ(&p->run_queue, ERTS_RUNQ_IX(0)); @@ -11805,6 +11836,9 @@ erts_do_exit_process(Process* p, Eterm reason) } #endif + if (p->static_flags & ERTS_STC_FLG_SYSTEM_PROC) + erl_exit(1, "System process %T terminated: %T\n", p->common.id, reason); + #ifdef ERTS_SMP ERTS_SMP_CHK_HAVE_ONLY_MAIN_PROC_LOCK(p); /* By locking all locks (main lock is already locked) when going diff --git a/erts/emulator/beam/erl_process.h b/erts/emulator/beam/erl_process.h index 6ef56b1974..53a992e115 100644 --- a/erts/emulator/beam/erl_process.h +++ b/erts/emulator/beam/erl_process.h @@ -938,6 +938,8 @@ struct process { Eterm parent; /* Pid of process that created this process. */ erts_approx_time_t approx_started; /* Time when started. */ + Uint32 static_flags; /* Flags that do *not* change */ + /* This is the place, where all fields that differs between memory * architectures, have gone to. */ @@ -969,6 +971,7 @@ struct process { ErtsSchedulerData *scheduler_data; Eterm suspendee; ErtsPendingSuspend *pending_suspenders; + ErtsRunQueue *preferred_run_queue; erts_smp_atomic_t run_queue; #ifdef HIPE struct hipe_process_state_smp hipe_smp; @@ -1078,11 +1081,12 @@ void erts_check_for_holes(Process* p); #define ERTS_PSFLG_RUNNING_SYS ERTS_PSFLG_BIT(15) #define ERTS_PSFLG_PROXY ERTS_PSFLG_BIT(16) #define ERTS_PSFLG_DELAYED_SYS ERTS_PSFLG_BIT(17) +#define ERTS_PSFLG_OFF_HEAP_MSGS ERTS_PSFLG_BIT(18) #ifdef ERTS_DIRTY_SCHEDULERS -#define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(18) -#define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(19) -#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(20) -#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(21) +#define ERTS_PSFLG_DIRTY_CPU_PROC ERTS_PSFLG_BIT(19) +#define ERTS_PSFLG_DIRTY_IO_PROC ERTS_PSFLG_BIT(20) +#define ERTS_PSFLG_DIRTY_CPU_PROC_IN_Q ERTS_PSFLG_BIT(21) +#define ERTS_PSFLG_DIRTY_IO_PROC_IN_Q ERTS_PSFLG_BIT(22) #endif #define ERTS_PSFLGS_IN_PRQ_MASK (ERTS_PSFLG_IN_PRQ_MAX \ @@ -1097,6 +1101,12 @@ void erts_check_for_holes(Process* p); #define ERTS_PSFLGS_GET_PRQ_PRIO(PSFLGS) \ (((PSFLGS) >> ERTS_PSFLGS_USR_PRIO_OFFSET) & ERTS_PSFLGS_PRIO_MASK) +/* + * Static flags that do not change after process creation. + */ +#define ERTS_STC_FLG_SYSTEM_PROC (((Uint32) 1) << 0) +#define ERTS_STC_FLG_PREFER_SCHED (((Uint32) 1) << 1) + /* The sequential tracing token is a tuple of size 5: * * {Flags, Label, Serial, Sender} @@ -1124,6 +1134,9 @@ void erts_check_for_holes(Process* p); #define SPO_LINK 1 #define SPO_USE_ARGS 2 #define SPO_MONITOR 4 +#define SPO_OFF_HEAP_MSGS 8 +#define SPO_SYSTEM_PROC 16 +#define SPO_PREFER_SCHED 32 /* * The following struct contains options for a process to be spawned. @@ -1211,6 +1224,7 @@ extern struct erts_system_profile_flags_t erts_system_profile_flags; #define F_P2PNR_RESCHED (1 << 9) /* Process has been rescheduled via erts_pid2proc_not_running() */ #define F_FORCE_GC (1 << 10) /* Force gc at process in-scheduling */ #define F_DISABLE_GC (1 << 11) /* Disable GC */ +#define F_OFF_HEAP_MSGS (1 << 12) /* process trace_flags */ #define F_SENSITIVE (1 << 0) diff --git a/erts/emulator/beam/erl_time_sup.c b/erts/emulator/beam/erl_time_sup.c index 1534fb8058..7dfa7d8743 100644 --- a/erts/emulator/beam/erl_time_sup.c +++ b/erts/emulator/beam/erl_time_sup.c @@ -1984,3 +1984,4 @@ BIF_RETTYPE os_system_time_1(BIF_ALIST_0) stime += ERTS_USEC_TO_MONOTONIC(tod.tv_usec); BIF_RET(time_unit_conversion(BIF_P, BIF_ARG_1, stime, 0)); } + diff --git a/erts/emulator/beam/global.h b/erts/emulator/beam/global.h index e24aef3e3c..96c21d5320 100644 --- a/erts/emulator/beam/global.h +++ b/erts/emulator/beam/global.h @@ -1154,7 +1154,9 @@ erts_alloc_message_heap_state(Uint size, state = erts_smp_atomic32_read_acqb(&receiver->state); if (statep) *statep = state; - if (state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + if (state & (ERTS_PSFLG_OFF_HEAP_MSGS + | ERTS_PSFLG_EXITING + | ERTS_PSFLG_PENDING_EXIT)) goto allocate_in_mbuf; #endif @@ -1174,7 +1176,9 @@ erts_alloc_message_heap_state(Uint size, state = erts_smp_atomic32_read_nob(&receiver->state); if (statep) *statep = state; - if ((state & (ERTS_PSFLG_EXITING|ERTS_PSFLG_PENDING_EXIT)) + if ((state & (ERTS_PSFLG_OFF_HEAP_MSGS + | ERTS_PSFLG_EXITING + | ERTS_PSFLG_PENDING_EXIT)) || (receiver->flags & F_DISABLE_GC) || HEAP_LIMIT(receiver) - HEAP_TOP(receiver) <= size) { /* diff --git a/erts/emulator/test/timer_bif_SUITE.erl b/erts/emulator/test/timer_bif_SUITE.erl index c28224729d..da19be3424 100644 --- a/erts/emulator/test/timer_bif_SUITE.erl +++ b/erts/emulator/test/timer_bif_SUITE.erl @@ -238,6 +238,7 @@ cleanup(Config) when is_list(Config) -> ?line wait_until(fun () -> process_is_cleaned_up(P1) end), ?line T1 = erlang:start_timer(10000, P1, "hej"), ?line T2 = erlang:send_after(10000, P1, "hej"), + receive after 1000 -> ok end, ?line Mem = mem(), ?line false = erlang:read_timer(T1), ?line false = erlang:read_timer(T2), @@ -250,6 +251,7 @@ cleanup(Config) when is_list(Config) -> ?line true = is_integer(erlang:read_timer(T3)), ?line true = is_integer(erlang:read_timer(T4)), ?line wait_until(fun () -> process_is_cleaned_up(P2) end), + receive after 1000 -> ok end, ?line false = erlang:read_timer(T3), ?line false = erlang:read_timer(T4), ?line Mem = mem(), @@ -455,10 +457,18 @@ registered_process(Config) when is_list(Config) -> ?line ok. mem() -> - AA = erlang:system_info(allocated_areas), - {value,{bif_timer,Mem}} = lists:keysearch(bif_timer, 1, AA), - Mem. - + TSrvs = erts_internal:get_bif_timer_servers(), + lists:foldl(fun (Tab, Sz) -> + case lists:member(ets:info(Tab, owner), TSrvs) of + true -> + ets:info(Tab, memory) + Sz; + false -> + Sz + end + end, + 0, + ets:all())*erlang:system_info({wordsize,external}). + process_is_cleaned_up(P) when is_pid(P) -> undefined == erts_debug:get_internal_state({process_status, P}). diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam index 4ec388a7b9..303f3f47b6 100644 Binary files a/erts/preloaded/ebin/erlang.beam and b/erts/preloaded/ebin/erlang.beam differ diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam index 705bb61247..68fb357eb4 100644 Binary files a/erts/preloaded/ebin/erts_internal.beam and b/erts/preloaded/ebin/erts_internal.beam differ diff --git a/erts/preloaded/ebin/init.beam b/erts/preloaded/ebin/init.beam index aadc9797cb..7f79493b4d 100644 Binary files a/erts/preloaded/ebin/init.beam and b/erts/preloaded/ebin/init.beam differ diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 79eb60f362..83010b17d2 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -89,7 +89,7 @@ -export([binary_to_list/3, binary_to_term/1, binary_to_term/2]). -export([bit_size/1, bitsize/1, bitstring_to_list/1]). -export([bump_reductions/1, byte_size/1, call_on_load_function/1]). --export([cancel_timer/1, check_old_code/1, check_process_code/2, +-export([cancel_timer/1, cancel_timer/2, check_old_code/1, check_process_code/2, check_process_code/3, crc32/1]). -export([crc32/2, crc32_combine/3, date/0, decode_packet/3]). -export([delete_element/2]). @@ -128,7 +128,7 @@ -export([time_offset/0, time_offset/1, timestamp/0]). -export([process_display/2]). -export([process_flag/3, process_info/1, processes/0, purge_module/1]). --export([put/2, raise/3, read_timer/1, ref_to_list/1, register/2]). +-export([put/2, raise/3, read_timer/1, read_timer/2, ref_to_list/1, register/2]). -export([registered/0, resume_process/1, round/1, self/0, send_after/3]). -export([seq_trace/2, seq_trace_print/1, seq_trace_print/2, setnode/2]). -export([setnode/3, size/1, spawn/3, spawn_link/3, split_binary/2]). @@ -427,8 +427,77 @@ call_on_load_function(_P1) -> -spec erlang:cancel_timer(TimerRef) -> Time | false when TimerRef :: reference(), Time :: non_neg_integer(). -cancel_timer(_TimerRef) -> - erlang:nif_error(undefined). +cancel_timer(TimerRef) -> + try + case erts_internal:access_bif_timer(TimerRef) of + undefined -> + false; + {BTR, TSrv} -> + Req = erlang:make_ref(), + TSrv ! {cancel_timeout, BTR, erlang:self(), + true, Req, TimerRef}, + receive + {cancel_timer, Req, Result} -> + Result + end + end + catch + _:_ -> erlang:error(badarg, [TimerRef]) + end. + +%% cancel_timer/2 +-spec erlang:cancel_timer(TimerRef, Options) -> Time | false | ok when + TimerRef :: reference(), + Option :: {async, boolean()} | {info, boolean()}, + Options :: [Option], + Time :: non_neg_integer(). +cancel_timer(TimerRef, Options) -> + try + {Async, Info} = get_cancel_timer_options(Options, false, true), + case erts_internal:access_bif_timer(TimerRef) of + undefined -> + case {Async, Info} of + {true, true} -> + erlang:self() ! {cancel_timer, TimerRef, false}, ok; + {false, true} -> + false; + _ -> + ok + end; + {BTR, TSrv} -> + case Async of + true -> + TSrv ! {cancel_timeout, BTR, erlang:self(), + Info, TimerRef, TimerRef}, + ok; + false -> + Req = erlang:make_ref(), + TSrv ! {cancel_timeout, BTR, erlang:self(), + true, Req, TimerRef}, + receive + {cancel_timer, Req, Result} -> + case Info of + true -> Result; + false -> ok + end + end + end + end + catch + _:_ -> erlang:error(badarg, [TimerRef, Options]) + end. + +get_cancel_timer_options([], Async, Info) -> + {Async, Info}; +get_cancel_timer_options([{async, Bool} | Opts], + _Async, Info) when Bool == true; + Bool == false -> + get_cancel_timer_options(Opts, Bool, Info); +get_cancel_timer_options([{info, Bool} | Opts], + Async, _Info) when Bool == true; + Bool == false -> + get_cancel_timer_options(Opts, Async, Bool). + %% check_old_code/1 -spec check_old_code(Module) -> boolean() when @@ -1462,8 +1531,53 @@ raise(_Class, _Reason, _Stacktrace) -> %% read_timer/1 -spec erlang:read_timer(TimerRef) -> non_neg_integer() | false when TimerRef :: reference(). -read_timer(_TimerRef) -> - erlang:nif_error(undefined). + +read_timer(TimerRef) -> + read_timer(TimerRef, []). + +%% read_timer/2 +-spec erlang:read_timer(TimerRef, Options) -> non_neg_integer() | false | ok when + TimerRef :: reference(), + Option :: {async, boolean()}, + Options :: [Option]. + +read_timer(TimerRef, Options) -> + try + Async = get_read_timer_options(Options, false), + case erts_internal:access_bif_timer(TimerRef) of + undefined -> + case Async of + true -> + erlang:self() ! {read_timer, TimerRef, false}, + ok; + false -> + false + end; + {BTR, TSrv} -> + case Async of + true -> + TSrv ! {read_timeout, BTR, erlang:self(), + TimerRef, TimerRef}, + ok; + false -> + Req = erlang:make_ref(), + TSrv ! {read_timeout, BTR, erlang:self(), + Req, TimerRef}, + receive + {read_timer, Req, Result} -> + Result + end + end + end + catch + _:_ -> erlang:error(badarg, [TimerRef]) + end. + +get_read_timer_options([], Async) -> + Async; +get_read_timer_options([{async, Bool} | Opts], + _Async) when Bool == true; Bool == false -> + get_read_timer_options(Opts, Bool). %% ref_to_list/1 -spec erlang:ref_to_list(Ref) -> string() when @@ -1509,8 +1623,36 @@ self() -> Dest :: pid() | atom(), Msg :: term(), TimerRef :: reference(). -send_after(_Time, _Dest, _Msg) -> - erlang:nif_error(undefined). + +send_after(0, Dest, Msg) -> + try + true = ((erlang:is_pid(Dest) + andalso erlang:node(Dest) == erlang:node()) + orelse (erlang:is_atom(Dest) + andalso Dest /= undefined)), + try Dest ! Msg catch _:_ -> ok end, + erlang:make_ref() + catch + _:_ -> + erlang:error(badarg, [0, Dest, Msg]) + end; +send_after(Time, Dest, Msg) -> + Now = erlang:monotonic_time(), + try + true = ((erlang:is_pid(Dest) + andalso erlang:node(Dest) == erlang:node()) + orelse (erlang:is_atom(Dest) + andalso Dest /= undefined)), + true = Time > 0, + true = Time < (1 bsl 32), % Maybe lift this restriction... + TO = Now + (erts_internal:time_unit()*Time) div 1000, + {BTR, TSrv, TRef} = erts_internal:create_bif_timer(), + TSrv ! {set_timeout, BTR, Dest, TO, TRef, Msg}, + TRef + catch + _:_ -> + erlang:error(badarg, [Time, Dest, Msg]) + end. %% seq_trace/2 -spec erlang:seq_trace(P1, P2) -> seq_trace_info_returns() | {term(), term(), term(), term(), term()} when @@ -1583,8 +1725,37 @@ split_binary(_Bin, _Pos) -> Dest :: pid() | atom(), Msg :: term(), TimerRef :: reference(). -start_timer(_Time, _Dest, _Msg) -> - erlang:nif_error(undefined). +start_timer(0, Dest, Msg) -> + try + true = ((erlang:is_pid(Dest) + andalso erlang:node(Dest) == erlang:node()) + orelse (erlang:is_atom(Dest) + andalso Dest /= undefined)), + TimerRef = erlang:make_ref(), + try Dest ! {timeout, TimerRef, Msg} catch _:_ -> ok end, + TimerRef + catch + _:_ -> + erlang:error(badarg, [0, Dest, Msg]) + end; +start_timer(Time, Dest, Msg) -> + Now = erlang:monotonic_time(), + try + true = ((erlang:is_pid(Dest) + andalso erlang:node(Dest) == erlang:node()) + orelse (erlang:is_atom(Dest) + andalso Dest /= undefined)), + true = Time > 0, + true = Time < (1 bsl 32), % Maybe lift this restriction... + TO = Now + (erts_internal:time_unit()*Time) div 1000, + {BTR, TSrv, TimerRef} = erts_internal:create_bif_timer(), + TSrv ! {set_timeout, BTR, Dest, TO, TimerRef, + {timeout, TimerRef, Msg}}, + TimerRef + catch + _:_ -> + erlang:error(badarg, [Time, Dest, Msg]) + end. %% suspend_process/2 -spec erlang:suspend_process(Suspendee, OptList) -> boolean() when diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index 30df75b406..2c701d75e4 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -42,6 +42,14 @@ -export([time_unit/0]). +-export([bif_timer_server/2]). + +-export([get_bif_timer_servers/0, create_bif_timer/0, access_bif_timer/1]). + +-export([monitor_process/2]). + +-export([is_system_process/1]). + %% %% Await result of send to port %% @@ -208,3 +216,246 @@ flush_monitor_messages(Ref, Multi, Res) when is_reference(Ref) -> time_unit() -> erlang:nif_error(undefined). + +-spec erts_internal:get_bif_timer_servers() -> Pids when + Pid :: pid(), + Pids :: [Pid]. + +get_bif_timer_servers() -> + erlang:nif_error(undefined). + +-spec erts_internal:create_bif_timer() -> Res when + Res :: {reference(), pid(), reference()}. + +create_bif_timer() -> + erlang:nif_error(undefined). + +-spec erts_internal:access_bif_timer(Ref) -> Res when + Ref :: reference(), + Res :: {reference(), pid(), reference()}. + +access_bif_timer(_Ref) -> + erlang:nif_error(undefined). + +-spec erts_internal:monitor_process(Pid, Ref) -> boolean() when + Pid :: pid(), + Ref :: reference(). + +monitor_process(_Pid, _Ref) -> + erlang:nif_error(undefined). + +-spec erts_internal:is_system_process(Pid) -> boolean() when + Pid :: pid(). + +is_system_process(_Pid) -> + erlang:nif_error(undefined). + +%% +%% BIF timer servers +%% + +-record(tsrv_state, {rtab, + ttab, + btr, + unit, + next}). + +bif_timer_server(N, BTR) -> + try + tsrv_loop(tsrv_init_static_state(N, BTR), infinity) + catch + Type:Reason -> + erlang:display({'BIF_timer_server', + {Type, Reason}, + erlang:get_stacktrace()}), + exit(Reason) + end. + +tsrv_init_static_state(N, BTR) -> + process_flag(trap_exit, true), + NList = integer_to_list(N), + RTabName = list_to_atom("BIF_timer_reference_table_" ++ NList), + TTabName = list_to_atom("BIF_timer_time_table_" ++ NList), + #tsrv_state{rtab = ets:new(RTabName, + [set, private, {keypos, 2}]), + ttab = ets:new(TTabName, + [ordered_set, private, {keypos, 1}]), + btr = BTR, + unit = erts_internal:time_unit(), + next = infinity}. + + +tsrv_loop(#tsrv_state{unit = Unit} = StaticState, Nxt) -> + CallTime = erlang:monotonic_time(), + %% 'infinity' is greater than all integers... + NewNxt = case CallTime >= Nxt of + true -> + tsrv_handle_timeout(CallTime, StaticState); + false -> + TMO = try + (1000*(Nxt - CallTime - 1)) div Unit + 1 + catch + error:badarith when Nxt == infinity -> infinity + end, + receive + Msg -> + tsrv_handle_msg(Msg, StaticState, Nxt) + after TMO -> + Nxt + end + end, + tsrv_loop(StaticState, NewNxt). + +tsrv_handle_msg({set_timeout, BTR, Proc, Time, TRef, Msg}, + #tsrv_state{rtab = RTab, + ttab = TTab, + btr = BTR}, + Nxt) when erlang:is_integer(Time) -> + RcvTime = erlang:monotonic_time(), + case Time =< RcvTime of + true -> + try Proc ! Msg catch _:_ -> ok end, + Nxt; + false -> + Ins = case erlang:is_atom(Proc) of + true -> + true; + false -> + try + erts_internal:monitor_process(Proc, TRef) + catch + _:_ -> false + end + end, + case Ins of + false -> + Nxt; + true -> + TKey = {Time, TRef}, + true = ets:insert(RTab, TKey), + true = ets:insert(TTab, {TKey, Proc, Msg}), + case Time < Nxt of + true -> Time; + false -> Nxt + end + end + end; +tsrv_handle_msg({cancel_timeout, BTR, From, Reply, Req, TRef}, + #tsrv_state{rtab = RTab, + ttab = TTab, + unit = Unit, + btr = BTR}, + Nxt) -> + case ets:lookup(RTab, TRef) of + [] -> + case Reply of + false -> + ok; + _ -> + try From ! {cancel_timer, Req, false} catch _:_ -> ok end + end, + Nxt; + [{Time, TRef} = TKey] -> + ets:delete(RTab, TRef), + ets:delete(TTab, TKey), + erlang:demonitor(TRef), + case Reply of + false -> + ok; + _ -> + RcvTime = erlang:monotonic_time(), + RT = case Time =< RcvTime of + true -> + 0; + false -> + ((1000*(Time - RcvTime)) div Unit) + end, + try From ! {cancel_timer, Req, RT} catch _:_ -> ok end + end, + case Time =:= Nxt of + false -> + Nxt; + true -> + case ets:first(TTab) of + '$end_of_table' -> infinity; + {NextTime, _TRef} -> NextTime + end + end + end; +tsrv_handle_msg({read_timeout, BTR, From, Req, TRef}, + #tsrv_state{rtab = RTab, + unit = Unit, + btr = BTR}, + Nxt) -> + case ets:lookup(RTab, TRef) of + [] -> + try From ! {read_timer, Req, false} catch _:_ -> ok end; + [{Time, TRef}] -> + RcvTime = erlang:monotonic_time(), + RT = case Time =< RcvTime of + true -> 0; + false -> (1000*(Time - RcvTime)) div Unit + end, + try From ! {read_timer, Req, RT} catch _:_ -> ok end + end, + Nxt; +tsrv_handle_msg({'DOWN', TRef, process, _, _}, + #tsrv_state{rtab = RTab, + ttab = TTab}, + Nxt) -> + case ets:lookup(RTab, TRef) of + [] -> + Nxt; + [{Time, TRef} = TKey] -> + ets:delete(RTab, TRef), + ets:delete(TTab, TKey), + case Time =:= Nxt of + false -> + Nxt; + true -> + case ets:first(TTab) of + '$end_of_table' -> infinity; + {NextTime, _} -> NextTime + end + end + end; +tsrv_handle_msg({cancel_all_timeouts, BTR, From, Ref}, + #tsrv_state{rtab = RTab, + ttab = TTab, + btr = BTR}, + _Nxt) -> + tsrv_delete_monitor_objects(RTab), + ets:delete_all_objects(TTab), + try From ! {canceled_all_timeouts, Ref} catch _:_ -> ok end, + infinity; +tsrv_handle_msg(_GarbageMsg, _StaticState, Nxt) -> + Nxt. + +tsrv_delete_monitor_objects(RTab) -> + case ets:first(RTab) of + '$end_of_table' -> + ok; + TRef -> + erlang:demonitor(TRef), + ets:delete(RTab, TRef), + tsrv_delete_monitor_objects(RTab) + end. + +tsrv_handle_timeout(CallTime, #tsrv_state{rtab = RTab, + ttab = TTab} = S) -> + case ets:first(TTab) of + '$end_of_table' -> + infinity; + {Time, _TRef} when Time > CallTime -> + Time; + {_Time, TRef} = TKey -> + [{TKey, Proc, Msg}] = ets:lookup(TTab, TKey), + case erlang:is_pid(Proc) of + false -> ok; + _ -> erlang:demonitor(TRef) + end, + ets:delete(TTab, TKey), + ets:delete(RTab, TRef), + try Proc ! Msg catch _:_ -> ok end, + tsrv_handle_timeout(CallTime, S) + end. diff --git a/erts/preloaded/src/init.erl b/erts/preloaded/src/init.erl index e95e11b3e6..48c5c37717 100644 --- a/erts/preloaded/src/init.erl +++ b/erts/preloaded/src/init.erl @@ -522,6 +522,7 @@ shutdown_pids(Heart,BootPid,State) -> Timer = shutdown_timer(State#state.flags), catch shutdown(State#state.kernel,BootPid,Timer,State), kill_all_pids(Heart), % Even the shutdown timer. + cancel_all_bif_timeouts(), kill_all_ports(Heart), flush_timout(Timer). @@ -580,6 +581,30 @@ resend([ExitMsg|Exits]) -> resend(_) -> ok. + +cancel_all_bif_timeouts() -> + TSrvs = erts_internal:get_bif_timer_servers(), + Ref = make_ref(), + {BTR, _TSrv} = erts_internal:access_bif_timer(Ref), %% Cheat... + request_cancel_all_bif_timeouts(Ref, BTR, TSrvs), + wait_response_cancel_all_bif_timeouts(Ref, BTR, TSrvs), + ok. + +request_cancel_all_bif_timeouts(_Ref, _BTR, []) -> + ok; +request_cancel_all_bif_timeouts(Ref, BTR, [TSrv|TSrvs]) -> + TSrv ! {cancel_all_timeouts, BTR, self(), {Ref, TSrv}}, + request_cancel_all_bif_timeouts(Ref, BTR, TSrvs). + +wait_response_cancel_all_bif_timeouts(_Ref, _BTR, []) -> + ok; +wait_response_cancel_all_bif_timeouts(Ref, BTR, [TSrv|TSrvs]) -> + receive + {canceled_all_timeouts, {Ref, TSrv}} -> + wait_response_cancel_all_bif_timeouts(Ref, BTR, TSrvs) + end. + + %% %% Kill all existing pids in the system (except init and heart). kill_all_pids(Heart) -> @@ -591,12 +616,9 @@ kill_all_pids(Heart) -> kill_all_pids(Heart) % Continue until all are really killed. end. -%% All except zombies. -alive_processes() -> - [P || P <- processes(), erlang:is_process_alive(P)]. - +%% All except system processes. get_pids(Heart) -> - Pids = alive_processes(), + Pids = [P || P <- processes(), not erts_internal:is_system_process(P)], delete(Heart,self(),Pids). delete(Heart,Init,[Heart|Pids]) -> delete(Heart,Init,Pids); -- cgit v1.2.3