aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel/src/logger_h_common.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/kernel/src/logger_h_common.erl')
-rw-r--r--lib/kernel/src/logger_h_common.erl802
1 files changed, 802 insertions, 0 deletions
diff --git a/lib/kernel/src/logger_h_common.erl b/lib/kernel/src/logger_h_common.erl
new file mode 100644
index 0000000000..74a2d158fc
--- /dev/null
+++ b/lib/kernel/src/logger_h_common.erl
@@ -0,0 +1,802 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2017-2018. All Rights Reserved.
+%%
+%% Licensed under the Apache License, Version 2.0 (the "License");
+%% you may not use this file except in compliance with the License.
+%% You may obtain a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing, software
+%% distributed under the License is distributed on an "AS IS" BASIS,
+%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+%% See the License for the specific language governing permissions and
+%% limitations under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(logger_h_common).
+-behaviour(gen_server).
+
+-include("logger_h_common.hrl").
+-include("logger_internal.hrl").
+
+%% API
+-export([start_link/1, info/2, filesync/2, reset/2]).
+
+%% gen_server and proc_lib callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+%% logger callbacks
+-export([log/2, adding_handler/1, removing_handler/1, changing_config/3,
+ filter_config/1]).
+
+%% Library functions for handlers
+-export([error_notify/1]).
+
+%%%-----------------------------------------------------------------
+-define(CONFIG_KEYS,[sync_mode_qlen,
+ drop_mode_qlen,
+ flush_qlen,
+ burst_limit_enable,
+ burst_limit_max_count,
+ burst_limit_window_time,
+ overload_kill_enable,
+ overload_kill_qlen,
+ overload_kill_mem_size,
+ overload_kill_restart_after,
+ filesync_repeat_interval]).
+-define(READ_ONLY_KEYS,[handler_pid,mode_tab]).
+
+%%%-----------------------------------------------------------------
+%%% API
+
+%% This function is called by the logger_sup supervisor
+start_link(Args) ->
+ proc_lib:start_link(?MODULE,init,[Args]).
+
+filesync(Module, Name) ->
+ call(Module, Name, filesync).
+
+info(Module, Name) ->
+ call(Module, Name, info).
+
+reset(Module, Name) ->
+ call(Module, Name, reset).
+
+%%%-----------------------------------------------------------------
+%%% Handler being added
+adding_handler(#{id:=Name,module:=Module}=Config) ->
+ HConfig0 = maps:get(config, Config, #{}),
+ HandlerConfig0 = maps:without(?CONFIG_KEYS,HConfig0),
+ case Module:check_config(Name,set,undefined,HandlerConfig0) of
+ {ok,HandlerConfig} ->
+ ModifiedCommon = maps:with(?CONFIG_KEYS,HandlerConfig),
+ CommonConfig0 = maps:with(?CONFIG_KEYS,HConfig0),
+ CommonConfig = maps:merge(
+ maps:merge(get_default_config(), CommonConfig0),
+ ModifiedCommon),
+ case check_config(CommonConfig) of
+ ok ->
+ HConfig = maps:merge(CommonConfig,HandlerConfig),
+ start(Config#{config => HConfig});
+ {error,Faulty} ->
+ {error,{invalid_config,Module,Faulty}}
+ end;
+ Error ->
+ Error
+ end.
+
+%%%-----------------------------------------------------------------
+%%% Handler being removed
+removing_handler(#{id:=Name, module:=Module}) ->
+ case whereis(?name_to_reg_name(Module,Name)) of
+ undefined ->
+ ok;
+ Pid ->
+ %% We don't want to do supervisor:terminate_child here
+ %% since we need to distinguish this explicit stop from a
+ %% system termination in order to avoid circular attempts
+ %% at removing the handler (implying deadlocks and
+ %% timeouts).
+ %% And we don't need to do supervisor:delete_child, since
+ %% the restart type is temporary, which means that the
+ %% child specification is automatically removed from the
+ %% supervisor when the process dies.
+ _ = gen_server:call(Pid, stop),
+ ok
+ end.
+
+%%%-----------------------------------------------------------------
+%%% Updating handler config
+changing_config(SetOrUpdate,
+ #{id:=Name,config:=OldHConfig,module:=Module},
+ NewConfig0) ->
+ NewHConfig0 = maps:get(config, NewConfig0, #{}),
+ OldHandlerConfig = maps:without(?CONFIG_KEYS++?READ_ONLY_KEYS,OldHConfig),
+ NewHandlerConfig0 = maps:without(?CONFIG_KEYS++?READ_ONLY_KEYS,NewHConfig0),
+ case Module:check_config(Name, SetOrUpdate,
+ OldHandlerConfig,NewHandlerConfig0) of
+ {ok, NewHandlerConfig} ->
+ ModifiedCommon = maps:with(?CONFIG_KEYS,NewHandlerConfig),
+ NewCommonConfig0 = maps:with(?CONFIG_KEYS,NewHConfig0),
+ CommonDefault =
+ case SetOrUpdate of
+ set ->
+ get_default_config();
+ update ->
+ maps:with(?CONFIG_KEYS,OldHConfig)
+ end,
+ NewCommonConfig = maps:merge(
+ maps:merge(CommonDefault,NewCommonConfig0),
+ ModifiedCommon),
+ case check_config(NewCommonConfig) of
+ ok ->
+ ReadOnly = maps:with(?READ_ONLY_KEYS,OldHConfig),
+ NewHConfig = maps:merge(
+ maps:merge(NewCommonConfig,NewHandlerConfig),
+ ReadOnly),
+ NewConfig = NewConfig0#{config=>NewHConfig},
+ HPid = maps:get(handler_pid,OldHConfig),
+ case call(HPid, {change_config,NewConfig}) of
+ ok -> {ok,NewConfig};
+ Error -> Error
+ end;
+ {error,Faulty} ->
+ {error,{invalid_config,Module,Faulty}}
+ end;
+ Error ->
+ Error
+ end.
+
+%%%-----------------------------------------------------------------
+%%% Log a string or report
+-spec log(LogEvent, Config) -> ok when
+ LogEvent :: logger:log_event(),
+ Config :: logger:handler_config().
+
+log(LogEvent, Config = #{id := Name,
+ config := #{handler_pid := HPid,
+ mode_tab := ModeTab}}) ->
+ %% if the handler has crashed, we must drop this event
+ %% and hope the handler restarts so we can try again
+ true = is_process_alive(HPid),
+ Bin = log_to_binary(LogEvent, Config),
+ call_cast_or_drop(Name, HPid, ModeTab, Bin).
+
+%%%-----------------------------------------------------------------
+%%% Remove internal fields from configuration
+filter_config(#{config:=HConfig}=Config) ->
+ Config#{config=>maps:without(?READ_ONLY_KEYS,HConfig)}.
+
+%%%-----------------------------------------------------------------
+%%% Start the handler process
+%%%
+%%% The process must always exist if the handler is registered with
+%%% logger (and must not exist if the handler is not registered).
+%%%
+%%% The handler process is linked to logger_sup, which is part of the
+%%% kernel application's supervision tree.
+start(#{id := Name} = Config0) ->
+ ChildSpec =
+ #{id => Name,
+ start => {?MODULE, start_link, [Config0]},
+ restart => temporary,
+ shutdown => 2000,
+ type => worker,
+ modules => [?MODULE]},
+ case supervisor:start_child(logger_sup, ChildSpec) of
+ {ok,Pid,Config} ->
+ ok = logger_handler_watcher:register_handler(Name,Pid),
+ {ok,Config};
+ Error ->
+ Error
+ end.
+
+%%%===================================================================
+%%% gen_server callbacks
+%%%===================================================================
+
+init(#{id := Name, module := Module,
+ formatter := Formatter, config := HConfig0} = Config0) ->
+ RegName = ?name_to_reg_name(Module,Name),
+ register(RegName, self()),
+ process_flag(trap_exit, true),
+ process_flag(message_queue_data, off_heap),
+
+ ?init_test_hooks(),
+ ?start_observation(Name),
+
+ case Module:init(Name, HConfig0) of
+ {ok,HState} ->
+ try ets:new(Name, [public]) of
+ ModeTab ->
+ ?set_mode(ModeTab, async),
+ T0 = ?timestamp(),
+ HConfig = HConfig0#{handler_pid => self(),
+ mode_tab => ModeTab},
+ Config = Config0#{config => HConfig},
+ proc_lib:init_ack({ok,self(),Config}),
+ %% Storing common config in state to avoid copying
+ %% (sending) the config data for each log message
+ CommonConfig = maps:with(?CONFIG_KEYS,HConfig),
+ State =
+ ?merge_with_stats(
+ CommonConfig#{id => Name,
+ module => Module,
+ mode_tab => ModeTab,
+ mode => async,
+ ctrl_sync_count =>
+ ?CONTROLLER_SYNC_INTERVAL,
+ last_qlen => 0,
+ last_log_ts => T0,
+ last_op => sync,
+ burst_win_ts => T0,
+ burst_msg_count => 0,
+ formatter => Formatter,
+ handler_state => HState}),
+ State1 = set_repeated_filesync(State),
+ unset_restart_flag(State1),
+ gen_server:enter_loop(?MODULE, [], State1)
+ catch
+ _:Error ->
+ unregister(RegName),
+ error_notify({init_handler,Name,Error}),
+ proc_lib:init_ack(Error)
+ end;
+ Error ->
+ unregister(RegName),
+ error_notify({init_handler,Name,Error}),
+ proc_lib:init_ack(Error)
+ end.
+
+%% This is the synchronous log event.
+handle_call({log, Bin}, _From, State) ->
+ {Result,State1} = do_log(Bin, call, State),
+ %% Result == ok | dropped
+ {reply,Result, State1};
+
+handle_call(filesync, _From, State = #{id := Name,
+ module := Module,
+ handler_state := HandlerState}) ->
+ {Result,HandlerState1} = Module:filesync(Name,sync,HandlerState),
+ {reply, Result, State#{handler_state=>HandlerState1, last_op=>sync}};
+
+handle_call({change_config, #{formatter:=Formatter, config:=NewHConfig}}, _From,
+ State = #{filesync_repeat_interval := FSyncInt0}) ->
+ %% In the future, if handler_state must be updated due to config
+ %% change, then we need to add a callback to Module here.
+ CommonConfig = maps:with(?CONFIG_KEYS,NewHConfig),
+ State1 = maps:merge(State, CommonConfig),
+ State2 =
+ case maps:get(filesync_repeat_interval, NewHConfig) of
+ FSyncInt0 ->
+ State1;
+ _FSyncInt1 ->
+ set_repeated_filesync(cancel_repeated_filesync(State1))
+ end,
+ {reply, ok, State2#{formatter:=Formatter}};
+
+handle_call(info, _From, State) ->
+ {reply, State, State};
+
+handle_call(reset, _From,
+ #{id:=Name,module:=Module,handler_state:=HandlerState}=State) ->
+ State1 = ?merge_with_stats(State),
+ {reply, ok, State1#{last_qlen => 0,
+ last_log_ts => ?timestamp(),
+ handler_state => Module:reset_state(Name,HandlerState)}};
+
+handle_call(stop, _From, State) ->
+ {stop, {shutdown,stopped}, ok, State}.
+
+%% This is the asynchronous log event.
+handle_cast({log, Bin}, State) ->
+ {_,State1} = do_log(Bin, cast, State),
+ {noreply, State1};
+
+%% If FILESYNC_REPEAT_INTERVAL is set to a millisec value, this
+%% clause gets called repeatedly by the handler. In order to
+%% guarantee that a filesync *always* happens after the last log
+%% event, the repeat operation must be active!
+handle_cast(repeated_filesync,State = #{filesync_repeat_interval := no_repeat}) ->
+ %% This clause handles a race condition which may occur when
+ %% config changes filesync_repeat_interval from an integer value
+ %% to no_repeat.
+ {noreply,State};
+handle_cast(repeated_filesync,
+ State = #{id := Name,
+ module := Module,
+ handler_state := HandlerState,
+ last_op := LastOp}) ->
+ State1 =
+ if LastOp == sync ->
+ State;
+ true ->
+ {_,HS} = Module:filesync(Name, async, HandlerState),
+ State#{handler_state => HS, last_op => sync}
+ end,
+ {noreply,set_repeated_filesync(State1)}.
+
+handle_info(Info, #{id := Name, module := Module,
+ handler_state := HandlerState} = State) ->
+ {noreply,State#{handler_state => Module:handle_info(Name,Info,HandlerState)}}.
+
+terminate(Reason, State = #{id := Name,
+ module := Module,
+ handler_state := HandlerState}) ->
+ _ = cancel_repeated_filesync(State),
+ _ = Module:terminate(Name, Reason, HandlerState),
+ ok = stop_or_restart(Name, Reason, State),
+ unregister(?name_to_reg_name(Module, Name)),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+
+%%%-----------------------------------------------------------------
+%%% Internal functions
+call(Module, Name, Op) when is_atom(Name) ->
+ call(?name_to_reg_name(Module,Name), Op);
+call(_, Name, Op) ->
+ {error,{badarg,{Op,[Name]}}}.
+
+call(Server, Msg) ->
+ try
+ gen_server:call(Server, Msg, ?DEFAULT_CALL_TIMEOUT)
+ catch
+ _:{timeout,_} -> {error,handler_busy}
+ end.
+
+%% check for overload between every event (and set Mode to async,
+%% sync or drop accordingly), but never flush the whole mailbox
+%% before LogWindowSize events have been handled
+do_log(Bin, CallOrCast, State = #{id:=Name, mode:=Mode0}) ->
+ T1 = ?timestamp(),
+
+ %% check if the handler is getting overloaded, or if it's
+ %% recovering from overload (the check must be done for each
+ %% event to react quickly to large bursts of events and
+ %% to ensure that the handler can never end up in drop mode
+ %% with an empty mailbox, which would stop operation)
+ {Mode1,QLen,Mem,State1} = check_load(State),
+
+ if (Mode1 == drop) andalso (Mode0 =/= drop) ->
+ log_handler_info(Name, "Handler ~p switched to drop mode",
+ [Name], State);
+ (Mode0 == drop) andalso ((Mode1 == async) orelse (Mode1 == sync)) ->
+ log_handler_info(Name, "Handler ~p switched to ~w mode",
+ [Name,Mode1], State);
+ true ->
+ ok
+ end,
+
+ %% kill the handler if it can't keep up with the load
+ kill_if_choked(Name, QLen, Mem, State),
+
+ if Mode1 == flush ->
+ flush(Name, QLen, T1, State1);
+ true ->
+ write(Name, Mode1, T1, Bin, CallOrCast, State1)
+ end.
+
+%% this clause is called by do_log/3 after an overload check
+%% has been performed, where QLen > FlushQLen
+flush(Name, _QLen0, T1, State=#{last_log_ts := _T0, mode_tab := ModeTab}) ->
+ %% flush messages in the mailbox (a limited number in
+ %% order to not cause long delays)
+ NewFlushed = flush_log_events(?FLUSH_MAX_N),
+
+ %% write info in log about flushed messages
+ log_handler_info(Name, "Handler ~p flushed ~w log events",
+ [Name,NewFlushed], State),
+
+ %% because of the receive loop when flushing messages, the
+ %% handler will be scheduled out often and the mailbox could
+ %% grow very large, so we'd better check the queue again here
+ {_,_QLen1} = process_info(self(), message_queue_len),
+ ?observe(Name,{max_qlen,_QLen1}),
+
+ %% Add 1 for the current log event
+ ?observe(Name,{flushed,NewFlushed+1}),
+
+ State1 = ?update_max_time(?diff_time(T1,_T0),State),
+ State2 = ?update_max_qlen(_QLen1,State1),
+ {dropped,?update_other(flushed,FLUSHED,NewFlushed,
+ State2#{mode => ?set_mode(ModeTab,async),
+ last_qlen => 0,
+ last_log_ts => T1})}.
+
+%% this clause is called to write to file
+write(Name, Mode, T1, Bin, _CallOrCast,
+ State = #{module := Module,
+ handler_state := HandlerState,
+ mode_tab := ModeTab,
+ ctrl_sync_count := CtrlSync,
+ last_qlen := LastQLen,
+ last_log_ts := T0}) ->
+ %% check if we need to limit the number of writes
+ %% during a burst of log events
+ {DoWrite,State1} = limit_burst(State),
+
+ %% only log synhrounously every ?CONTROLLER_SYNC_INTERVAL time, to
+ %% give the handler time between writes so it can keep up with
+ %% incoming messages
+ {Result,LastQLen1,HandlerState1} =
+ if DoWrite, CtrlSync == 0 ->
+ ?observe(Name,{_CallOrCast,1}),
+ {_,HS1} = Module:write(Name, sync, Bin, HandlerState),
+ {ok,element(2, process_info(self(), message_queue_len)),HS1};
+ DoWrite ->
+ ?observe(Name,{_CallOrCast,1}),
+ {_,HS1} = Module:write(Name, async, Bin, HandlerState),
+ {ok,LastQLen,HS1};
+ not DoWrite ->
+ ?observe(Name,{flushed,1}),
+ {dropped,LastQLen,HandlerState}
+ end,
+
+ %% Check if the time since the previous log event is long enough -
+ %% and the queue length small enough - to assume the mailbox has
+ %% been emptied, and if so, do filesync operation and reset mode to
+ %% async. Note that this is the best we can do to detect an idle
+ %% handler without setting a timer after each log call/cast. If the
+ %% time between two consecutive log events is fast and no new
+ %% event comes in after the last one, idle state won't be detected!
+ Time = ?diff_time(T1,T0),
+ State2 =
+ if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso
+ (Time > ?IDLE_DETECT_TIME_USEC) ->
+ {_,HS2} = Module:filesync(Name,async,HandlerState),
+ State1#{mode => ?change_mode(ModeTab, Mode, async),
+ burst_msg_count => 0,
+ handler_state => HS2};
+ true ->
+ State1#{mode => Mode, handler_state => HandlerState1}
+ end,
+ State3 = ?update_calls_or_casts(_CallOrCast,1,State2),
+ State4 = ?update_max_qlen(LastQLen1,State3),
+ State5 =
+ ?update_max_time(Time,
+ State4#{last_qlen := LastQLen1,
+ last_log_ts => T1,
+ last_op => write,
+ ctrl_sync_count =>
+ if CtrlSync==0 -> ?CONTROLLER_SYNC_INTERVAL;
+ true -> CtrlSync-1
+ end}),
+ {Result,State5}.
+
+log_handler_info(Name, Format, Args, #{module:=Module,
+ formatter:=Formatter,
+ handler_state:=HandlerState}) ->
+ Config = #{formatter=>Formatter},
+ Meta = #{time=>erlang:system_time(microsecond)},
+ Bin = log_to_binary(#{level => notice,
+ msg => {Format,Args},
+ meta => Meta}, Config),
+ _ = Module:write(Name, async, Bin, HandlerState),
+ ok.
+
+%%%-----------------------------------------------------------------
+%%% Convert log data on any form to binary
+-spec log_to_binary(LogEvent,Config) -> LogString when
+ LogEvent :: logger:log_event(),
+ Config :: logger:handler_config(),
+ LogString :: binary().
+log_to_binary(#{msg:={report,_},meta:=#{report_cb:=_}}=Log,Config) ->
+ do_log_to_binary(Log,Config);
+log_to_binary(#{msg:={report,_},meta:=Meta}=Log,Config) ->
+ DefaultReportCb = fun logger:format_otp_report/1,
+ do_log_to_binary(Log#{meta=>Meta#{report_cb=>DefaultReportCb}},Config);
+log_to_binary(Log,Config) ->
+ do_log_to_binary(Log,Config).
+
+do_log_to_binary(Log,Config) ->
+ {Formatter,FormatterConfig} =
+ maps:get(formatter,Config,{?DEFAULT_FORMATTER,?DEFAULT_FORMAT_CONFIG}),
+ String = try_format(Log,Formatter,FormatterConfig),
+ try string_to_binary(String)
+ catch C2:R2:S2 ->
+ ?LOG_INTERNAL(debug,[{formatter_error,Formatter},
+ {config,FormatterConfig},
+ {log_event,Log},
+ {bad_return_value,String},
+ {catched,{C2,R2,S2}}]),
+ <<"FORMATTER ERROR: bad return value">>
+ end.
+
+try_format(Log,Formatter,FormatterConfig) ->
+ try Formatter:format(Log,FormatterConfig)
+ catch
+ C:R:S ->
+ ?LOG_INTERNAL(debug,[{formatter_crashed,Formatter},
+ {config,FormatterConfig},
+ {log_event,Log},
+ {reason,
+ {C,R,logger:filter_stacktrace(?MODULE,S)}}]),
+ case {?DEFAULT_FORMATTER,#{}} of
+ {Formatter,FormatterConfig} ->
+ "DEFAULT FORMATTER CRASHED";
+ {DefaultFormatter,DefaultConfig} ->
+ try_format(Log#{msg=>{"FORMATTER CRASH: ~tp",
+ [maps:get(msg,Log)]}},
+ DefaultFormatter,DefaultConfig)
+ end
+ end.
+
+string_to_binary(String) ->
+ case unicode:characters_to_binary(String) of
+ Binary when is_binary(Binary) ->
+ Binary;
+ Error ->
+ throw(Error)
+ end.
+
+%%%-----------------------------------------------------------------
+%%% Check that the configuration term is valid
+check_config(Config) when is_map(Config) ->
+ case check_common_config(maps:to_list(Config)) of
+ ok ->
+ case overload_levels_ok(Config) of
+ true ->
+ ok;
+ false ->
+ Faulty = maps:with([sync_mode_qlen,
+ drop_mode_qlen,
+ flush_qlen],Config),
+ {error,{invalid_levels,Faulty}}
+ end;
+ Error ->
+ Error
+ end.
+
+check_common_config([{sync_mode_qlen,N}|Config]) when is_integer(N) ->
+ check_common_config(Config);
+check_common_config([{drop_mode_qlen,N}|Config]) when is_integer(N) ->
+ check_common_config(Config);
+check_common_config([{flush_qlen,N}|Config]) when is_integer(N) ->
+ check_common_config(Config);
+check_common_config([{burst_limit_enable,Bool}|Config]) when is_boolean(Bool) ->
+ check_common_config(Config);
+check_common_config([{burst_limit_max_count,N}|Config]) when is_integer(N) ->
+ check_common_config(Config);
+check_common_config([{burst_limit_window_time,N}|Config]) when is_integer(N) ->
+ check_common_config(Config);
+check_common_config([{overload_kill_enable,Bool}|Config]) when is_boolean(Bool) ->
+ check_common_config(Config);
+check_common_config([{overload_kill_qlen,N}|Config]) when is_integer(N) ->
+ check_common_config(Config);
+check_common_config([{overload_kill_mem_size,N}|Config]) when is_integer(N) ->
+ check_common_config(Config);
+check_common_config([{overload_kill_restart_after,NorA}|Config])
+ when is_integer(NorA); NorA == infinity ->
+ check_common_config(Config);
+check_common_config([{filesync_repeat_interval,NorA}|Config])
+ when is_integer(NorA); NorA == no_repeat ->
+ check_common_config(Config);
+check_common_config([{Key,Value}|_]) ->
+ {error,#{Key=>Value}};
+check_common_config([]) ->
+ ok.
+
+get_default_config() ->
+ #{sync_mode_qlen => ?SYNC_MODE_QLEN,
+ drop_mode_qlen => ?DROP_MODE_QLEN,
+ flush_qlen => ?FLUSH_QLEN,
+ burst_limit_enable => ?BURST_LIMIT_ENABLE,
+ burst_limit_max_count => ?BURST_LIMIT_MAX_COUNT,
+ burst_limit_window_time => ?BURST_LIMIT_WINDOW_TIME,
+ overload_kill_enable => ?OVERLOAD_KILL_ENABLE,
+ overload_kill_qlen => ?OVERLOAD_KILL_QLEN,
+ overload_kill_mem_size => ?OVERLOAD_KILL_MEM_SIZE,
+ overload_kill_restart_after => ?OVERLOAD_KILL_RESTART_AFTER,
+ filesync_repeat_interval => ?FILESYNC_REPEAT_INTERVAL}.
+
+%%%-----------------------------------------------------------------
+%%% Overload Protection
+call_cast_or_drop(_Name, HandlerPid, ModeTab, Bin) ->
+ %% If the handler process is getting overloaded, the log event
+ %% will be synchronous instead of asynchronous (slows down the
+ %% logging tempo of a process doing lots of logging. If the
+ %% handler is choked, drop mode is set and no event will be sent.
+ try ?get_mode(ModeTab) of
+ async ->
+ gen_server:cast(HandlerPid, {log,Bin});
+ sync ->
+ case call(HandlerPid, {log,Bin}) of
+ ok ->
+ ok;
+ _Other ->
+ %% dropped or {error,handler_busy}
+ ?observe(_Name,{dropped,1}),
+ ok
+ end;
+ drop ->
+ ?observe(_Name,{dropped,1})
+ catch
+ %% if the ETS table doesn't exist (maybe because of a
+ %% handler restart), we can only drop the event
+ _:_ -> ?observe(_Name,{dropped,1})
+ end,
+ ok.
+
+set_restart_flag(#{id := Name, module := Module} = State) ->
+ log_handler_info(Name, "Handler ~p overloaded and stopping", [Name], State),
+ Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])),
+ spawn(fun() ->
+ register(Flag, self()),
+ timer:sleep(infinity)
+ end),
+ ok.
+
+unset_restart_flag(#{id := Name, module := Module} = State) ->
+ Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])),
+ case whereis(Flag) of
+ undefined ->
+ ok;
+ Pid ->
+ exit(Pid, kill),
+ log_handler_info(Name, "Handler ~p restarted", [Name], State)
+ end.
+
+check_load(State = #{id:=_Name, mode_tab := ModeTab, mode := Mode,
+ sync_mode_qlen := SyncModeQLen,
+ drop_mode_qlen := DropModeQLen,
+ flush_qlen := FlushQLen}) ->
+ {_,Mem} = process_info(self(), memory),
+ ?observe(_Name,{max_mem,Mem}),
+ {_,QLen} = process_info(self(), message_queue_len),
+ ?observe(_Name,{max_qlen,QLen}),
+ %% When the handler process gets scheduled in, it's impossible
+ %% to predict the QLen. We could jump "up" arbitrarily from say
+ %% async to sync, async to drop, sync to flush, etc. However, when
+ %% the handler process manages the log events (without flushing),
+ %% one after the other, we will move "down" from drop to sync and
+ %% from sync to async. This way we don't risk getting stuck in
+ %% drop or sync mode with an empty mailbox.
+ {Mode1,_NewDrops,_NewFlushes} =
+ if
+ QLen >= FlushQLen ->
+ {flush, 0,1};
+ QLen >= DropModeQLen ->
+ %% Note that drop mode will force log events to
+ %% be dropped on the client side (never sent get to
+ %% the handler).
+ IncDrops = if Mode == drop -> 0; true -> 1 end,
+ {?change_mode(ModeTab, Mode, drop), IncDrops,0};
+ QLen >= SyncModeQLen ->
+ {?change_mode(ModeTab, Mode, sync), 0,0};
+ true ->
+ {?change_mode(ModeTab, Mode, async), 0,0}
+ end,
+ State1 = ?update_other(drops,DROPS,_NewDrops,State),
+ {Mode1, QLen, Mem,
+ ?update_other(flushes,FLUSHES,_NewFlushes,
+ State1#{last_qlen => QLen})}.
+
+limit_burst(#{burst_limit_enable := false}=State) ->
+ {true,State};
+limit_burst(#{burst_win_ts := BurstWinT0,
+ burst_msg_count := BurstMsgCount,
+ burst_limit_window_time := BurstLimitWinTime,
+ burst_limit_max_count := BurstLimitMaxCnt} = State) ->
+ if (BurstMsgCount >= BurstLimitMaxCnt) ->
+ %% the limit for allowed messages has been reached
+ BurstWinT1 = ?timestamp(),
+ case ?diff_time(BurstWinT1,BurstWinT0) of
+ BurstCheckTime when BurstCheckTime < (BurstLimitWinTime*1000) ->
+ %% we're still within the burst time frame
+ {false,?update_other(burst_drops,BURSTS,1,State)};
+ _BurstCheckTime ->
+ %% burst time frame passed, reset counters
+ {true,State#{burst_win_ts => BurstWinT1,
+ burst_msg_count => 0}}
+ end;
+ true ->
+ %% the limit for allowed messages not yet reached
+ {true,State#{burst_win_ts => BurstWinT0,
+ burst_msg_count => BurstMsgCount+1}}
+ end.
+
+kill_if_choked(Name, QLen, Mem, State = #{overload_kill_enable := KillIfOL,
+ overload_kill_qlen := OLKillQLen,
+ overload_kill_mem_size := OLKillMem}) ->
+ if KillIfOL andalso
+ ((QLen > OLKillQLen) orelse (Mem > OLKillMem)) ->
+ set_restart_flag(State),
+ exit({shutdown,{overloaded,Name,QLen,Mem}});
+ true ->
+ ok
+ end.
+
+flush_log_events(Limit) ->
+ process_flag(priority, high),
+ Flushed = flush_log_events(0, Limit),
+ process_flag(priority, normal),
+ Flushed.
+
+flush_log_events(Limit, Limit) ->
+ Limit;
+flush_log_events(N, Limit) ->
+ %% flush log events but leave other events, such as
+ %% filesync, info and change_config, so that these
+ %% have a chance to be processed even under heavy load
+ receive
+ {'$gen_cast',{log,_}} ->
+ flush_log_events(N+1, Limit);
+ {'$gen_call',{Pid,MRef},{log,_}} ->
+ Pid ! {MRef, dropped},
+ flush_log_events(N+1, Limit)
+ after
+ 0 -> N
+ end.
+
+set_repeated_filesync(#{filesync_repeat_interval:=FSyncInt} = State)
+ when is_integer(FSyncInt) ->
+ {ok,TRef} = timer:apply_after(FSyncInt, gen_server, cast,
+ [self(),repeated_filesync]),
+ State#{rep_sync_tref=>TRef};
+set_repeated_filesync(State) ->
+ State.
+
+cancel_repeated_filesync(State) ->
+ case maps:take(rep_sync_tref,State) of
+ {TRef,State1} ->
+ _ = timer:cancel(TRef),
+ State1;
+ error ->
+ State
+ end.
+
+stop_or_restart(Name, {shutdown,Reason={overloaded,_Name,_QLen,_Mem}},
+ #{overload_kill_restart_after := RestartAfter}) ->
+ %% If we're terminating because of an overload situation (see
+ %% kill_if_choked/4), we need to remove the handler and set a
+ %% restart timer. A separate process must perform this in order to
+ %% avoid deadlock.
+ HandlerPid = self(),
+ ConfigResult = logger:get_handler_config(Name),
+ RemoveAndRestart =
+ fun() ->
+ MRef = erlang:monitor(process, HandlerPid),
+ receive
+ {'DOWN',MRef,_,_,_} ->
+ ok
+ after 30000 ->
+ error_notify(Reason),
+ exit(HandlerPid, kill)
+ end,
+ case ConfigResult of
+ {ok,#{module:=HMod}=HConfig0} when is_integer(RestartAfter) ->
+ _ = logger:remove_handler(Name),
+ HConfig = try HMod:filter_config(HConfig0)
+ catch _:_ -> HConfig0
+ end,
+ _ = timer:apply_after(RestartAfter, logger, add_handler,
+ [Name,HMod,HConfig]);
+ {ok,_} ->
+ _ = logger:remove_handler(Name);
+ {error,CfgReason} when is_integer(RestartAfter) ->
+ error_notify({Name,restart_impossible,CfgReason});
+ {error,_} ->
+ ok
+ end
+ end,
+ spawn(RemoveAndRestart),
+ ok;
+stop_or_restart(_Name, _Reason, _State) ->
+ ok.
+
+overload_levels_ok(HandlerConfig) ->
+ SMQL = maps:get(sync_mode_qlen, HandlerConfig, ?SYNC_MODE_QLEN),
+ DMQL = maps:get(drop_mode_qlen, HandlerConfig, ?DROP_MODE_QLEN),
+ FQL = maps:get(flush_qlen, HandlerConfig, ?FLUSH_QLEN),
+ (DMQL > 1) andalso (SMQL =< DMQL) andalso (DMQL =< FQL).
+
+error_notify(Term) ->
+ ?internal_log(error, Term).