aboutsummaryrefslogtreecommitdiffstats
path: root/erts/preloaded/src/erts_internal.erl
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2015-02-13 00:25:57 +0100
committerRickard Green <[email protected]>2015-03-20 15:28:52 +0100
commit6b1921d767de5cd1a980234f83b36dbfa13d9fc7 (patch)
tree0b7a016c2b41f30f1adf81559e5fc6a4567099ea /erts/preloaded/src/erts_internal.erl
parentfa7b2c00cbf9212c4a3551980939b92fc6606510 (diff)
downloadotp-6b1921d767de5cd1a980234f83b36dbfa13d9fc7.tar.gz
otp-6b1921d767de5cd1a980234f83b36dbfa13d9fc7.tar.bz2
otp-6b1921d767de5cd1a980234f83b36dbfa13d9fc7.zip
Erlang based BIF timer implementation for scalability
Diffstat (limited to 'erts/preloaded/src/erts_internal.erl')
-rw-r--r--erts/preloaded/src/erts_internal.erl251
1 files changed, 251 insertions, 0 deletions
diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl
index 30df75b406..2c701d75e4 100644
--- a/erts/preloaded/src/erts_internal.erl
+++ b/erts/preloaded/src/erts_internal.erl
@@ -42,6 +42,14 @@
-export([time_unit/0]).
+-export([bif_timer_server/2]).
+
+-export([get_bif_timer_servers/0, create_bif_timer/0, access_bif_timer/1]).
+
+-export([monitor_process/2]).
+
+-export([is_system_process/1]).
+
%%
%% Await result of send to port
%%
@@ -208,3 +216,246 @@ flush_monitor_messages(Ref, Multi, Res) when is_reference(Ref) ->
time_unit() ->
erlang:nif_error(undefined).
+
+-spec erts_internal:get_bif_timer_servers() -> Pids when
+ Pid :: pid(),
+ Pids :: [Pid].
+
+get_bif_timer_servers() ->
+ erlang:nif_error(undefined).
+
+-spec erts_internal:create_bif_timer() -> Res when
+ Res :: {reference(), pid(), reference()}.
+
+create_bif_timer() ->
+ erlang:nif_error(undefined).
+
+-spec erts_internal:access_bif_timer(Ref) -> Res when
+ Ref :: reference(),
+ Res :: {reference(), pid(), reference()}.
+
+access_bif_timer(_Ref) ->
+ erlang:nif_error(undefined).
+
+-spec erts_internal:monitor_process(Pid, Ref) -> boolean() when
+ Pid :: pid(),
+ Ref :: reference().
+
+monitor_process(_Pid, _Ref) ->
+ erlang:nif_error(undefined).
+
+-spec erts_internal:is_system_process(Pid) -> boolean() when
+ Pid :: pid().
+
+is_system_process(_Pid) ->
+ erlang:nif_error(undefined).
+
+%%
+%% BIF timer servers
+%%
+
+-record(tsrv_state, {rtab,
+ ttab,
+ btr,
+ unit,
+ next}).
+
+bif_timer_server(N, BTR) ->
+ try
+ tsrv_loop(tsrv_init_static_state(N, BTR), infinity)
+ catch
+ Type:Reason ->
+ erlang:display({'BIF_timer_server',
+ {Type, Reason},
+ erlang:get_stacktrace()}),
+ exit(Reason)
+ end.
+
+tsrv_init_static_state(N, BTR) ->
+ process_flag(trap_exit, true),
+ NList = integer_to_list(N),
+ RTabName = list_to_atom("BIF_timer_reference_table_" ++ NList),
+ TTabName = list_to_atom("BIF_timer_time_table_" ++ NList),
+ #tsrv_state{rtab = ets:new(RTabName,
+ [set, private, {keypos, 2}]),
+ ttab = ets:new(TTabName,
+ [ordered_set, private, {keypos, 1}]),
+ btr = BTR,
+ unit = erts_internal:time_unit(),
+ next = infinity}.
+
+
+tsrv_loop(#tsrv_state{unit = Unit} = StaticState, Nxt) ->
+ CallTime = erlang:monotonic_time(),
+ %% 'infinity' is greater than all integers...
+ NewNxt = case CallTime >= Nxt of
+ true ->
+ tsrv_handle_timeout(CallTime, StaticState);
+ false ->
+ TMO = try
+ (1000*(Nxt - CallTime - 1)) div Unit + 1
+ catch
+ error:badarith when Nxt == infinity -> infinity
+ end,
+ receive
+ Msg ->
+ tsrv_handle_msg(Msg, StaticState, Nxt)
+ after TMO ->
+ Nxt
+ end
+ end,
+ tsrv_loop(StaticState, NewNxt).
+
+tsrv_handle_msg({set_timeout, BTR, Proc, Time, TRef, Msg},
+ #tsrv_state{rtab = RTab,
+ ttab = TTab,
+ btr = BTR},
+ Nxt) when erlang:is_integer(Time) ->
+ RcvTime = erlang:monotonic_time(),
+ case Time =< RcvTime of
+ true ->
+ try Proc ! Msg catch _:_ -> ok end,
+ Nxt;
+ false ->
+ Ins = case erlang:is_atom(Proc) of
+ true ->
+ true;
+ false ->
+ try
+ erts_internal:monitor_process(Proc, TRef)
+ catch
+ _:_ -> false
+ end
+ end,
+ case Ins of
+ false ->
+ Nxt;
+ true ->
+ TKey = {Time, TRef},
+ true = ets:insert(RTab, TKey),
+ true = ets:insert(TTab, {TKey, Proc, Msg}),
+ case Time < Nxt of
+ true -> Time;
+ false -> Nxt
+ end
+ end
+ end;
+tsrv_handle_msg({cancel_timeout, BTR, From, Reply, Req, TRef},
+ #tsrv_state{rtab = RTab,
+ ttab = TTab,
+ unit = Unit,
+ btr = BTR},
+ Nxt) ->
+ case ets:lookup(RTab, TRef) of
+ [] ->
+ case Reply of
+ false ->
+ ok;
+ _ ->
+ try From ! {cancel_timer, Req, false} catch _:_ -> ok end
+ end,
+ Nxt;
+ [{Time, TRef} = TKey] ->
+ ets:delete(RTab, TRef),
+ ets:delete(TTab, TKey),
+ erlang:demonitor(TRef),
+ case Reply of
+ false ->
+ ok;
+ _ ->
+ RcvTime = erlang:monotonic_time(),
+ RT = case Time =< RcvTime of
+ true ->
+ 0;
+ false ->
+ ((1000*(Time - RcvTime)) div Unit)
+ end,
+ try From ! {cancel_timer, Req, RT} catch _:_ -> ok end
+ end,
+ case Time =:= Nxt of
+ false ->
+ Nxt;
+ true ->
+ case ets:first(TTab) of
+ '$end_of_table' -> infinity;
+ {NextTime, _TRef} -> NextTime
+ end
+ end
+ end;
+tsrv_handle_msg({read_timeout, BTR, From, Req, TRef},
+ #tsrv_state{rtab = RTab,
+ unit = Unit,
+ btr = BTR},
+ Nxt) ->
+ case ets:lookup(RTab, TRef) of
+ [] ->
+ try From ! {read_timer, Req, false} catch _:_ -> ok end;
+ [{Time, TRef}] ->
+ RcvTime = erlang:monotonic_time(),
+ RT = case Time =< RcvTime of
+ true -> 0;
+ false -> (1000*(Time - RcvTime)) div Unit
+ end,
+ try From ! {read_timer, Req, RT} catch _:_ -> ok end
+ end,
+ Nxt;
+tsrv_handle_msg({'DOWN', TRef, process, _, _},
+ #tsrv_state{rtab = RTab,
+ ttab = TTab},
+ Nxt) ->
+ case ets:lookup(RTab, TRef) of
+ [] ->
+ Nxt;
+ [{Time, TRef} = TKey] ->
+ ets:delete(RTab, TRef),
+ ets:delete(TTab, TKey),
+ case Time =:= Nxt of
+ false ->
+ Nxt;
+ true ->
+ case ets:first(TTab) of
+ '$end_of_table' -> infinity;
+ {NextTime, _} -> NextTime
+ end
+ end
+ end;
+tsrv_handle_msg({cancel_all_timeouts, BTR, From, Ref},
+ #tsrv_state{rtab = RTab,
+ ttab = TTab,
+ btr = BTR},
+ _Nxt) ->
+ tsrv_delete_monitor_objects(RTab),
+ ets:delete_all_objects(TTab),
+ try From ! {canceled_all_timeouts, Ref} catch _:_ -> ok end,
+ infinity;
+tsrv_handle_msg(_GarbageMsg, _StaticState, Nxt) ->
+ Nxt.
+
+tsrv_delete_monitor_objects(RTab) ->
+ case ets:first(RTab) of
+ '$end_of_table' ->
+ ok;
+ TRef ->
+ erlang:demonitor(TRef),
+ ets:delete(RTab, TRef),
+ tsrv_delete_monitor_objects(RTab)
+ end.
+
+tsrv_handle_timeout(CallTime, #tsrv_state{rtab = RTab,
+ ttab = TTab} = S) ->
+ case ets:first(TTab) of
+ '$end_of_table' ->
+ infinity;
+ {Time, _TRef} when Time > CallTime ->
+ Time;
+ {_Time, TRef} = TKey ->
+ [{TKey, Proc, Msg}] = ets:lookup(TTab, TKey),
+ case erlang:is_pid(Proc) of
+ false -> ok;
+ _ -> erlang:demonitor(TRef)
+ end,
+ ets:delete(TTab, TKey),
+ ets:delete(RTab, TRef),
+ try Proc ! Msg catch _:_ -> ok end,
+ tsrv_handle_timeout(CallTime, S)
+ end.