aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/erl_hl_timer.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2015-06-04 22:08:33 +0200
committerRickard Green <[email protected]>2015-06-09 19:35:48 +0200
commitea84ab6c03994f8d6d9f07d8740f0547f8a3cb51 (patch)
tree5080bdaf175f7d58669a26a5b084cc71bd5aa02a /erts/emulator/beam/erl_hl_timer.c
parent46efe0d85bc2f55dd432f403f20d4841b07023ed (diff)
downloadotp-ea84ab6c03994f8d6d9f07d8740f0547f8a3cb51.tar.gz
otp-ea84ab6c03994f8d6d9f07d8740f0547f8a3cb51.tar.bz2
otp-ea84ab6c03994f8d6d9f07d8740f0547f8a3cb51.zip
Callback timer
Diffstat (limited to 'erts/emulator/beam/erl_hl_timer.c')
-rw-r--r--erts/emulator/beam/erl_hl_timer.c372
1 files changed, 307 insertions, 65 deletions
diff --git a/erts/emulator/beam/erl_hl_timer.c b/erts/emulator/beam/erl_hl_timer.c
index 0cb75a8b5f..8eacb921fe 100644
--- a/erts/emulator/beam/erl_hl_timer.c
+++ b/erts/emulator/beam/erl_hl_timer.c
@@ -81,6 +81,13 @@ static void hdbg_chk_srv(ErtsHLTimerService *srv);
#error "ERTS_REF_NUMBERS changed. Update me..."
#endif
+typedef enum {
+ ERTS_TMR_BIF,
+ ERTS_TMR_PROC,
+ ERTS_TMR_PORT,
+ ERTS_TMR_CALLBACK
+} ErtsTmrType;
+
#define ERTS_BIF_TIMER_SHORT_TIME 5000
#ifdef ERTS_SMP
@@ -97,8 +104,9 @@ static void hdbg_chk_srv(ErtsHLTimerService *srv);
#define ERTS_TMR_ROFLG_REG_NAME (((Uint32) 1) << 13)
#define ERTS_TMR_ROFLG_PROC (((Uint32) 1) << 14)
#define ERTS_TMR_ROFLG_PORT (((Uint32) 1) << 15)
+#define ERTS_TMR_ROFLG_CALLBACK (((Uint32) 1) << 16)
#ifdef ERTS_BTM_ACCESSOR_SUPPORT
-#define ERTS_TMR_ROFLG_ABIF_TMR (((Uint32) 1) << 16)
+#define ERTS_TMR_ROFLG_ABIF_TMR (((Uint32) 1) << 17)
#endif
#define ERTS_TMR_ROFLG_SID_MASK \
@@ -143,6 +151,7 @@ typedef struct {
Uint32 roflgs;
erts_smp_atomic32_t refc;
union {
+ void *arg;
erts_atomic_t next;
} u;
} ErtsTmrHead;
@@ -158,6 +167,7 @@ struct ErtsHLTimer_ {
Process *proc;
Port *port;
Eterm name;
+ void (*callback)(void *);
} receiver;
#ifdef ERTS_HLT_HARD_DEBUG
@@ -192,7 +202,10 @@ struct ErtsHLTimer_ {
typedef struct {
ErtsTmrHead head; /* NEED to be first! */
- void *p;
+ union {
+ void *p;
+ void (*callback)(void *);
+ } u;
ErtsTWheelTimer tw_tmr;
} ErtsTWTimer;
@@ -348,8 +361,8 @@ refn_is_lt(Uint32 *x, Uint32 *y)
#define ERTS_RBT_WANT_SMALLEST
#define ERTS_RBT_WANT_LOOKUP_INSERT
#define ERTS_RBT_WANT_REPLACE
+#define ERTS_RBT_WANT_FOREACH
#ifdef ERTS_HLT_HARD_DEBUG
-# define ERTS_RBT_WANT_FOREACH
# define ERTS_RBT_WANT_LOOKUP
#endif
#define ERTS_RBT_UNDEF
@@ -460,8 +473,6 @@ same_time_list_foreach_destroy_yielding(ErtsHLTimer **root,
}
}
-#ifdef ERTS_HLT_HARD_DEBUG
-
static ERTS_INLINE void
same_time_list_foreach(ErtsHLTimer *root,
void (*op)(ErtsHLTimer *, void *),
@@ -476,6 +487,8 @@ same_time_list_foreach(ErtsHLTimer *root,
}
}
+#ifdef ERTS_HLT_HARD_DEBUG
+
static ERTS_INLINE ErtsHLTimer *
same_time_list_lookup(ErtsHLTimer *root, ErtsHLTimer *x)
{
@@ -761,9 +774,9 @@ schedule_tw_timer_destroy(ErtsTWTimer *tmr)
* dropped at once...
*/
if (tmr->head.roflgs & ERTS_TMR_ROFLG_PROC)
- erts_proc_dec_refc((Process *) tmr->p);
- else
- erts_port_dec_refc((Port *) tmr->p);
+ erts_proc_dec_refc((Process *) tmr->u.p);
+ else if (tmr->head.roflgs & ERTS_TMR_ROFLG_PORT)
+ erts_port_dec_refc((Port *) tmr->u.p);
erts_schedule_thr_prgr_later_cleanup_op(
scheduled_tw_timer_destroy,
@@ -785,7 +798,7 @@ static void
tw_proc_timeout(void *vtwtp)
{
ErtsTWTimer *twtp = (ErtsTWTimer *) vtwtp;
- Process *proc = (Process *) twtp->p;
+ Process *proc = (Process *) twtp->u.p;
if (proc_timeout_common(proc, vtwtp))
tw_timer_dec_refc(twtp);
tw_timer_dec_refc(twtp);
@@ -795,7 +808,7 @@ static void
tw_port_timeout(void *vtwtp)
{
ErtsTWTimer *twtp = (ErtsTWTimer *) vtwtp;
- Port *port = (Port *) twtp->p;
+ Port *port = (Port *) twtp->u.p;
if (port_timeout_common(port, vtwtp))
tw_timer_dec_refc(twtp);
tw_timer_dec_refc(twtp);
@@ -815,13 +828,26 @@ cancel_tw_timer(ErtsSchedulerData *esdp, ErtsTWTimer *tmr)
erts_twheel_cancel_timer(esdp->timer_wheel, &tmr->tw_tmr);
}
+static void
+tw_callback_timeout(void *vtwtp)
+{
+ ErtsTWTimer *twtp = (ErtsTWTimer *) vtwtp;
+ void (*callback)(void *) = twtp->u.callback;
+ void *arg = twtp->head.u.arg;
+ tw_timer_dec_refc(twtp);
+ (*callback)(arg);
+}
+
static ErtsTWTimer *
create_tw_timer(ErtsSchedulerData *esdp,
- void *p, int is_proc,
+ ErtsTmrType type, void *p,
+ void (*callback)(void *), void *arg,
ErtsMonotonicTime timeout_pos)
{
ErtsTWTimer *tmr;
void (*timeout_func)(void *);
+ void (*cancel_func)(void *);
+ erts_aint32_t refc;
tmr = tw_timer_alloc();
erts_twheel_init_timer(&tmr->tw_tmr);
@@ -829,24 +855,48 @@ create_tw_timer(ErtsSchedulerData *esdp,
tmr->head.roflgs = (Uint32) esdp->no;
ERTS_HLT_ASSERT((tmr->head.roflgs
& ~ERTS_TMR_ROFLG_SID_MASK) == 0);
- tmr->p = p;
- if (is_proc) {
+
+ switch (type) {
+
+ case ERTS_TMR_PROC:
+ tmr->u.p = p;
tmr->head.roflgs |= ERTS_TMR_ROFLG_PROC;
timeout_func = tw_proc_timeout;
+ cancel_func = tw_ptimer_cancel;
erts_proc_inc_refc((Process *) p);
- }
- else {
+ refc = 2;
+ break;
+
+ case ERTS_TMR_PORT:
+ tmr->u.p = p;
tmr->head.roflgs |= ERTS_TMR_ROFLG_PORT;
timeout_func = tw_port_timeout;
+ cancel_func = tw_ptimer_cancel;
erts_port_inc_refc((Port *) p);
+ refc = 2;
+ break;
+
+ case ERTS_TMR_CALLBACK:
+ tmr->head.u.arg = arg;
+ tmr->u.callback = callback;
+
+ tmr->head.roflgs |= ERTS_TMR_ROFLG_CALLBACK;
+ timeout_func = tw_callback_timeout;
+ cancel_func = NULL;
+ refc = 1;
+ break;
+
+ default:
+ ERTS_INTERNAL_ERROR("Unsupported timer type");
+ return NULL;
}
- erts_smp_atomic32_init_nob(&tmr->head.refc, 2);
+ erts_smp_atomic32_init_nob(&tmr->head.refc, refc);
erts_twheel_set_timer(esdp->timer_wheel,
&tmr->tw_tmr,
timeout_func,
- tw_ptimer_cancel,
+ cancel_func,
tmr,
timeout_pos);
@@ -994,17 +1044,15 @@ hlt_delete_abtm(ErtsHLTimer *tmr)
static ErtsHLTimer *
create_hl_timer(ErtsSchedulerData *esdp,
ErtsMonotonicTime timeout_pos,
- int short_time, int is_bif_tmr,
+ int short_time, ErtsTmrType type,
void *rcvrp, Eterm rcvr, Eterm acsr,
- Eterm msg, Uint32 *refn)
+ Eterm msg, Uint32 *refn,
+ void (*callback)(void *), void *arg)
{
ErtsHLTimerService *srv = esdp->timer_service;
ErtsHLTimer *tmr, *st_tmr;
erts_aint32_t refc;
Uint32 roflgs;
-#ifdef ERTS_BTM_ACCESSOR_SUPPORT
- int is_abif_tmr = is_bif_tmr && is_value(acsr) && acsr != rcvr;
-#endif
check_canceled_queue(esdp, srv);
@@ -1012,45 +1060,69 @@ create_hl_timer(ErtsSchedulerData *esdp,
roflgs = ((Uint32) esdp->no) | ERTS_TMR_ROFLG_HLT;
- if (!is_bif_tmr)
+ if (type != ERTS_TMR_BIF) {
+
tmr = erts_alloc(ERTS_ALC_T_HL_PTIMER,
ERTS_HL_PTIMER_SIZE);
- else if (short_time) {
- tmr = bif_timer_pre_alloc();
- if (!tmr)
- goto alloc_bif_timer;
- roflgs |= ERTS_TMR_ROFLG_PRE_ALC;
- }
- else {
- alloc_bif_timer:
-#ifdef ERTS_BTM_ACCESSOR_SUPPORT
- if (is_abif_tmr)
- tmr = erts_alloc(ERTS_ALC_T_ABIF_TIMER,
- ERTS_ABIF_TIMER_SIZE);
- else
-#endif
- tmr = erts_alloc(ERTS_ALC_T_BIF_TIMER,
- ERTS_BIF_TIMER_SIZE);
- }
+ tmr->timeout = timeout_pos;
- tmr->timeout = timeout_pos;
+ switch (type) {
+
+ case ERTS_TMR_PROC:
+ ERTS_HLT_ASSERT(is_internal_pid(rcvr));
- if (!is_bif_tmr) {
- if (is_internal_pid(rcvr)) {
erts_proc_inc_refc((Process *) rcvrp);
tmr->receiver.proc = (Process *) rcvrp;
roflgs |= ERTS_TMR_ROFLG_PROC;
- }
- else {
- erts_port_inc_refc((Port *) rcvrp);
+ refc = 2;
+ break;
+
+ case ERTS_TMR_PORT:
ERTS_HLT_ASSERT(is_internal_port(rcvr));
+ erts_port_inc_refc((Port *) rcvrp);
tmr->receiver.port = (Port *) rcvrp;
roflgs |= ERTS_TMR_ROFLG_PORT;
+ refc = 2;
+ break;
+
+ case ERTS_TMR_CALLBACK:
+ roflgs |= ERTS_TMR_ROFLG_CALLBACK;
+ tmr->receiver.callback = callback;
+ tmr->head.u.arg = arg;
+ refc = 1;
+ break;
+
+ default:
+ ERTS_INTERNAL_ERROR("Unsupported timer type");
+ return NULL;
}
- refc = 2;
+
}
- else {
+ else { /* ERTS_TMR_BIF */
Uint hsz;
+#ifdef ERTS_BTM_ACCESSOR_SUPPORT
+ int is_abif_tmr = is_value(acsr) && acsr != rcvr;
+#endif
+
+ if (short_time) {
+ tmr = bif_timer_pre_alloc();
+ if (!tmr)
+ goto alloc_bif_timer;
+ roflgs |= ERTS_TMR_ROFLG_PRE_ALC;
+ }
+ else {
+ alloc_bif_timer:
+#ifdef ERTS_BTM_ACCESSOR_SUPPORT
+ if (is_abif_tmr)
+ tmr = erts_alloc(ERTS_ALC_T_ABIF_TIMER,
+ ERTS_ABIF_TIMER_SIZE);
+ else
+#endif
+ tmr = erts_alloc(ERTS_ALC_T_BIF_TIMER,
+ ERTS_BIF_TIMER_SIZE);
+ }
+
+ tmr->timeout = timeout_pos;
roflgs |= ERTS_TMR_ROFLG_BIF_TMR;
if (is_internal_pid(rcvr)) {
@@ -1081,6 +1153,7 @@ create_hl_timer(ErtsSchedulerData *esdp,
tmr->btm.refn[2] = refn[2];
tmr->btm.proc_tree.parent = ERTS_HLT_PFIELD_NOT_IN_TABLE;
+
#ifdef ERTS_BTM_ACCESSOR_SUPPORT
if (is_abif_tmr) {
Process *aproc;
@@ -1097,6 +1170,8 @@ create_hl_timer(ErtsSchedulerData *esdp,
}
}
#endif
+
+ btm_rbt_insert(&srv->btm_tree, tmr);
}
tmr->head.roflgs = roflgs;
@@ -1124,9 +1199,6 @@ create_hl_timer(ErtsSchedulerData *esdp,
if (st_tmr)
same_time_list_insert(&st_tmr->time.tree.same_time, tmr);
- if (is_bif_tmr)
- btm_rbt_insert(&srv->btm_tree, tmr);
-
#ifdef ERTS_HLT_HARD_DEBUG
tmr->pending_timeout = 0;
#endif
@@ -1231,10 +1303,13 @@ static void hlt_timeout(ErtsHLTimer *tmr, void *vsrv)
hlt_bif_timer_timeout(tmr, roflgs);
else if (roflgs & ERTS_TMR_ROFLG_PROC)
hlt_proc_timeout(tmr);
- else {
- ERTS_HLT_ASSERT(roflgs & ERTS_TMR_ROFLG_PORT);
+ else if (roflgs & ERTS_TMR_ROFLG_PORT)
hlt_port_timeout(tmr);
+ else {
+ ERTS_HLT_ASSERT(roflgs & ERTS_TMR_ROFLG_CALLBACK);
+ (*tmr->receiver.callback)(tmr->head.u.arg);
}
+
}
tmr->time.tree.parent = ERTS_HLT_PFIELD_NOT_IN_TABLE;
@@ -1700,8 +1775,9 @@ setup_bif_timer(Process *c_p, ErtsMonotonicTime timeout_pos,
tmo_msg = wrap ? TUPLE3(tmp_hp, am_timeout, ref, msg) : msg;
- tmr = create_hl_timer(esdp, timeout_pos, short_time, 1, NULL,
- rcvr, acsr, tmo_msg, internal_ref_numbers(ref));
+ tmr = create_hl_timer(esdp, timeout_pos, short_time,
+ ERTS_TMR_BIF, NULL, rcvr, acsr, tmo_msg,
+ internal_ref_numbers(ref), NULL, NULL);
UnUseTmpHeap(4, c_p);
@@ -2538,6 +2614,80 @@ BIF_RETTYPE read_timer_2(BIF_ALIST_2)
return ret;
}
+static void
+start_callback_timer(ErtsSchedulerData *esdp,
+ int twt,
+ ErtsMonotonicTime timeout_pos,
+ void (*callback)(void *),
+ void *arg)
+
+{
+ if (twt)
+ create_tw_timer(esdp, ERTS_TMR_CALLBACK, NULL,
+ callback, arg, timeout_pos);
+ else
+ create_hl_timer(esdp, timeout_pos, 0,
+ ERTS_TMR_CALLBACK, NULL,
+ NIL, THE_NON_VALUE, NIL,
+ NULL, callback, arg);
+}
+
+typedef struct {
+ int twt;
+ ErtsMonotonicTime timeout_pos;
+ void (*callback)(void *);
+ void *arg;
+} ErtsStartCallbackTimerRequest;
+
+static void
+scheduled_start_callback_timer(void *vsctr)
+{
+ ErtsStartCallbackTimerRequest *sctr
+ = (ErtsStartCallbackTimerRequest *) vsctr;
+
+ start_callback_timer(erts_get_scheduler_data(),
+ sctr->twt,
+ sctr->timeout_pos,
+ sctr->callback,
+ sctr->arg);
+
+ erts_free(ERTS_ALC_T_TIMER_REQUEST, vsctr);
+}
+
+void
+erts_start_timer_callback(ErtsMonotonicTime tmo,
+ void (*callback)(void *),
+ void *arg)
+{
+ ErtsSchedulerData *esdp;
+ ErtsMonotonicTime timeout_pos;
+ int twt;
+
+ esdp = erts_get_scheduler_data();
+ timeout_pos = get_timeout_pos(erts_get_monotonic_time(esdp),
+ tmo);
+ twt = tmo < ERTS_TIMER_WHEEL_MSEC;
+
+ if (esdp)
+ start_callback_timer(esdp,
+ twt,
+ timeout_pos,
+ callback,
+ arg);
+ else {
+ ErtsStartCallbackTimerRequest *sctr;
+ sctr = erts_alloc(ERTS_ALC_T_TIMER_REQUEST,
+ sizeof(ErtsStartCallbackTimerRequest));
+ sctr->twt = twt;
+ sctr->timeout_pos = timeout_pos;
+ sctr->callback = callback;
+ sctr->arg = arg;
+ erts_schedule_misc_aux_work(1,
+ scheduled_start_callback_timer,
+ (void *) sctr);
+ }
+}
+
/*
* Process and Port timer functionality.
*
@@ -2561,12 +2711,13 @@ set_proc_timer_common(Process *c_p, ErtsSchedulerData *esdp, Sint64 tmo,
c_p->flags &= ~F_TIMO;
if (tmo < ERTS_TIMER_WHEEL_MSEC)
- tmr = (void *) create_tw_timer(esdp, (void *) c_p, 1, timeout_pos);
+ tmr = (void *) create_tw_timer(esdp, ERTS_TMR_PROC, (void *) c_p,
+ NULL, NULL, timeout_pos);
else
- tmr = (void *) create_hl_timer(esdp, timeout_pos,
- short_time, 0, (void *) c_p,
+ tmr = (void *) create_hl_timer(esdp, timeout_pos, short_time,
+ ERTS_TMR_PROC, (void *) c_p,
c_p->common.id, THE_NON_VALUE,
- NIL, NULL);
+ NIL, NULL, NULL, NULL);
erts_smp_atomic_set_relb(&c_p->common.timer, (erts_aint_t) tmr);
}
}
@@ -2649,13 +2800,12 @@ erts_set_port_timer(Port *c_prt, Sint64 tmo)
timeout_pos = get_timeout_pos(erts_get_monotonic_time(esdp), tmo);
if (tmo < ERTS_TIMER_WHEEL_MSEC)
- tmr = (void *) create_tw_timer(esdp, (void *) c_prt, 0,
- timeout_pos);
+ tmr = (void *) create_tw_timer(esdp, ERTS_TMR_PORT, (void *) c_prt,
+ NULL, NULL, timeout_pos);
else
- tmr = (void *) create_hl_timer(esdp, timeout_pos, 0, 0,
- (void *) c_prt,
- c_prt->common.id, THE_NON_VALUE,
- NIL, NULL);
+ tmr = (void *) create_hl_timer(esdp, timeout_pos, 0, ERTS_TMR_PORT,
+ (void *) c_prt, c_prt->common.id,
+ THE_NON_VALUE, NIL, NULL, NULL, NULL);
erts_smp_atomic_set_relb(&c_prt->common.timer, (erts_aint_t) tmr);
}
@@ -2800,6 +2950,98 @@ erts_debug_bif_timer_foreach(void (*func)(Eterm,
}
}
+typedef struct {
+ void (*tclbk)(void *);
+ void (*func)(void *,
+ ErtsMonotonicTime,
+ void *);
+ void *arg;
+} ErtsDebugForeachCallbackTimer;
+
+static void
+debug_callback_timer_foreach_list(ErtsHLTimer *tmr, void *vdfct)
+{
+ ErtsDebugForeachCallbackTimer *dfct
+ = (ErtsDebugForeachCallbackTimer *) vdfct;
+
+ if ((tmr->head.roflgs & ERTS_TMR_ROFLG_CALLBACK)
+ && (tmr->receiver.callback && dfct->tclbk))
+ (*dfct->func)(dfct->arg,
+ tmr->timeout,
+ tmr->head.u.arg);
+}
+
+static void
+debug_callback_timer_foreach(ErtsHLTimer *tmr, void *vdfct)
+{
+ ErtsDebugForeachCallbackTimer *dfct
+ = (ErtsDebugForeachCallbackTimer *) vdfct;
+
+ if (tmr->time.tree.same_time)
+ same_time_list_foreach(tmr->time.tree.same_time,
+ debug_callback_timer_foreach_list,
+ vdfct);
+
+ if ((tmr->head.roflgs & ERTS_TMR_ROFLG_CALLBACK)
+ && (tmr->receiver.callback && dfct->tclbk))
+ (*dfct->func)(dfct->arg,
+ tmr->timeout,
+ tmr->head.u.arg);
+}
+
+static void
+debug_tw_callback_timer(void *vdfct,
+ ErtsMonotonicTime timeout_pos,
+ void *vtwtp)
+{
+ ErtsTWTimer *twtp = (ErtsTWTimer *) vtwtp;
+ ErtsDebugForeachCallbackTimer *dfct
+ = (ErtsDebugForeachCallbackTimer *) vdfct;
+
+ if (twtp->u.callback == dfct->tclbk)
+ (*dfct->func)(dfct->arg,
+ timeout_pos,
+ twtp->head.u.arg);
+}
+
+void
+erts_debug_callback_timer_foreach(void (*tclbk)(void *),
+ void (*func)(void *,
+ ErtsMonotonicTime,
+ void *),
+ void *arg)
+{
+ int six;
+ ErtsDebugForeachCallbackTimer dfct;
+
+ dfct.tclbk = tclbk;
+ dfct.func = func;
+ dfct.arg = arg;
+
+ if (!erts_smp_thr_progress_is_blocking())
+ ERTS_INTERNAL_ERROR("Not blocking thread progress");
+
+ for (six = 0; six < erts_no_schedulers; six++) {
+ ErtsHLTimerService *srv =
+ erts_aligned_scheduler_data[six].esd.timer_service;
+ ErtsTimerWheel *twheel =
+ erts_aligned_scheduler_data[six].esd.timer_wheel;
+
+ erts_twheel_debug_foreach(twheel,
+ tw_callback_timeout,
+ debug_tw_callback_timer,
+ (void *) &dfct);
+
+ if (srv->yield.root)
+ debug_callback_timer_foreach(srv->yield.root,
+ (void *) &dfct);
+
+ time_rbt_foreach(srv->btm_tree,
+ debug_callback_timer_foreach,
+ (void *) &dfct);
+ }
+}
+
#ifdef ERTS_HLT_HARD_DEBUG
typedef struct {