aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel/src/logger_olp.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/kernel/src/logger_olp.erl')
-rw-r--r--lib/kernel/src/logger_olp.erl213
1 files changed, 145 insertions, 68 deletions
diff --git a/lib/kernel/src/logger_olp.erl b/lib/kernel/src/logger_olp.erl
index 6b76c78c73..7c6a1a8547 100644
--- a/lib/kernel/src/logger_olp.erl
+++ b/lib/kernel/src/logger_olp.erl
@@ -25,47 +25,62 @@
%% API
-export([start_link/4, load/2, info/1, reset/1, stop/1, restart/1,
- set_opts/2, get_opts/1, get_default_opts/0, is_alive/1,
- call/2, cast/2]).
+ set_opts/2, get_opts/1, get_default_opts/0, get_pid/1,
+ call/2, cast/2, get_ref/0, get_ref/1]).
%% gen_server and proc_lib callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
-%%%-----------------------------------------------------------------
-%% -define(CONFIG_KEYS,[sync_mode_qlen,
-%% drop_mode_qlen,
-%% flush_qlen,
-%% burst_limit_enable,
-%% burst_limit_max_count,
-%% burst_limit_window_time,
-%% overload_kill_enable,
-%% overload_kill_qlen,
-%% overload_kill_mem_size,
-%% overload_kill_restart_after]).
+-define(OPT_KEYS,[sync_mode_qlen,
+ drop_mode_qlen,
+ flush_qlen,
+ burst_limit_enable,
+ burst_limit_max_count,
+ burst_limit_window_time,
+ overload_kill_enable,
+ overload_kill_qlen,
+ overload_kill_mem_size,
+ overload_kill_restart_after]).
+
+-export_type([olp_ref/0, options/0]).
+
+-opaque olp_ref() :: {atom(),pid(),ets:tid()}.
+
+-type options() :: #{sync_mode_qlen => integer(),
+ drop_mode_qlen => integer(),
+ flush_qlen => integer(),
+ burst_limit_enable => boolean(),
+ burst_limit_max_count => integer(),
+ burst_limit_window_time => integer(),
+ overload_kill_enable => boolean(),
+ overload_kill_qlen => integer(),
+ overload_kill_mem_size => integer(),
+ overload_kill_restart_after => integer()}.
%%%-----------------------------------------------------------------
%%% API
-%-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason}.
+-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason} when
+ Name :: atom(),
+ Module :: module(),
+ Args :: term(),
+ Options :: options(),
+ Pid :: pid(),
+ Olp :: olp_ref(),
+ Reason :: term().
start_link(Name,Module,Args,Options0) when is_map(Options0) ->
Options = maps:merge(get_default_opts(),Options0),
case check_opts(Options) of
ok ->
- case proc_lib:start_link(?MODULE,init,
- [[Name,Module,Args,Options]]) of
- {ok,Pid,Olp} ->
- {ok,Pid,{Olp,Options}};
- Error ->
- Error
- end;
+ proc_lib:start_link(?MODULE,init,[[Name,Module,Args,Options]]);
Error ->
Error
end.
-is_alive({_Name,Pid,_ModeRef}) ->
- is_process_alive(Pid).
-
+-spec load(Olp, Msg) -> ok when
+ Olp :: olp_ref(),
+ Msg :: term().
load({_Name,Pid,ModeRef},Msg) ->
%% If the process is getting overloaded, the message will be
%% synchronous instead of asynchronous (slows down the tempo of a
@@ -92,22 +107,36 @@ load({_Name,Pid,ModeRef},Msg) ->
end,
ok.
+-spec info(Olp) -> map() | {error, busy} when
+ Olp :: atom() | pid() | olp_ref().
info(Olp) ->
call(Olp, info).
+-spec reset(Olp) -> ok | {error, busy} when
+ Olp :: atom() | pid() | olp_ref().
reset(Olp) ->
call(Olp, reset).
+-spec stop(Olp) -> ok when
+ Olp :: atom() | pid() | olp_ref().
stop({_Name,Pid,_ModRef}) ->
+ stop(Pid);
+stop(Pid) ->
_ = gen_server:call(Pid, stop),
ok.
-set_opts({_Name,Pid,_ModRef}, Opts) ->
- gen_server:call(Pid, {set_opts,Opts}).
+-spec set_opts(Olp, Opts) -> ok | {error,term()} | {error, busy} when
+ Olp :: atom() | pid() | olp_ref(),
+ Opts :: options().
+set_opts(Olp, Opts) ->
+ call(Olp, {set_opts,Opts}).
-get_opts({_Name,Pid,_ModRef}) ->
- gen_server:call(Pid, get_opts).
+-spec get_opts(Olp) -> options() | {error, busy} when
+ Olp :: atom() | pid() | olp_ref().
+get_opts(Olp) ->
+ call(Olp, get_opts).
+-spec get_default_opts() -> options().
get_default_opts() ->
#{sync_mode_qlen => ?SYNC_MODE_QLEN,
drop_mode_qlen => ?DROP_MODE_QLEN,
@@ -120,11 +149,30 @@ get_default_opts() ->
overload_kill_mem_size => ?OVERLOAD_KILL_MEM_SIZE,
overload_kill_restart_after => ?OVERLOAD_KILL_RESTART_AFTER}.
+-spec restart(fun(() -> any())) -> ok.
restart(Fun) ->
- erlang:display(restarting),
- erlang:display(_ = Fun()),
+ Result =
+ try Fun()
+ catch C:R:S ->
+ {error,{restart_failed,Fun,C,R,S}}
+ end,
+ ?LOG_INTERNAL(debug,[{logger_olp,restart},
+ {result,Result}]),
ok.
+-spec get_ref() -> olp_ref().
+get_ref() ->
+ get(olp_ref).
+
+-spec get_ref(PidOrName) -> olp_ref() | {error, busy} when
+ PidOrName :: pid() | atom().
+get_ref(PidOrName) ->
+ call(PidOrName,get_ref).
+
+-spec get_pid(olp_ref()) -> pid().
+get_pid({_Name,Pid,_ModeRef}) ->
+ Pid.
+
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
@@ -136,13 +184,15 @@ init([Name,Module,Args,Options]) ->
?init_test_hooks(),
?start_observation(Name),
- try Module:init(Args) of
- {ok,CBState} ->
- try ets:new(Name, [public]) of
- ModeRef ->
+ try ets:new(Name, [public]) of
+ ModeRef ->
+ OlpRef = {Name,self(),ModeRef},
+ put(olp_ref,OlpRef),
+ try Module:init(Args) of
+ {ok,CBState} ->
?set_mode(ModeRef, async),
T0 = ?timestamp(),
- proc_lib:init_ack({ok,self(),{Name,self(),ModeRef}}),
+ proc_lib:init_ack({ok,self(),OlpRef}),
%% Storing options in state to avoid copying
%% (sending) the option data with each message
State0 = ?merge_with_stats(
@@ -156,15 +206,17 @@ init([Name,Module,Args,Options]) ->
burst_msg_count => 0,
cb_state => CBState}),
State = reset_restart_flag(State0),
- gen_server:enter_loop(?MODULE, [], State)
+ gen_server:enter_loop(?MODULE, [], State);
+ Error ->
+ _ = ets:delete(ModeRef),
+ unregister(Name),
+ proc_lib:init_ack(Error)
catch
_:Error ->
+ _ = ets:delete(ModeRef),
unregister(Name),
proc_lib:init_ack(Error)
- end;
- Error ->
- unregister(Name),
- proc_lib:init_ack(Error)
+ end
catch
_:Error ->
unregister(Name),
@@ -177,8 +229,11 @@ handle_call({'$olp_load', Msg}, _From, State) ->
%% Result == ok | dropped
{reply,Result, State1};
+handle_call(get_ref,_From,#{id:=Name,mode_ref:=ModeRef}=State) ->
+ {reply,{Name,self(),ModeRef},State};
+
handle_call({set_opts,Opts0},_From,State) ->
- Opts = maps:merge(get_default_opts(),Opts0),
+ Opts = maps:merge(maps:with(?OPT_KEYS,State),Opts0),
case check_opts(Opts) of
ok ->
{reply, ok, maps:merge(State,Opts)};
@@ -186,6 +241,9 @@ handle_call({set_opts,Opts0},_From,State) ->
{reply, Error, State}
end;
+handle_call(get_opts,_From,State) ->
+ {reply, maps:with(?OPT_KEYS,State), State};
+
handle_call(info, _From, State) ->
{reply, State, State};
@@ -214,7 +272,6 @@ handle_call(Msg, From, #{module:=Module,cb_state:=CBState}=State) ->
%% This is the asynchronous load event.
handle_cast({'$olp_load', Msg}, State) ->
{_Result,State1} = do_load(Msg, cast, State),
- %% Result == ok | dropped
{noreply,State1};
handle_cast(Msg, #{module:=Module, cb_state:=CBState} = State) ->
@@ -230,7 +287,10 @@ handle_info(Msg, #{module := Module, cb_state := CBState} = State) ->
{noreply,CBState1} ->
{noreply,State#{cb_state=>CBState1}};
{noreply,CBState1,Timeout} ->
- {noreply,State#{cb_state=>CBState1},Timeout}
+ {noreply,State#{cb_state=>CBState1},Timeout};
+ {load,CBState1} ->
+ {_,State1} = do_load(Msg, cast, State#{cb_state=>CBState1}),
+ {noreply,State1}
end.
terminate({shutdown,{overloaded,_QLen,_Mem}},
@@ -242,12 +302,11 @@ terminate({shutdown,{overloaded,_QLen,_Mem}},
case try_callback_call(Module,terminate,[overloaded,CBState],ok) of
{ok,Fun} when is_function(Fun,0), is_integer(RestartAfter) ->
set_restart_flag(State),
- timer:apply_after(RestartAfter,?MODULE,restart,[Fun]),
+ _ = timer:apply_after(RestartAfter,?MODULE,restart,[Fun]),
ok;
_ ->
ok
- end,
- ok;
+ end;
terminate(Reason, #{id:=Name, module:=Module, cb_state:=CBState}) ->
_ = try_callback_call(Module,terminate,[Reason,CBState],ok),
unregister(Name),
@@ -259,6 +318,8 @@ code_change(_OldVsn, State, _Extra) ->
%%%-----------------------------------------------------------------
%%% Internal functions
+-spec call(Olp, term()) -> term() | {error,busy} when
+ Olp :: atom() | pid() | olp_ref().
call({_Name, Pid, _ModeRef},Msg) ->
call(Pid, Msg);
call(Server, Msg) ->
@@ -268,6 +329,7 @@ call(Server, Msg) ->
_:{timeout,_} -> {error,busy}
end.
+-spec cast(olp_ref(),term()) -> ok.
cast({_Name, Pid, _ModeRef},Msg) ->
gen_server:cast(Pid, Msg).
@@ -276,26 +338,27 @@ cast({_Name, Pid, _ModeRef},Msg) ->
%% before LogWindowSize events have been handled
do_load(Msg, CallOrCast, State) ->
T1 = ?timestamp(),
+ State1 = ?update_time(T1,State),
%% check if the process is getting overloaded, or if it's
%% recovering from overload (the check must be done for each
%% event to react quickly to large bursts of events and
%% to ensure that the handler can never end up in drop mode
%% with an empty mailbox, which would stop operation)
- {Mode1,QLen,Mem,State1} = check_load(State),
+ {Mode1,QLen,Mem,State2} = check_load(State1),
%% kill the handler if it can't keep up with the load
- kill_if_choked(QLen, Mem, State1),
+ kill_if_choked(QLen, Mem, State2),
if Mode1 == flush ->
- flush(T1, State1);
+ flush(T1, State2);
true ->
- handle_load(Mode1, T1, Msg, CallOrCast, State1)
+ handle_load(Mode1, T1, Msg, CallOrCast, State2)
end.
%% this function is called by do_load/3 after an overload check
%% has been performed, where QLen > FlushQLen
-flush(T1, State=#{id := _Name, last_load_ts := T0, mode_ref := ModeRef}) ->
+flush(T1, State=#{id := _Name, mode := Mode, last_load_ts := T0, mode_ref := ModeRef}) ->
%% flush load messages in the mailbox (a limited number in order
%% to not cause long delays)
NewFlushed = flush_load(?FLUSH_MAX_N),
@@ -306,17 +369,18 @@ flush(T1, State=#{id := _Name, last_load_ts := T0, mode_ref := ModeRef}) ->
%% because of the receive loop when flushing messages, the
%% handler will be scheduled out often and the mailbox could
%% grow very large, so we'd better check the queue again here
- {_,_QLen1} = process_info(self(), message_queue_len),
- ?observe(_Name,{max_qlen,_QLen1}),
+ {_,QLen1} = process_info(self(), message_queue_len),
+ ?observe(_Name,{max_qlen,QLen1}),
%% Add 1 for the current log event
?observe(_Name,{flushed,NewFlushed+1}),
State2 = ?update_max_time(?diff_time(T1,T0),State1),
- State3 = ?update_max_qlen(_QLen1,State2),
+ State3 = ?update_max_qlen(QLen1,State2),
+ State4 = maybe_notify_mode_change(async,QLen1,State3),
{dropped,?update_other(flushed,FLUSHED,NewFlushed,
- State3#{mode => ?set_mode(ModeRef,async),
- last_qlen => 0,
+ State4#{mode => ?change_mode(ModeRef,Mode,async),
+ last_qlen => QLen1,
last_load_ts => T1})}.
%% this function is called to actually handle the message
@@ -334,7 +398,7 @@ handle_load(Mode, T1, Msg, _CallOrCast,
{Result,LastQLen1,CBState1} =
if DoWrite ->
?observe(_Name,{_CallOrCast,1}),
- {ok,CBS} = try_callback_call(Module,handle_load,[Msg,CBState]),
+ CBS = try_callback_call(Module,handle_load,[Msg,CBState]),
{ok,element(2, process_info(self(), message_queue_len)),CBS};
true ->
?observe(_Name,{flushed,1}),
@@ -353,9 +417,10 @@ handle_load(Mode, T1, Msg, _CallOrCast,
State3 =
if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso
(Time > ?IDLE_DETECT_TIME_USEC) ->
- S = notify(idle,State2),
- S#{mode => ?change_mode(ModeRef, Mode, async),
- burst_msg_count => 0};
+ S1 = notify(idle,State2),
+ S2 = maybe_notify_mode_change(async,LastQLen1,S1),
+ S2#{mode => ?change_mode(ModeRef, Mode, async),
+ burst_msg_count => 0};
true ->
State2#{mode => Mode}
end,
@@ -365,7 +430,14 @@ handle_load(Mode, T1, Msg, _CallOrCast,
?update_max_time(Time,
State5#{last_qlen := LastQLen1,
last_load_ts => T1}),
- {Result,State6}.
+ State7 = case Result of
+ ok ->
+ S = ?update_freq(T1,State6),
+ ?update_other(writes,WRITES,1,S);
+ _ ->
+ State6
+ end,
+ {Result,State7}.
%%%-----------------------------------------------------------------
@@ -452,7 +524,7 @@ check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode,
QLen >= DropModeQLen ->
%% Note that drop mode will force load messages to
%% be dropped on the client side (never sent to
- %% the handler).
+ %% the olp process).
IncDrops = if Mode == drop -> 0; true -> 1 end,
{?change_mode(ModeRef, Mode, drop), IncDrops,0};
QLen >= SyncModeQLen ->
@@ -461,10 +533,11 @@ check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode,
{?change_mode(ModeRef, Mode, async), 0,0}
end,
State1 = ?update_other(drops,DROPS,_NewDrops,State),
- State2 = maybe_notify_mode_change(Mode1,State1),
+ State2 = ?update_max_qlen(QLen,State1),
+ State3 = maybe_notify_mode_change(Mode1,QLen,State2),
{Mode1, QLen, Mem,
?update_other(flushes,FLUSHES,_NewFlushes,
- State2#{last_qlen => QLen})}.
+ State3#{last_qlen => QLen})}.
limit_burst(#{burst_limit_enable := false}=State) ->
{true,State};
@@ -517,7 +590,11 @@ flush_load(N, Limit) ->
flush_load(N+1, Limit);
{'$gen_call',{Pid,MRef},{'$olp_load',_}} ->
Pid ! {MRef, dropped},
- flush_load(N+1, Limit)
+ flush_load(N+1, Limit);
+ {log,_,_,_,_} ->
+ flush_load(N+1, Limit);
+ {log,_,_,_} ->
+ flush_load(N+1, Limit)
after
0 -> N
end.
@@ -528,13 +605,13 @@ overload_levels_ok(Options) ->
FQL = maps:get(flush_qlen, Options, ?FLUSH_QLEN),
(DMQL > 1) andalso (SMQL =< DMQL) andalso (DMQL =< FQL).
-maybe_notify_mode_change(drop,#{mode:=Mode0}=State)
+maybe_notify_mode_change(drop,_QLen,#{mode:=Mode0}=State)
when Mode0=/=drop ->
notify({mode_change,Mode0,drop},State);
-maybe_notify_mode_change(Mode1,#{mode:=drop}=State)
+maybe_notify_mode_change(Mode1,_QLen,#{mode:=drop}=State)
when Mode1==async; Mode1==sync ->
notify({mode_change,drop,Mode1},State);
-maybe_notify_mode_change(_,State) ->
+maybe_notify_mode_change(_,_,State) ->
State.
notify(Note,#{module:=Module,cb_state:=CBState}=State) ->