%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2017-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(logger_h_common).
-behaviour(gen_server).
-include("logger_h_common.hrl").
-include("logger_internal.hrl").
%% API
-export([start_link/1, info/2, filesync/2, reset/2]).
%% gen_server and proc_lib callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% logger callbacks
-export([log/2, adding_handler/1, removing_handler/1, changing_config/3,
filter_config/1]).
%% Library functions for handlers
-export([error_notify/1]).
%%%-----------------------------------------------------------------
-define(CONFIG_KEYS,[sync_mode_qlen,
drop_mode_qlen,
flush_qlen,
burst_limit_enable,
burst_limit_max_count,
burst_limit_window_time,
overload_kill_enable,
overload_kill_qlen,
overload_kill_mem_size,
overload_kill_restart_after,
filesync_repeat_interval]).
-define(READ_ONLY_KEYS,[handler_pid,mode_tab]).
%%%-----------------------------------------------------------------
%%% API
%% This function is called by the logger_sup supervisor
start_link(Args) ->
proc_lib:start_link(?MODULE,init,[Args]).
filesync(Module, Name) ->
call(Module, Name, filesync).
info(Module, Name) ->
call(Module, Name, info).
reset(Module, Name) ->
call(Module, Name, reset).
%%%-----------------------------------------------------------------
%%% Handler being added
adding_handler(#{id:=Name,module:=Module}=Config) ->
HConfig0 = maps:get(config, Config, #{}),
HandlerConfig0 = maps:without(?CONFIG_KEYS,HConfig0),
case Module:check_config(Name,set,undefined,HandlerConfig0) of
{ok,HandlerConfig} ->
ModifiedCommon = maps:with(?CONFIG_KEYS,HandlerConfig),
CommonConfig0 = maps:with(?CONFIG_KEYS,HConfig0),
CommonConfig = maps:merge(
maps:merge(get_default_config(), CommonConfig0),
ModifiedCommon),
case check_config(CommonConfig) of
ok ->
HConfig = maps:merge(CommonConfig,HandlerConfig),
start(Config#{config => HConfig});
{error,Faulty} ->
{error,{invalid_config,Module,Faulty}}
end;
Error ->
Error
end.
%%%-----------------------------------------------------------------
%%% Handler being removed
removing_handler(#{id:=Name, module:=Module}) ->
case whereis(?name_to_reg_name(Module,Name)) of
undefined ->
ok;
Pid ->
%% We don't want to do supervisor:terminate_child here
%% since we need to distinguish this explicit stop from a
%% system termination in order to avoid circular attempts
%% at removing the handler (implying deadlocks and
%% timeouts).
%% And we don't need to do supervisor:delete_child, since
%% the restart type is temporary, which means that the
%% child specification is automatically removed from the
%% supervisor when the process dies.
_ = gen_server:call(Pid, stop),
ok
end.
%%%-----------------------------------------------------------------
%%% Updating handler config
changing_config(SetOrUpdate,
#{id:=Name,config:=OldHConfig,module:=Module},
NewConfig0) ->
NewHConfig0 = maps:get(config, NewConfig0, #{}),
OldHandlerConfig = maps:without(?CONFIG_KEYS++?READ_ONLY_KEYS,OldHConfig),
NewHandlerConfig0 = maps:without(?CONFIG_KEYS++?READ_ONLY_KEYS,NewHConfig0),
case Module:check_config(Name, SetOrUpdate,
OldHandlerConfig,NewHandlerConfig0) of
{ok, NewHandlerConfig} ->
ModifiedCommon = maps:with(?CONFIG_KEYS,NewHandlerConfig),
NewCommonConfig0 = maps:with(?CONFIG_KEYS,NewHConfig0),
CommonDefault =
case SetOrUpdate of
set ->
get_default_config();
update ->
maps:with(?CONFIG_KEYS,OldHConfig)
end,
NewCommonConfig = maps:merge(
maps:merge(CommonDefault,NewCommonConfig0),
ModifiedCommon),
case check_config(NewCommonConfig) of
ok ->
ReadOnly = maps:with(?READ_ONLY_KEYS,OldHConfig),
NewHConfig = maps:merge(
maps:merge(NewCommonConfig,NewHandlerConfig),
ReadOnly),
NewConfig = NewConfig0#{config=>NewHConfig},
HPid = maps:get(handler_pid,OldHConfig),
case call(HPid, {change_config,NewConfig}) of
ok -> {ok,NewConfig};
Error -> Error
end;
{error,Faulty} ->
{error,{invalid_config,Module,Faulty}}
end;
Error ->
Error
end.
%%%-----------------------------------------------------------------
%%% Log a string or report
-spec log(LogEvent, Config) -> ok when
LogEvent :: logger:log_event(),
Config :: logger:handler_config().
log(LogEvent, Config = #{id := Name,
config := #{handler_pid := HPid,
mode_tab := ModeTab}}) ->
%% if the handler has crashed, we must drop this event
%% and hope the handler restarts so we can try again
true = is_process_alive(HPid),
Bin = log_to_binary(LogEvent, Config),
call_cast_or_drop(Name, HPid, ModeTab, Bin).
%%%-----------------------------------------------------------------
%%% Remove internal fields from configuration
filter_config(#{config:=HConfig}=Config) ->
Config#{config=>maps:without(?READ_ONLY_KEYS,HConfig)}.
%%%-----------------------------------------------------------------
%%% Start the handler process
%%%
%%% The process must always exist if the handler is registered with
%%% logger (and must not exist if the handler is not registered).
%%%
%%% The handler process is linked to logger_sup, which is part of the
%%% kernel application's supervision tree.
start(#{id := Name} = Config0) ->
ChildSpec =
#{id => Name,
start => {?MODULE, start_link, [Config0]},
restart => temporary,
shutdown => 2000,
type => worker,
modules => [?MODULE]},
case supervisor:start_child(logger_sup, ChildSpec) of
{ok,Pid,Config} ->
ok = logger_handler_watcher:register_handler(Name,Pid),
{ok,Config};
Error ->
Error
end.
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init(#{id := Name, module := Module,
formatter := Formatter, config := HConfig0} = Config0) ->
RegName = ?name_to_reg_name(Module,Name),
register(RegName, self()),
process_flag(trap_exit, true),
process_flag(message_queue_data, off_heap),
?init_test_hooks(),
?start_observation(Name),
case Module:init(Name, HConfig0) of
{ok,HState} ->
try ets:new(Name, [public]) of
ModeTab ->
?set_mode(ModeTab, async),
T0 = ?timestamp(),
HConfig = HConfig0#{handler_pid => self(),
mode_tab => ModeTab},
Config = Config0#{config => HConfig},
proc_lib:init_ack({ok,self(),Config}),
%% Storing common config in state to avoid copying
%% (sending) the config data for each log message
CommonConfig = maps:with(?CONFIG_KEYS,HConfig),
State =
?merge_with_stats(
CommonConfig#{id => Name,
module => Module,
mode_tab => ModeTab,
mode => async,
ctrl_sync_count =>
?CONTROLLER_SYNC_INTERVAL,
last_qlen => 0,
last_log_ts => T0,
last_op => sync,
burst_win_ts => T0,
burst_msg_count => 0,
formatter => Formatter,
handler_state => HState}),
State1 = set_repeated_filesync(State),
unset_restart_flag(State1),
gen_server:enter_loop(?MODULE, [], State1)
catch
_:Error ->
unregister(RegName),
error_notify({init_handler,Name,Error}),
proc_lib:init_ack(Error)
end;
Error ->
unregister(RegName),
error_notify({init_handler,Name,Error}),
proc_lib:init_ack(Error)
end.
%% This is the synchronous log event.
handle_call({log, Bin}, _From, State) ->
{Result,State1} = do_log(Bin, call, State),
%% Result == ok | dropped
{reply,Result, State1};
handle_call(filesync, _From, State = #{id := Name,
module := Module,
handler_state := HandlerState}) ->
{Result,HandlerState1} = Module:filesync(Name,sync,HandlerState),
{reply, Result, State#{handler_state=>HandlerState1, last_op=>sync}};
handle_call({change_config, #{formatter:=Formatter, config:=NewHConfig}}, _From,
State = #{filesync_repeat_interval := FSyncInt0}) ->
%% In the future, if handler_state must be updated due to config
%% change, then we need to add a callback to Module here.
CommonConfig = maps:with(?CONFIG_KEYS,NewHConfig),
State1 = maps:merge(State, CommonConfig),
State2 =
case maps:get(filesync_repeat_interval, NewHConfig) of
FSyncInt0 ->
State1;
_FSyncInt1 ->
set_repeated_filesync(cancel_repeated_filesync(State1))
end,
{reply, ok, State2#{formatter:=Formatter}};
handle_call(info, _From, State) ->
{reply, State, State};
handle_call(reset, _From,
#{id:=Name,module:=Module,handler_state:=HandlerState}=State) ->
State1 = ?merge_with_stats(State),
{reply, ok, State1#{last_qlen => 0,
last_log_ts => ?timestamp(),
handler_state => Module:reset_state(Name,HandlerState)}};
handle_call(stop, _From, State) ->
{stop, {shutdown,stopped}, ok, State}.
%% This is the asynchronous log event.
handle_cast({log, Bin}, State) ->
{_,State1} = do_log(Bin, cast, State),
{noreply, State1};
%% If FILESYNC_REPEAT_INTERVAL is set to a millisec value, this
%% clause gets called repeatedly by the handler. In order to
%% guarantee that a filesync *always* happens after the last log
%% event, the repeat operation must be active!
handle_cast(repeated_filesync,State = #{filesync_repeat_interval := no_repeat}) ->
%% This clause handles a race condition which may occur when
%% config changes filesync_repeat_interval from an integer value
%% to no_repeat.
{noreply,State};
handle_cast(repeated_filesync,
State = #{id := Name,
module := Module,
handler_state := HandlerState,
last_op := LastOp}) ->
State1 =
if LastOp == sync ->
State;
true ->
{_,HS} = Module:filesync(Name, async, HandlerState),
State#{handler_state => HS, last_op => sync}
end,
{noreply,set_repeated_filesync(State1)}.
handle_info(Info, #{id := Name, module := Module,
handler_state := HandlerState} = State) ->
{noreply,State#{handler_state => Module:handle_info(Name,Info,HandlerState)}}.
terminate(Reason, State = #{id := Name,
module := Module,
handler_state := HandlerState}) ->
_ = cancel_repeated_filesync(State),
_ = Module:terminate(Name, Reason, HandlerState),
ok = stop_or_restart(Name, Reason, State),
unregister(?name_to_reg_name(Module, Name)),
ok.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%-----------------------------------------------------------------
%%% Internal functions
call(Module, Name, Op) when is_atom(Name) ->
call(?name_to_reg_name(Module,Name), Op);
call(_, Name, Op) ->
{error,{badarg,{Op,[Name]}}}.
call(Server, Msg) ->
try
gen_server:call(Server, Msg, ?DEFAULT_CALL_TIMEOUT)
catch
_:{timeout,_} -> {error,handler_busy}
end.
%% check for overload between every event (and set Mode to async,
%% sync or drop accordingly), but never flush the whole mailbox
%% before LogWindowSize events have been handled
do_log(Bin, CallOrCast, State = #{id:=Name, mode:=Mode0}) ->
T1 = ?timestamp(),
%% check if the handler is getting overloaded, or if it's
%% recovering from overload (the check must be done for each
%% event to react quickly to large bursts of events and
%% to ensure that the handler can never end up in drop mode
%% with an empty mailbox, which would stop operation)
{Mode1,QLen,Mem,State1} = check_load(State),
if (Mode1 == drop) andalso (Mode0 =/= drop) ->
log_handler_info(Name, "Handler ~p switched to drop mode",
[Name], State);
(Mode0 == drop) andalso ((Mode1 == async) orelse (Mode1 == sync)) ->
log_handler_info(Name, "Handler ~p switched to ~w mode",
[Name,Mode1], State);
true ->
ok
end,
%% kill the handler if it can't keep up with the load
kill_if_choked(Name, QLen, Mem, State),
if Mode1 == flush ->
flush(Name, QLen, T1, State1);
true ->
write(Name, Mode1, T1, Bin, CallOrCast, State1)
end.
%% this clause is called by do_log/3 after an overload check
%% has been performed, where QLen > FlushQLen
flush(Name, _QLen0, T1, State=#{last_log_ts := _T0, mode_tab := ModeTab}) ->
%% flush messages in the mailbox (a limited number in
%% order to not cause long delays)
NewFlushed = flush_log_events(?FLUSH_MAX_N),
%% write info in log about flushed messages
log_handler_info(Name, "Handler ~p flushed ~w log events",
[Name,NewFlushed], State),
%% because of the receive loop when flushing messages, the
%% handler will be scheduled out often and the mailbox could
%% grow very large, so we'd better check the queue again here
{_,_QLen1} = process_info(self(), message_queue_len),
?observe(Name,{max_qlen,_QLen1}),
%% Add 1 for the current log event
?observe(Name,{flushed,NewFlushed+1}),
State1 = ?update_max_time(?diff_time(T1,_T0),State),
State2 = ?update_max_qlen(_QLen1,State1),
{dropped,?update_other(flushed,FLUSHED,NewFlushed,
State2#{mode => ?set_mode(ModeTab,async),
last_qlen => 0,
last_log_ts => T1})}.
%% this clause is called to write to file
write(Name, Mode, T1, Bin, _CallOrCast,
State = #{module := Module,
handler_state := HandlerState,
mode_tab := ModeTab,
ctrl_sync_count := CtrlSync,
last_qlen := LastQLen,
last_log_ts := T0}) ->
%% check if we need to limit the number of writes
%% during a burst of log events
{DoWrite,State1} = limit_burst(State),
%% only log synhrounously every ?CONTROLLER_SYNC_INTERVAL time, to
%% give the handler time between writes so it can keep up with
%% incoming messages
{Result,LastQLen1,HandlerState1} =
if DoWrite, CtrlSync == 0 ->
?observe(Name,{_CallOrCast,1}),
{_,HS1} = Module:write(Name, sync, Bin, HandlerState),
{ok,element(2, process_info(self(), message_queue_len)),HS1};
DoWrite ->
?observe(Name,{_CallOrCast,1}),
{_,HS1} = Module:write(Name, async, Bin, HandlerState),
{ok,LastQLen,HS1};
not DoWrite ->
?observe(Name,{flushed,1}),
{dropped,LastQLen,HandlerState}
end,
%% Check if the time since the previous log event is long enough -
%% and the queue length small enough - to assume the mailbox has
%% been emptied, and if so, do filesync operation and reset mode to
%% async. Note that this is the best we can do to detect an idle
%% handler without setting a timer after each log call/cast. If the
%% time between two consecutive log events is fast and no new
%% event comes in after the last one, idle state won't be detected!
Time = ?diff_time(T1,T0),
State2 =
if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso
(Time > ?IDLE_DETECT_TIME_USEC) ->
{_,HS2} = Module:filesync(Name,async,HandlerState),
State1#{mode => ?change_mode(ModeTab, Mode, async),
burst_msg_count => 0,
handler_state => HS2};
true ->
State1#{mode => Mode, handler_state => HandlerState1}
end,
State3 = ?update_calls_or_casts(_CallOrCast,1,State2),
State4 = ?update_max_qlen(LastQLen1,State3),
State5 =
?update_max_time(Time,
State4#{last_qlen := LastQLen1,
last_log_ts => T1,
last_op => write,
ctrl_sync_count =>
if CtrlSync==0 -> ?CONTROLLER_SYNC_INTERVAL;
true -> CtrlSync-1
end}),
{Result,State5}.
log_handler_info(Name, Format, Args, #{module:=Module,
formatter:=Formatter,
handler_state:=HandlerState}) ->
Config = #{formatter=>Formatter},
Meta = #{time=>erlang:system_time(microsecond)},
Bin = log_to_binary(#{level => notice,
msg => {Format,Args},
meta => Meta}, Config),
_ = Module:write(Name, async, Bin, HandlerState),
ok.
%%%-----------------------------------------------------------------
%%% Convert log data on any form to binary
-spec log_to_binary(LogEvent,Config) -> LogString when
LogEvent :: logger:log_event(),
Config :: logger:handler_config(),
LogString :: binary().
log_to_binary(#{msg:={report,_},meta:=#{report_cb:=_}}=Log,Config) ->
do_log_to_binary(Log,Config);
log_to_binary(#{msg:={report,_},meta:=Meta}=Log,Config) ->
DefaultReportCb = fun logger:format_otp_report/1,
do_log_to_binary(Log#{meta=>Meta#{report_cb=>DefaultReportCb}},Config);
log_to_binary(Log,Config) ->
do_log_to_binary(Log,Config).
do_log_to_binary(Log,Config) ->
{Formatter,FormatterConfig} =
maps:get(formatter,Config,{?DEFAULT_FORMATTER,?DEFAULT_FORMAT_CONFIG}),
String = try_format(Log,Formatter,FormatterConfig),
try string_to_binary(String)
catch C2:R2:S2 ->
?LOG_INTERNAL(debug,[{formatter_error,Formatter},
{config,FormatterConfig},
{log_event,Log},
{bad_return_value,String},
{catched,{C2,R2,S2}}]),
<<"FORMATTER ERROR: bad return value">>
end.
try_format(Log,Formatter,FormatterConfig) ->
try Formatter:format(Log,FormatterConfig)
catch
C:R:S ->
?LOG_INTERNAL(debug,[{formatter_crashed,Formatter},
{config,FormatterConfig},
{log_event,Log},
{reason,
{C,R,logger:filter_stacktrace(?MODULE,S)}}]),
case {?DEFAULT_FORMATTER,#{}} of
{Formatter,FormatterConfig} ->
"DEFAULT FORMATTER CRASHED";
{DefaultFormatter,DefaultConfig} ->
try_format(Log#{msg=>{"FORMATTER CRASH: ~tp",
[maps:get(msg,Log)]}},
DefaultFormatter,DefaultConfig)
end
end.
string_to_binary(String) ->
case unicode:characters_to_binary(String) of
Binary when is_binary(Binary) ->
Binary;
Error ->
throw(Error)
end.
%%%-----------------------------------------------------------------
%%% Check that the configuration term is valid
check_config(Config) when is_map(Config) ->
case check_common_config(maps:to_list(Config)) of
ok ->
case overload_levels_ok(Config) of
true ->
ok;
false ->
Faulty = maps:with([sync_mode_qlen,
drop_mode_qlen,
flush_qlen],Config),
{error,{invalid_levels,Faulty}}
end;
Error ->
Error
end.
check_common_config([{sync_mode_qlen,N}|Config]) when is_integer(N) ->
check_common_config(Config);
check_common_config([{drop_mode_qlen,N}|Config]) when is_integer(N) ->
check_common_config(Config);
check_common_config([{flush_qlen,N}|Config]) when is_integer(N) ->
check_common_config(Config);
check_common_config([{burst_limit_enable,Bool}|Config]) when is_boolean(Bool) ->
check_common_config(Config);
check_common_config([{burst_limit_max_count,N}|Config]) when is_integer(N) ->
check_common_config(Config);
check_common_config([{burst_limit_window_time,N}|Config]) when is_integer(N) ->
check_common_config(Config);
check_common_config([{overload_kill_enable,Bool}|Config]) when is_boolean(Bool) ->
check_common_config(Config);
check_common_config([{overload_kill_qlen,N}|Config]) when is_integer(N) ->
check_common_config(Config);
check_common_config([{overload_kill_mem_size,N}|Config]) when is_integer(N) ->
check_common_config(Config);
check_common_config([{overload_kill_restart_after,NorA}|Config])
when is_integer(NorA); NorA == infinity ->
check_common_config(Config);
check_common_config([{filesync_repeat_interval,NorA}|Config])
when is_integer(NorA); NorA == no_repeat ->
check_common_config(Config);
check_common_config([{Key,Value}|_]) ->
{error,#{Key=>Value}};
check_common_config([]) ->
ok.
get_default_config() ->
#{sync_mode_qlen => ?SYNC_MODE_QLEN,
drop_mode_qlen => ?DROP_MODE_QLEN,
flush_qlen => ?FLUSH_QLEN,
burst_limit_enable => ?BURST_LIMIT_ENABLE,
burst_limit_max_count => ?BURST_LIMIT_MAX_COUNT,
burst_limit_window_time => ?BURST_LIMIT_WINDOW_TIME,
overload_kill_enable => ?OVERLOAD_KILL_ENABLE,
overload_kill_qlen => ?OVERLOAD_KILL_QLEN,
overload_kill_mem_size => ?OVERLOAD_KILL_MEM_SIZE,
overload_kill_restart_after => ?OVERLOAD_KILL_RESTART_AFTER,
filesync_repeat_interval => ?FILESYNC_REPEAT_INTERVAL}.
%%%-----------------------------------------------------------------
%%% Overload Protection
call_cast_or_drop(_Name, HandlerPid, ModeTab, Bin) ->
%% If the handler process is getting overloaded, the log event
%% will be synchronous instead of asynchronous (slows down the
%% logging tempo of a process doing lots of logging. If the
%% handler is choked, drop mode is set and no event will be sent.
try ?get_mode(ModeTab) of
async ->
gen_server:cast(HandlerPid, {log,Bin});
sync ->
case call(HandlerPid, {log,Bin}) of
ok ->
ok;
_Other ->
%% dropped or {error,handler_busy}
?observe(_Name,{dropped,1}),
ok
end;
drop ->
?observe(_Name,{dropped,1})
catch
%% if the ETS table doesn't exist (maybe because of a
%% handler restart), we can only drop the event
_:_ -> ?observe(_Name,{dropped,1})
end,
ok.
set_restart_flag(#{id := Name, module := Module} = State) ->
log_handler_info(Name, "Handler ~p overloaded and stopping", [Name], State),
Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])),
spawn(fun() ->
register(Flag, self()),
timer:sleep(infinity)
end),
ok.
unset_restart_flag(#{id := Name, module := Module} = State) ->
Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])),
case whereis(Flag) of
undefined ->
ok;
Pid ->
exit(Pid, kill),
log_handler_info(Name, "Handler ~p restarted", [Name], State)
end.
check_load(State = #{id:=_Name, mode_tab := ModeTab, mode := Mode,
sync_mode_qlen := SyncModeQLen,
drop_mode_qlen := DropModeQLen,
flush_qlen := FlushQLen}) ->
{_,Mem} = process_info(self(), memory),
?observe(_Name,{max_mem,Mem}),
{_,QLen} = process_info(self(), message_queue_len),
?observe(_Name,{max_qlen,QLen}),
%% When the handler process gets scheduled in, it's impossible
%% to predict the QLen. We could jump "up" arbitrarily from say
%% async to sync, async to drop, sync to flush, etc. However, when
%% the handler process manages the log events (without flushing),
%% one after the other, we will move "down" from drop to sync and
%% from sync to async. This way we don't risk getting stuck in
%% drop or sync mode with an empty mailbox.
{Mode1,_NewDrops,_NewFlushes} =
if
QLen >= FlushQLen ->
{flush, 0,1};
QLen >= DropModeQLen ->
%% Note that drop mode will force log events to
%% be dropped on the client side (never sent get to
%% the handler).
IncDrops = if Mode == drop -> 0; true -> 1 end,
{?change_mode(ModeTab, Mode, drop), IncDrops,0};
QLen >= SyncModeQLen ->
{?change_mode(ModeTab, Mode, sync), 0,0};
true ->
{?change_mode(ModeTab, Mode, async), 0,0}
end,
State1 = ?update_other(drops,DROPS,_NewDrops,State),
{Mode1, QLen, Mem,
?update_other(flushes,FLUSHES,_NewFlushes,
State1#{last_qlen => QLen})}.
limit_burst(#{burst_limit_enable := false}=State) ->
{true,State};
limit_burst(#{burst_win_ts := BurstWinT0,
burst_msg_count := BurstMsgCount,
burst_limit_window_time := BurstLimitWinTime,
burst_limit_max_count := BurstLimitMaxCnt} = State) ->
if (BurstMsgCount >= BurstLimitMaxCnt) ->
%% the limit for allowed messages has been reached
BurstWinT1 = ?timestamp(),
case ?diff_time(BurstWinT1,BurstWinT0) of
BurstCheckTime when BurstCheckTime < (BurstLimitWinTime*1000) ->
%% we're still within the burst time frame
{false,?update_other(burst_drops,BURSTS,1,State)};
_BurstCheckTime ->
%% burst time frame passed, reset counters
{true,State#{burst_win_ts => BurstWinT1,
burst_msg_count => 0}}
end;
true ->
%% the limit for allowed messages not yet reached
{true,State#{burst_win_ts => BurstWinT0,
burst_msg_count => BurstMsgCount+1}}
end.
kill_if_choked(Name, QLen, Mem, State = #{overload_kill_enable := KillIfOL,
overload_kill_qlen := OLKillQLen,
overload_kill_mem_size := OLKillMem}) ->
if KillIfOL andalso
((QLen > OLKillQLen) orelse (Mem > OLKillMem)) ->
set_restart_flag(State),
exit({shutdown,{overloaded,Name,QLen,Mem}});
true ->
ok
end.
flush_log_events(Limit) ->
process_flag(priority, high),
Flushed = flush_log_events(0, Limit),
process_flag(priority, normal),
Flushed.
flush_log_events(Limit, Limit) ->
Limit;
flush_log_events(N, Limit) ->
%% flush log events but leave other events, such as
%% filesync, info and change_config, so that these
%% have a chance to be processed even under heavy load
receive
{'$gen_cast',{log,_}} ->
flush_log_events(N+1, Limit);
{'$gen_call',{Pid,MRef},{log,_}} ->
Pid ! {MRef, dropped},
flush_log_events(N+1, Limit)
after
0 -> N
end.
set_repeated_filesync(#{filesync_repeat_interval:=FSyncInt} = State)
when is_integer(FSyncInt) ->
{ok,TRef} = timer:apply_after(FSyncInt, gen_server, cast,
[self(),repeated_filesync]),
State#{rep_sync_tref=>TRef};
set_repeated_filesync(State) ->
State.
cancel_repeated_filesync(State) ->
case maps:take(rep_sync_tref,State) of
{TRef,State1} ->
_ = timer:cancel(TRef),
State1;
error ->
State
end.
stop_or_restart(Name, {shutdown,Reason={overloaded,_Name,_QLen,_Mem}},
#{overload_kill_restart_after := RestartAfter}) ->
%% If we're terminating because of an overload situation (see
%% kill_if_choked/4), we need to remove the handler and set a
%% restart timer. A separate process must perform this in order to
%% avoid deadlock.
HandlerPid = self(),
ConfigResult = logger:get_handler_config(Name),
RemoveAndRestart =
fun() ->
MRef = erlang:monitor(process, HandlerPid),
receive
{'DOWN',MRef,_,_,_} ->
ok
after 30000 ->
error_notify(Reason),
exit(HandlerPid, kill)
end,
case ConfigResult of
{ok,#{module:=HMod}=HConfig0} when is_integer(RestartAfter) ->
_ = logger:remove_handler(Name),
HConfig = try HMod:filter_config(HConfig0)
catch _:_ -> HConfig0
end,
_ = timer:apply_after(RestartAfter, logger, add_handler,
[Name,HMod,HConfig]);
{ok,_} ->
_ = logger:remove_handler(Name);
{error,CfgReason} when is_integer(RestartAfter) ->
error_notify({Name,restart_impossible,CfgReason});
{error,_} ->
ok
end
end,
spawn(RemoveAndRestart),
ok;
stop_or_restart(_Name, _Reason, _State) ->
ok.
overload_levels_ok(HandlerConfig) ->
SMQL = maps:get(sync_mode_qlen, HandlerConfig, ?SYNC_MODE_QLEN),
DMQL = maps:get(drop_mode_qlen, HandlerConfig, ?DROP_MODE_QLEN),
FQL = maps:get(flush_qlen, HandlerConfig, ?FLUSH_QLEN),
(DMQL > 1) andalso (SMQL =< DMQL) andalso (DMQL =< FQL).
error_notify(Term) ->
?internal_log(error, Term).