diff options
Diffstat (limited to 'lib/kernel/src/logger_h_common.erl')
-rw-r--r-- | lib/kernel/src/logger_h_common.erl | 802 |
1 files changed, 802 insertions, 0 deletions
diff --git a/lib/kernel/src/logger_h_common.erl b/lib/kernel/src/logger_h_common.erl new file mode 100644 index 0000000000..74a2d158fc --- /dev/null +++ b/lib/kernel/src/logger_h_common.erl @@ -0,0 +1,802 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2017-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% +-module(logger_h_common). +-behaviour(gen_server). + +-include("logger_h_common.hrl"). +-include("logger_internal.hrl"). + +%% API +-export([start_link/1, info/2, filesync/2, reset/2]). + +%% gen_server and proc_lib callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% logger callbacks +-export([log/2, adding_handler/1, removing_handler/1, changing_config/3, + filter_config/1]). + +%% Library functions for handlers +-export([error_notify/1]). + +%%%----------------------------------------------------------------- +-define(CONFIG_KEYS,[sync_mode_qlen, + drop_mode_qlen, + flush_qlen, + burst_limit_enable, + burst_limit_max_count, + burst_limit_window_time, + overload_kill_enable, + overload_kill_qlen, + overload_kill_mem_size, + overload_kill_restart_after, + filesync_repeat_interval]). +-define(READ_ONLY_KEYS,[handler_pid,mode_tab]). + +%%%----------------------------------------------------------------- +%%% API + +%% This function is called by the logger_sup supervisor +start_link(Args) -> + proc_lib:start_link(?MODULE,init,[Args]). + +filesync(Module, Name) -> + call(Module, Name, filesync). + +info(Module, Name) -> + call(Module, Name, info). + +reset(Module, Name) -> + call(Module, Name, reset). + +%%%----------------------------------------------------------------- +%%% Handler being added +adding_handler(#{id:=Name,module:=Module}=Config) -> + HConfig0 = maps:get(config, Config, #{}), + HandlerConfig0 = maps:without(?CONFIG_KEYS,HConfig0), + case Module:check_config(Name,set,undefined,HandlerConfig0) of + {ok,HandlerConfig} -> + ModifiedCommon = maps:with(?CONFIG_KEYS,HandlerConfig), + CommonConfig0 = maps:with(?CONFIG_KEYS,HConfig0), + CommonConfig = maps:merge( + maps:merge(get_default_config(), CommonConfig0), + ModifiedCommon), + case check_config(CommonConfig) of + ok -> + HConfig = maps:merge(CommonConfig,HandlerConfig), + start(Config#{config => HConfig}); + {error,Faulty} -> + {error,{invalid_config,Module,Faulty}} + end; + Error -> + Error + end. + +%%%----------------------------------------------------------------- +%%% Handler being removed +removing_handler(#{id:=Name, module:=Module}) -> + case whereis(?name_to_reg_name(Module,Name)) of + undefined -> + ok; + Pid -> + %% We don't want to do supervisor:terminate_child here + %% since we need to distinguish this explicit stop from a + %% system termination in order to avoid circular attempts + %% at removing the handler (implying deadlocks and + %% timeouts). + %% And we don't need to do supervisor:delete_child, since + %% the restart type is temporary, which means that the + %% child specification is automatically removed from the + %% supervisor when the process dies. + _ = gen_server:call(Pid, stop), + ok + end. + +%%%----------------------------------------------------------------- +%%% Updating handler config +changing_config(SetOrUpdate, + #{id:=Name,config:=OldHConfig,module:=Module}, + NewConfig0) -> + NewHConfig0 = maps:get(config, NewConfig0, #{}), + OldHandlerConfig = maps:without(?CONFIG_KEYS++?READ_ONLY_KEYS,OldHConfig), + NewHandlerConfig0 = maps:without(?CONFIG_KEYS++?READ_ONLY_KEYS,NewHConfig0), + case Module:check_config(Name, SetOrUpdate, + OldHandlerConfig,NewHandlerConfig0) of + {ok, NewHandlerConfig} -> + ModifiedCommon = maps:with(?CONFIG_KEYS,NewHandlerConfig), + NewCommonConfig0 = maps:with(?CONFIG_KEYS,NewHConfig0), + CommonDefault = + case SetOrUpdate of + set -> + get_default_config(); + update -> + maps:with(?CONFIG_KEYS,OldHConfig) + end, + NewCommonConfig = maps:merge( + maps:merge(CommonDefault,NewCommonConfig0), + ModifiedCommon), + case check_config(NewCommonConfig) of + ok -> + ReadOnly = maps:with(?READ_ONLY_KEYS,OldHConfig), + NewHConfig = maps:merge( + maps:merge(NewCommonConfig,NewHandlerConfig), + ReadOnly), + NewConfig = NewConfig0#{config=>NewHConfig}, + HPid = maps:get(handler_pid,OldHConfig), + case call(HPid, {change_config,NewConfig}) of + ok -> {ok,NewConfig}; + Error -> Error + end; + {error,Faulty} -> + {error,{invalid_config,Module,Faulty}} + end; + Error -> + Error + end. + +%%%----------------------------------------------------------------- +%%% Log a string or report +-spec log(LogEvent, Config) -> ok when + LogEvent :: logger:log_event(), + Config :: logger:handler_config(). + +log(LogEvent, Config = #{id := Name, + config := #{handler_pid := HPid, + mode_tab := ModeTab}}) -> + %% if the handler has crashed, we must drop this event + %% and hope the handler restarts so we can try again + true = is_process_alive(HPid), + Bin = log_to_binary(LogEvent, Config), + call_cast_or_drop(Name, HPid, ModeTab, Bin). + +%%%----------------------------------------------------------------- +%%% Remove internal fields from configuration +filter_config(#{config:=HConfig}=Config) -> + Config#{config=>maps:without(?READ_ONLY_KEYS,HConfig)}. + +%%%----------------------------------------------------------------- +%%% Start the handler process +%%% +%%% The process must always exist if the handler is registered with +%%% logger (and must not exist if the handler is not registered). +%%% +%%% The handler process is linked to logger_sup, which is part of the +%%% kernel application's supervision tree. +start(#{id := Name} = Config0) -> + ChildSpec = + #{id => Name, + start => {?MODULE, start_link, [Config0]}, + restart => temporary, + shutdown => 2000, + type => worker, + modules => [?MODULE]}, + case supervisor:start_child(logger_sup, ChildSpec) of + {ok,Pid,Config} -> + ok = logger_handler_watcher:register_handler(Name,Pid), + {ok,Config}; + Error -> + Error + end. + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init(#{id := Name, module := Module, + formatter := Formatter, config := HConfig0} = Config0) -> + RegName = ?name_to_reg_name(Module,Name), + register(RegName, self()), + process_flag(trap_exit, true), + process_flag(message_queue_data, off_heap), + + ?init_test_hooks(), + ?start_observation(Name), + + case Module:init(Name, HConfig0) of + {ok,HState} -> + try ets:new(Name, [public]) of + ModeTab -> + ?set_mode(ModeTab, async), + T0 = ?timestamp(), + HConfig = HConfig0#{handler_pid => self(), + mode_tab => ModeTab}, + Config = Config0#{config => HConfig}, + proc_lib:init_ack({ok,self(),Config}), + %% Storing common config in state to avoid copying + %% (sending) the config data for each log message + CommonConfig = maps:with(?CONFIG_KEYS,HConfig), + State = + ?merge_with_stats( + CommonConfig#{id => Name, + module => Module, + mode_tab => ModeTab, + mode => async, + ctrl_sync_count => + ?CONTROLLER_SYNC_INTERVAL, + last_qlen => 0, + last_log_ts => T0, + last_op => sync, + burst_win_ts => T0, + burst_msg_count => 0, + formatter => Formatter, + handler_state => HState}), + State1 = set_repeated_filesync(State), + unset_restart_flag(State1), + gen_server:enter_loop(?MODULE, [], State1) + catch + _:Error -> + unregister(RegName), + error_notify({init_handler,Name,Error}), + proc_lib:init_ack(Error) + end; + Error -> + unregister(RegName), + error_notify({init_handler,Name,Error}), + proc_lib:init_ack(Error) + end. + +%% This is the synchronous log event. +handle_call({log, Bin}, _From, State) -> + {Result,State1} = do_log(Bin, call, State), + %% Result == ok | dropped + {reply,Result, State1}; + +handle_call(filesync, _From, State = #{id := Name, + module := Module, + handler_state := HandlerState}) -> + {Result,HandlerState1} = Module:filesync(Name,sync,HandlerState), + {reply, Result, State#{handler_state=>HandlerState1, last_op=>sync}}; + +handle_call({change_config, #{formatter:=Formatter, config:=NewHConfig}}, _From, + State = #{filesync_repeat_interval := FSyncInt0}) -> + %% In the future, if handler_state must be updated due to config + %% change, then we need to add a callback to Module here. + CommonConfig = maps:with(?CONFIG_KEYS,NewHConfig), + State1 = maps:merge(State, CommonConfig), + State2 = + case maps:get(filesync_repeat_interval, NewHConfig) of + FSyncInt0 -> + State1; + _FSyncInt1 -> + set_repeated_filesync(cancel_repeated_filesync(State1)) + end, + {reply, ok, State2#{formatter:=Formatter}}; + +handle_call(info, _From, State) -> + {reply, State, State}; + +handle_call(reset, _From, + #{id:=Name,module:=Module,handler_state:=HandlerState}=State) -> + State1 = ?merge_with_stats(State), + {reply, ok, State1#{last_qlen => 0, + last_log_ts => ?timestamp(), + handler_state => Module:reset_state(Name,HandlerState)}}; + +handle_call(stop, _From, State) -> + {stop, {shutdown,stopped}, ok, State}. + +%% This is the asynchronous log event. +handle_cast({log, Bin}, State) -> + {_,State1} = do_log(Bin, cast, State), + {noreply, State1}; + +%% If FILESYNC_REPEAT_INTERVAL is set to a millisec value, this +%% clause gets called repeatedly by the handler. In order to +%% guarantee that a filesync *always* happens after the last log +%% event, the repeat operation must be active! +handle_cast(repeated_filesync,State = #{filesync_repeat_interval := no_repeat}) -> + %% This clause handles a race condition which may occur when + %% config changes filesync_repeat_interval from an integer value + %% to no_repeat. + {noreply,State}; +handle_cast(repeated_filesync, + State = #{id := Name, + module := Module, + handler_state := HandlerState, + last_op := LastOp}) -> + State1 = + if LastOp == sync -> + State; + true -> + {_,HS} = Module:filesync(Name, async, HandlerState), + State#{handler_state => HS, last_op => sync} + end, + {noreply,set_repeated_filesync(State1)}. + +handle_info(Info, #{id := Name, module := Module, + handler_state := HandlerState} = State) -> + {noreply,State#{handler_state => Module:handle_info(Name,Info,HandlerState)}}. + +terminate(Reason, State = #{id := Name, + module := Module, + handler_state := HandlerState}) -> + _ = cancel_repeated_filesync(State), + _ = Module:terminate(Name, Reason, HandlerState), + ok = stop_or_restart(Name, Reason, State), + unregister(?name_to_reg_name(Module, Name)), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%%%----------------------------------------------------------------- +%%% Internal functions +call(Module, Name, Op) when is_atom(Name) -> + call(?name_to_reg_name(Module,Name), Op); +call(_, Name, Op) -> + {error,{badarg,{Op,[Name]}}}. + +call(Server, Msg) -> + try + gen_server:call(Server, Msg, ?DEFAULT_CALL_TIMEOUT) + catch + _:{timeout,_} -> {error,handler_busy} + end. + +%% check for overload between every event (and set Mode to async, +%% sync or drop accordingly), but never flush the whole mailbox +%% before LogWindowSize events have been handled +do_log(Bin, CallOrCast, State = #{id:=Name, mode:=Mode0}) -> + T1 = ?timestamp(), + + %% check if the handler is getting overloaded, or if it's + %% recovering from overload (the check must be done for each + %% event to react quickly to large bursts of events and + %% to ensure that the handler can never end up in drop mode + %% with an empty mailbox, which would stop operation) + {Mode1,QLen,Mem,State1} = check_load(State), + + if (Mode1 == drop) andalso (Mode0 =/= drop) -> + log_handler_info(Name, "Handler ~p switched to drop mode", + [Name], State); + (Mode0 == drop) andalso ((Mode1 == async) orelse (Mode1 == sync)) -> + log_handler_info(Name, "Handler ~p switched to ~w mode", + [Name,Mode1], State); + true -> + ok + end, + + %% kill the handler if it can't keep up with the load + kill_if_choked(Name, QLen, Mem, State), + + if Mode1 == flush -> + flush(Name, QLen, T1, State1); + true -> + write(Name, Mode1, T1, Bin, CallOrCast, State1) + end. + +%% this clause is called by do_log/3 after an overload check +%% has been performed, where QLen > FlushQLen +flush(Name, _QLen0, T1, State=#{last_log_ts := _T0, mode_tab := ModeTab}) -> + %% flush messages in the mailbox (a limited number in + %% order to not cause long delays) + NewFlushed = flush_log_events(?FLUSH_MAX_N), + + %% write info in log about flushed messages + log_handler_info(Name, "Handler ~p flushed ~w log events", + [Name,NewFlushed], State), + + %% because of the receive loop when flushing messages, the + %% handler will be scheduled out often and the mailbox could + %% grow very large, so we'd better check the queue again here + {_,_QLen1} = process_info(self(), message_queue_len), + ?observe(Name,{max_qlen,_QLen1}), + + %% Add 1 for the current log event + ?observe(Name,{flushed,NewFlushed+1}), + + State1 = ?update_max_time(?diff_time(T1,_T0),State), + State2 = ?update_max_qlen(_QLen1,State1), + {dropped,?update_other(flushed,FLUSHED,NewFlushed, + State2#{mode => ?set_mode(ModeTab,async), + last_qlen => 0, + last_log_ts => T1})}. + +%% this clause is called to write to file +write(Name, Mode, T1, Bin, _CallOrCast, + State = #{module := Module, + handler_state := HandlerState, + mode_tab := ModeTab, + ctrl_sync_count := CtrlSync, + last_qlen := LastQLen, + last_log_ts := T0}) -> + %% check if we need to limit the number of writes + %% during a burst of log events + {DoWrite,State1} = limit_burst(State), + + %% only log synhrounously every ?CONTROLLER_SYNC_INTERVAL time, to + %% give the handler time between writes so it can keep up with + %% incoming messages + {Result,LastQLen1,HandlerState1} = + if DoWrite, CtrlSync == 0 -> + ?observe(Name,{_CallOrCast,1}), + {_,HS1} = Module:write(Name, sync, Bin, HandlerState), + {ok,element(2, process_info(self(), message_queue_len)),HS1}; + DoWrite -> + ?observe(Name,{_CallOrCast,1}), + {_,HS1} = Module:write(Name, async, Bin, HandlerState), + {ok,LastQLen,HS1}; + not DoWrite -> + ?observe(Name,{flushed,1}), + {dropped,LastQLen,HandlerState} + end, + + %% Check if the time since the previous log event is long enough - + %% and the queue length small enough - to assume the mailbox has + %% been emptied, and if so, do filesync operation and reset mode to + %% async. Note that this is the best we can do to detect an idle + %% handler without setting a timer after each log call/cast. If the + %% time between two consecutive log events is fast and no new + %% event comes in after the last one, idle state won't be detected! + Time = ?diff_time(T1,T0), + State2 = + if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso + (Time > ?IDLE_DETECT_TIME_USEC) -> + {_,HS2} = Module:filesync(Name,async,HandlerState), + State1#{mode => ?change_mode(ModeTab, Mode, async), + burst_msg_count => 0, + handler_state => HS2}; + true -> + State1#{mode => Mode, handler_state => HandlerState1} + end, + State3 = ?update_calls_or_casts(_CallOrCast,1,State2), + State4 = ?update_max_qlen(LastQLen1,State3), + State5 = + ?update_max_time(Time, + State4#{last_qlen := LastQLen1, + last_log_ts => T1, + last_op => write, + ctrl_sync_count => + if CtrlSync==0 -> ?CONTROLLER_SYNC_INTERVAL; + true -> CtrlSync-1 + end}), + {Result,State5}. + +log_handler_info(Name, Format, Args, #{module:=Module, + formatter:=Formatter, + handler_state:=HandlerState}) -> + Config = #{formatter=>Formatter}, + Meta = #{time=>erlang:system_time(microsecond)}, + Bin = log_to_binary(#{level => notice, + msg => {Format,Args}, + meta => Meta}, Config), + _ = Module:write(Name, async, Bin, HandlerState), + ok. + +%%%----------------------------------------------------------------- +%%% Convert log data on any form to binary +-spec log_to_binary(LogEvent,Config) -> LogString when + LogEvent :: logger:log_event(), + Config :: logger:handler_config(), + LogString :: binary(). +log_to_binary(#{msg:={report,_},meta:=#{report_cb:=_}}=Log,Config) -> + do_log_to_binary(Log,Config); +log_to_binary(#{msg:={report,_},meta:=Meta}=Log,Config) -> + DefaultReportCb = fun logger:format_otp_report/1, + do_log_to_binary(Log#{meta=>Meta#{report_cb=>DefaultReportCb}},Config); +log_to_binary(Log,Config) -> + do_log_to_binary(Log,Config). + +do_log_to_binary(Log,Config) -> + {Formatter,FormatterConfig} = + maps:get(formatter,Config,{?DEFAULT_FORMATTER,?DEFAULT_FORMAT_CONFIG}), + String = try_format(Log,Formatter,FormatterConfig), + try string_to_binary(String) + catch C2:R2:S2 -> + ?LOG_INTERNAL(debug,[{formatter_error,Formatter}, + {config,FormatterConfig}, + {log_event,Log}, + {bad_return_value,String}, + {catched,{C2,R2,S2}}]), + <<"FORMATTER ERROR: bad return value">> + end. + +try_format(Log,Formatter,FormatterConfig) -> + try Formatter:format(Log,FormatterConfig) + catch + C:R:S -> + ?LOG_INTERNAL(debug,[{formatter_crashed,Formatter}, + {config,FormatterConfig}, + {log_event,Log}, + {reason, + {C,R,logger:filter_stacktrace(?MODULE,S)}}]), + case {?DEFAULT_FORMATTER,#{}} of + {Formatter,FormatterConfig} -> + "DEFAULT FORMATTER CRASHED"; + {DefaultFormatter,DefaultConfig} -> + try_format(Log#{msg=>{"FORMATTER CRASH: ~tp", + [maps:get(msg,Log)]}}, + DefaultFormatter,DefaultConfig) + end + end. + +string_to_binary(String) -> + case unicode:characters_to_binary(String) of + Binary when is_binary(Binary) -> + Binary; + Error -> + throw(Error) + end. + +%%%----------------------------------------------------------------- +%%% Check that the configuration term is valid +check_config(Config) when is_map(Config) -> + case check_common_config(maps:to_list(Config)) of + ok -> + case overload_levels_ok(Config) of + true -> + ok; + false -> + Faulty = maps:with([sync_mode_qlen, + drop_mode_qlen, + flush_qlen],Config), + {error,{invalid_levels,Faulty}} + end; + Error -> + Error + end. + +check_common_config([{sync_mode_qlen,N}|Config]) when is_integer(N) -> + check_common_config(Config); +check_common_config([{drop_mode_qlen,N}|Config]) when is_integer(N) -> + check_common_config(Config); +check_common_config([{flush_qlen,N}|Config]) when is_integer(N) -> + check_common_config(Config); +check_common_config([{burst_limit_enable,Bool}|Config]) when is_boolean(Bool) -> + check_common_config(Config); +check_common_config([{burst_limit_max_count,N}|Config]) when is_integer(N) -> + check_common_config(Config); +check_common_config([{burst_limit_window_time,N}|Config]) when is_integer(N) -> + check_common_config(Config); +check_common_config([{overload_kill_enable,Bool}|Config]) when is_boolean(Bool) -> + check_common_config(Config); +check_common_config([{overload_kill_qlen,N}|Config]) when is_integer(N) -> + check_common_config(Config); +check_common_config([{overload_kill_mem_size,N}|Config]) when is_integer(N) -> + check_common_config(Config); +check_common_config([{overload_kill_restart_after,NorA}|Config]) + when is_integer(NorA); NorA == infinity -> + check_common_config(Config); +check_common_config([{filesync_repeat_interval,NorA}|Config]) + when is_integer(NorA); NorA == no_repeat -> + check_common_config(Config); +check_common_config([{Key,Value}|_]) -> + {error,#{Key=>Value}}; +check_common_config([]) -> + ok. + +get_default_config() -> + #{sync_mode_qlen => ?SYNC_MODE_QLEN, + drop_mode_qlen => ?DROP_MODE_QLEN, + flush_qlen => ?FLUSH_QLEN, + burst_limit_enable => ?BURST_LIMIT_ENABLE, + burst_limit_max_count => ?BURST_LIMIT_MAX_COUNT, + burst_limit_window_time => ?BURST_LIMIT_WINDOW_TIME, + overload_kill_enable => ?OVERLOAD_KILL_ENABLE, + overload_kill_qlen => ?OVERLOAD_KILL_QLEN, + overload_kill_mem_size => ?OVERLOAD_KILL_MEM_SIZE, + overload_kill_restart_after => ?OVERLOAD_KILL_RESTART_AFTER, + filesync_repeat_interval => ?FILESYNC_REPEAT_INTERVAL}. + +%%%----------------------------------------------------------------- +%%% Overload Protection +call_cast_or_drop(_Name, HandlerPid, ModeTab, Bin) -> + %% If the handler process is getting overloaded, the log event + %% will be synchronous instead of asynchronous (slows down the + %% logging tempo of a process doing lots of logging. If the + %% handler is choked, drop mode is set and no event will be sent. + try ?get_mode(ModeTab) of + async -> + gen_server:cast(HandlerPid, {log,Bin}); + sync -> + case call(HandlerPid, {log,Bin}) of + ok -> + ok; + _Other -> + %% dropped or {error,handler_busy} + ?observe(_Name,{dropped,1}), + ok + end; + drop -> + ?observe(_Name,{dropped,1}) + catch + %% if the ETS table doesn't exist (maybe because of a + %% handler restart), we can only drop the event + _:_ -> ?observe(_Name,{dropped,1}) + end, + ok. + +set_restart_flag(#{id := Name, module := Module} = State) -> + log_handler_info(Name, "Handler ~p overloaded and stopping", [Name], State), + Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])), + spawn(fun() -> + register(Flag, self()), + timer:sleep(infinity) + end), + ok. + +unset_restart_flag(#{id := Name, module := Module} = State) -> + Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])), + case whereis(Flag) of + undefined -> + ok; + Pid -> + exit(Pid, kill), + log_handler_info(Name, "Handler ~p restarted", [Name], State) + end. + +check_load(State = #{id:=_Name, mode_tab := ModeTab, mode := Mode, + sync_mode_qlen := SyncModeQLen, + drop_mode_qlen := DropModeQLen, + flush_qlen := FlushQLen}) -> + {_,Mem} = process_info(self(), memory), + ?observe(_Name,{max_mem,Mem}), + {_,QLen} = process_info(self(), message_queue_len), + ?observe(_Name,{max_qlen,QLen}), + %% When the handler process gets scheduled in, it's impossible + %% to predict the QLen. We could jump "up" arbitrarily from say + %% async to sync, async to drop, sync to flush, etc. However, when + %% the handler process manages the log events (without flushing), + %% one after the other, we will move "down" from drop to sync and + %% from sync to async. This way we don't risk getting stuck in + %% drop or sync mode with an empty mailbox. + {Mode1,_NewDrops,_NewFlushes} = + if + QLen >= FlushQLen -> + {flush, 0,1}; + QLen >= DropModeQLen -> + %% Note that drop mode will force log events to + %% be dropped on the client side (never sent get to + %% the handler). + IncDrops = if Mode == drop -> 0; true -> 1 end, + {?change_mode(ModeTab, Mode, drop), IncDrops,0}; + QLen >= SyncModeQLen -> + {?change_mode(ModeTab, Mode, sync), 0,0}; + true -> + {?change_mode(ModeTab, Mode, async), 0,0} + end, + State1 = ?update_other(drops,DROPS,_NewDrops,State), + {Mode1, QLen, Mem, + ?update_other(flushes,FLUSHES,_NewFlushes, + State1#{last_qlen => QLen})}. + +limit_burst(#{burst_limit_enable := false}=State) -> + {true,State}; +limit_burst(#{burst_win_ts := BurstWinT0, + burst_msg_count := BurstMsgCount, + burst_limit_window_time := BurstLimitWinTime, + burst_limit_max_count := BurstLimitMaxCnt} = State) -> + if (BurstMsgCount >= BurstLimitMaxCnt) -> + %% the limit for allowed messages has been reached + BurstWinT1 = ?timestamp(), + case ?diff_time(BurstWinT1,BurstWinT0) of + BurstCheckTime when BurstCheckTime < (BurstLimitWinTime*1000) -> + %% we're still within the burst time frame + {false,?update_other(burst_drops,BURSTS,1,State)}; + _BurstCheckTime -> + %% burst time frame passed, reset counters + {true,State#{burst_win_ts => BurstWinT1, + burst_msg_count => 0}} + end; + true -> + %% the limit for allowed messages not yet reached + {true,State#{burst_win_ts => BurstWinT0, + burst_msg_count => BurstMsgCount+1}} + end. + +kill_if_choked(Name, QLen, Mem, State = #{overload_kill_enable := KillIfOL, + overload_kill_qlen := OLKillQLen, + overload_kill_mem_size := OLKillMem}) -> + if KillIfOL andalso + ((QLen > OLKillQLen) orelse (Mem > OLKillMem)) -> + set_restart_flag(State), + exit({shutdown,{overloaded,Name,QLen,Mem}}); + true -> + ok + end. + +flush_log_events(Limit) -> + process_flag(priority, high), + Flushed = flush_log_events(0, Limit), + process_flag(priority, normal), + Flushed. + +flush_log_events(Limit, Limit) -> + Limit; +flush_log_events(N, Limit) -> + %% flush log events but leave other events, such as + %% filesync, info and change_config, so that these + %% have a chance to be processed even under heavy load + receive + {'$gen_cast',{log,_}} -> + flush_log_events(N+1, Limit); + {'$gen_call',{Pid,MRef},{log,_}} -> + Pid ! {MRef, dropped}, + flush_log_events(N+1, Limit) + after + 0 -> N + end. + +set_repeated_filesync(#{filesync_repeat_interval:=FSyncInt} = State) + when is_integer(FSyncInt) -> + {ok,TRef} = timer:apply_after(FSyncInt, gen_server, cast, + [self(),repeated_filesync]), + State#{rep_sync_tref=>TRef}; +set_repeated_filesync(State) -> + State. + +cancel_repeated_filesync(State) -> + case maps:take(rep_sync_tref,State) of + {TRef,State1} -> + _ = timer:cancel(TRef), + State1; + error -> + State + end. + +stop_or_restart(Name, {shutdown,Reason={overloaded,_Name,_QLen,_Mem}}, + #{overload_kill_restart_after := RestartAfter}) -> + %% If we're terminating because of an overload situation (see + %% kill_if_choked/4), we need to remove the handler and set a + %% restart timer. A separate process must perform this in order to + %% avoid deadlock. + HandlerPid = self(), + ConfigResult = logger:get_handler_config(Name), + RemoveAndRestart = + fun() -> + MRef = erlang:monitor(process, HandlerPid), + receive + {'DOWN',MRef,_,_,_} -> + ok + after 30000 -> + error_notify(Reason), + exit(HandlerPid, kill) + end, + case ConfigResult of + {ok,#{module:=HMod}=HConfig0} when is_integer(RestartAfter) -> + _ = logger:remove_handler(Name), + HConfig = try HMod:filter_config(HConfig0) + catch _:_ -> HConfig0 + end, + _ = timer:apply_after(RestartAfter, logger, add_handler, + [Name,HMod,HConfig]); + {ok,_} -> + _ = logger:remove_handler(Name); + {error,CfgReason} when is_integer(RestartAfter) -> + error_notify({Name,restart_impossible,CfgReason}); + {error,_} -> + ok + end + end, + spawn(RemoveAndRestart), + ok; +stop_or_restart(_Name, _Reason, _State) -> + ok. + +overload_levels_ok(HandlerConfig) -> + SMQL = maps:get(sync_mode_qlen, HandlerConfig, ?SYNC_MODE_QLEN), + DMQL = maps:get(drop_mode_qlen, HandlerConfig, ?DROP_MODE_QLEN), + FQL = maps:get(flush_qlen, HandlerConfig, ?FLUSH_QLEN), + (DMQL > 1) andalso (SMQL =< DMQL) andalso (DMQL =< FQL). + +error_notify(Term) -> + ?internal_log(error, Term). |