diff options
author | Siri Hansen <[email protected]> | 2018-10-31 15:37:04 +0100 |
---|---|---|
committer | Siri Hansen <[email protected]> | 2019-01-16 09:59:24 +0100 |
commit | 2929e79806b0e8ffdd4be5c7eaed0cea04bce850 (patch) | |
tree | 05785e6d5de014d93af9a755a0e37011c18da3df /lib/kernel/src/logger_h_common.erl | |
parent | 7000e101745cc028e4656eaaae3ea5a1a4498747 (diff) | |
download | otp-2929e79806b0e8ffdd4be5c7eaed0cea04bce850.tar.gz otp-2929e79806b0e8ffdd4be5c7eaed0cea04bce850.tar.bz2 otp-2929e79806b0e8ffdd4be5c7eaed0cea04bce850.zip |
[logger] Split overload protection functionality to own module
Diffstat (limited to 'lib/kernel/src/logger_h_common.erl')
-rw-r--r-- | lib/kernel/src/logger_h_common.erl | 663 |
1 files changed, 160 insertions, 503 deletions
diff --git a/lib/kernel/src/logger_h_common.erl b/lib/kernel/src/logger_h_common.erl index 74a2d158fc..dd8ace8249 100644 --- a/lib/kernel/src/logger_h_common.erl +++ b/lib/kernel/src/logger_h_common.erl @@ -24,11 +24,11 @@ -include("logger_internal.hrl"). %% API --export([start_link/1, info/2, filesync/2, reset/2]). +-export([info/2, filesync/2, reset/2]). -%% gen_server and proc_lib callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). +%% logger_olp callbacks +-export([init/1, handle_load/2, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3, notify/2, reset_state/1]). %% logger callbacks -export([log/2, adding_handler/1, removing_handler/1, changing_config/3, @@ -37,27 +37,25 @@ %% Library functions for handlers -export([error_notify/1]). -%%%----------------------------------------------------------------- --define(CONFIG_KEYS,[sync_mode_qlen, - drop_mode_qlen, - flush_qlen, - burst_limit_enable, - burst_limit_max_count, - burst_limit_window_time, - overload_kill_enable, - overload_kill_qlen, - overload_kill_mem_size, - overload_kill_restart_after, - filesync_repeat_interval]). --define(READ_ONLY_KEYS,[handler_pid,mode_tab]). +-define(OLP_KEYS,[sync_mode_qlen, + drop_mode_qlen, + flush_qlen, + burst_limit_enable, + burst_limit_max_count, + burst_limit_window_time, + overload_kill_enable, + overload_kill_qlen, + overload_kill_mem_size, + overload_kill_restart_after]). + +-define(COMMON_KEYS,[filesync_repeat_interval]). + +-define(READ_ONLY_KEYS,[olp]). %%%----------------------------------------------------------------- %%% API %% This function is called by the logger_sup supervisor -start_link(Args) -> - proc_lib:start_link(?MODULE,init,[Args]). - filesync(Module, Name) -> call(Module, Name, filesync). @@ -71,18 +69,19 @@ reset(Module, Name) -> %%% Handler being added adding_handler(#{id:=Name,module:=Module}=Config) -> HConfig0 = maps:get(config, Config, #{}), - HandlerConfig0 = maps:without(?CONFIG_KEYS,HConfig0), + HandlerConfig0 = maps:without(?OLP_KEYS++?COMMON_KEYS,HConfig0), case Module:check_config(Name,set,undefined,HandlerConfig0) of {ok,HandlerConfig} -> - ModifiedCommon = maps:with(?CONFIG_KEYS,HandlerConfig), - CommonConfig0 = maps:with(?CONFIG_KEYS,HConfig0), + ModifiedCommon = maps:with(?COMMON_KEYS,HandlerConfig), + CommonConfig0 = maps:with(?COMMON_KEYS,HConfig0), CommonConfig = maps:merge( maps:merge(get_default_config(), CommonConfig0), ModifiedCommon), case check_config(CommonConfig) of ok -> HConfig = maps:merge(CommonConfig,HandlerConfig), - start(Config#{config => HConfig}); + OlpOpts = maps:with(?OLP_KEYS,HConfig0), + start(OlpOpts, Config#{config => HConfig}); {error,Faulty} -> {error,{invalid_config,Module,Faulty}} end; @@ -92,11 +91,11 @@ adding_handler(#{id:=Name,module:=Module}=Config) -> %%%----------------------------------------------------------------- %%% Handler being removed -removing_handler(#{id:=Name, module:=Module}) -> +removing_handler(#{id:=Name, module:=Module, config:=#{olp:=Olp}}) -> case whereis(?name_to_reg_name(Module,Name)) of undefined -> ok; - Pid -> + _Pid -> %% We don't want to do supervisor:terminate_child here %% since we need to distinguish this explicit stop from a %% system termination in order to avoid circular attempts @@ -106,7 +105,7 @@ removing_handler(#{id:=Name, module:=Module}) -> %% the restart type is temporary, which means that the %% child specification is automatically removed from the %% supervisor when the process dies. - _ = gen_server:call(Pid, stop), + _ = logger_olp:stop(Olp), ok end. @@ -116,34 +115,52 @@ changing_config(SetOrUpdate, #{id:=Name,config:=OldHConfig,module:=Module}, NewConfig0) -> NewHConfig0 = maps:get(config, NewConfig0, #{}), - OldHandlerConfig = maps:without(?CONFIG_KEYS++?READ_ONLY_KEYS,OldHConfig), - NewHandlerConfig0 = maps:without(?CONFIG_KEYS++?READ_ONLY_KEYS,NewHConfig0), + NoHandlerKeys = ?OLP_KEYS++?COMMON_KEYS++?READ_ONLY_KEYS, + OldHandlerConfig = maps:without(NoHandlerKeys,OldHConfig), + NewHandlerConfig0 = maps:without(NoHandlerKeys,NewHConfig0), case Module:check_config(Name, SetOrUpdate, OldHandlerConfig,NewHandlerConfig0) of {ok, NewHandlerConfig} -> - ModifiedCommon = maps:with(?CONFIG_KEYS,NewHandlerConfig), - NewCommonConfig0 = maps:with(?CONFIG_KEYS,NewHConfig0), + ModifiedCommon = maps:with(?COMMON_KEYS,NewHandlerConfig), + NewCommonConfig0 = maps:with(?COMMON_KEYS,NewHConfig0), + OldCommonConfig = maps:with(?COMMON_KEYS,OldHConfig), CommonDefault = case SetOrUpdate of set -> get_default_config(); update -> - maps:with(?CONFIG_KEYS,OldHConfig) + OldCommonConfig end, NewCommonConfig = maps:merge( maps:merge(CommonDefault,NewCommonConfig0), ModifiedCommon), case check_config(NewCommonConfig) of ok -> - ReadOnly = maps:with(?READ_ONLY_KEYS,OldHConfig), - NewHConfig = maps:merge( - maps:merge(NewCommonConfig,NewHandlerConfig), - ReadOnly), - NewConfig = NewConfig0#{config=>NewHConfig}, - HPid = maps:get(handler_pid,OldHConfig), - case call(HPid, {change_config,NewConfig}) of - ok -> {ok,NewConfig}; - Error -> Error + OlpDefault = + case SetOrUpdate of + set -> + logger_olp:get_default_opts(); + update -> + maps:with(?OLP_KEYS,OldHConfig) + end, + Olp = maps:get(olp,OldHConfig), + NewOlpOpts = maps:merge(OlpDefault, + maps:with(?OLP_KEYS,NewHConfig0)), + case logger_olp:set_opts(Olp,NewOlpOpts) of + ok -> + maybe_set_repeated_filesync(Olp,OldCommonConfig, + NewCommonConfig), + ReadOnly = maps:with(?READ_ONLY_KEYS,OldHConfig), + NewHConfig = + maps:merge( + maps:merge( + maps:merge(NewCommonConfig,NewHandlerConfig), + ReadOnly), + NewOlpOpts), + NewConfig = NewConfig0#{config=>NewHConfig}, + {ok,NewConfig}; + Error -> + Error end; {error,Faulty} -> {error,{invalid_config,Module,Faulty}} @@ -158,14 +175,12 @@ changing_config(SetOrUpdate, LogEvent :: logger:log_event(), Config :: logger:handler_config(). -log(LogEvent, Config = #{id := Name, - config := #{handler_pid := HPid, - mode_tab := ModeTab}}) -> +log(LogEvent, Config = #{config := #{olp:=Olp}}) -> %% if the handler has crashed, we must drop this event %% and hope the handler restarts so we can try again - true = is_process_alive(HPid), + true = logger_olp:is_alive(Olp), Bin = log_to_binary(LogEvent, Config), - call_cast_or_drop(Name, HPid, ModeTab, Bin). + logger_olp:load(Olp,Bin). %%%----------------------------------------------------------------- %%% Remove internal fields from configuration @@ -180,18 +195,22 @@ filter_config(#{config:=HConfig}=Config) -> %%% %%% The handler process is linked to logger_sup, which is part of the %%% kernel application's supervision tree. -start(#{id := Name} = Config0) -> +start(OlpOpts0, #{id := Name, module:=Module, config:=HConfig} = Config0) -> + RegName = ?name_to_reg_name(Module,Name), ChildSpec = #{id => Name, - start => {?MODULE, start_link, [Config0]}, + start => {logger_olp, start_link, [RegName,?MODULE, + Config0, OlpOpts0]}, restart => temporary, shutdown => 2000, type => worker, modules => [?MODULE]}, case supervisor:start_child(logger_sup, ChildSpec) of - {ok,Pid,Config} -> + {ok,Pid,{Olp,OlpOpts}} -> ok = logger_handler_watcher:register_handler(Name,Pid), - {ok,Config}; + {ok,Config0#{config=>(maps:merge(HConfig,OlpOpts))#{olp=>Olp}}}; + {error,{Reason,Ch}} when is_tuple(Ch), element(1,Ch)==child -> + {error,Reason}; Error -> Error end. @@ -200,103 +219,48 @@ start(#{id := Name} = Config0) -> %%% gen_server callbacks %%%=================================================================== -init(#{id := Name, module := Module, - formatter := Formatter, config := HConfig0} = Config0) -> - RegName = ?name_to_reg_name(Module,Name), - register(RegName, self()), +init(#{id := Name, module := Module, config := HConfig}) -> process_flag(trap_exit, true), - process_flag(message_queue_data, off_heap), - ?init_test_hooks(), - ?start_observation(Name), - - case Module:init(Name, HConfig0) of + case Module:init(Name, HConfig) of {ok,HState} -> - try ets:new(Name, [public]) of - ModeTab -> - ?set_mode(ModeTab, async), - T0 = ?timestamp(), - HConfig = HConfig0#{handler_pid => self(), - mode_tab => ModeTab}, - Config = Config0#{config => HConfig}, - proc_lib:init_ack({ok,self(),Config}), - %% Storing common config in state to avoid copying - %% (sending) the config data for each log message - CommonConfig = maps:with(?CONFIG_KEYS,HConfig), - State = - ?merge_with_stats( - CommonConfig#{id => Name, - module => Module, - mode_tab => ModeTab, - mode => async, - ctrl_sync_count => - ?CONTROLLER_SYNC_INTERVAL, - last_qlen => 0, - last_log_ts => T0, - last_op => sync, - burst_win_ts => T0, - burst_msg_count => 0, - formatter => Formatter, - handler_state => HState}), - State1 = set_repeated_filesync(State), - unset_restart_flag(State1), - gen_server:enter_loop(?MODULE, [], State1) - catch - _:Error -> - unregister(RegName), - error_notify({init_handler,Name,Error}), - proc_lib:init_ack(Error) - end; + %% Storing common config in state to avoid copying + %% (sending) the config data for each log message + CommonConfig = maps:with(?COMMON_KEYS,HConfig), + State = CommonConfig#{id => Name, + module => Module, + ctrl_sync_count => + ?CONTROLLER_SYNC_INTERVAL, + last_op => sync, + handler_state => HState}, + State1 = set_repeated_filesync(State), + {ok,State1}; Error -> - unregister(RegName), - error_notify({init_handler,Name,Error}), - proc_lib:init_ack(Error) + Error end. -%% This is the synchronous log event. -handle_call({log, Bin}, _From, State) -> - {Result,State1} = do_log(Bin, call, State), - %% Result == ok | dropped - {reply,Result, State1}; +%% This is the log event. +handle_load(Bin, #{id:=Name, + module:=Module, + handler_state:=HandlerState, + ctrl_sync_count := CtrlSync}=State) -> + if CtrlSync==0 -> + {_,HS1} = Module:write(Name, sync, Bin, HandlerState), + {ok,State#{handler_state => HS1, + ctrl_sync_count => ?CONTROLLER_SYNC_INTERVAL, + last_op=>write}}; + true -> + {_,HS1} = Module:write(Name, async, Bin, HandlerState), + {ok,State#{handler_state => HS1, + ctrl_sync_count => CtrlSync-1, + last_op=>write}} + end. handle_call(filesync, _From, State = #{id := Name, module := Module, handler_state := HandlerState}) -> {Result,HandlerState1} = Module:filesync(Name,sync,HandlerState), - {reply, Result, State#{handler_state=>HandlerState1, last_op=>sync}}; - -handle_call({change_config, #{formatter:=Formatter, config:=NewHConfig}}, _From, - State = #{filesync_repeat_interval := FSyncInt0}) -> - %% In the future, if handler_state must be updated due to config - %% change, then we need to add a callback to Module here. - CommonConfig = maps:with(?CONFIG_KEYS,NewHConfig), - State1 = maps:merge(State, CommonConfig), - State2 = - case maps:get(filesync_repeat_interval, NewHConfig) of - FSyncInt0 -> - State1; - _FSyncInt1 -> - set_repeated_filesync(cancel_repeated_filesync(State1)) - end, - {reply, ok, State2#{formatter:=Formatter}}; - -handle_call(info, _From, State) -> - {reply, State, State}; - -handle_call(reset, _From, - #{id:=Name,module:=Module,handler_state:=HandlerState}=State) -> - State1 = ?merge_with_stats(State), - {reply, ok, State1#{last_qlen => 0, - last_log_ts => ?timestamp(), - handler_state => Module:reset_state(Name,HandlerState)}}; - -handle_call(stop, _From, State) -> - {stop, {shutdown,stopped}, ok, State}. - -%% This is the asynchronous log event. -handle_cast({log, Bin}, State) -> - {_,State1} = do_log(Bin, cast, State), - {noreply, State1}; + {reply, Result, State#{handler_state=>HandlerState1, last_op=>sync}}. %% If FILESYNC_REPEAT_INTERVAL is set to a millisec value, this %% clause gets called repeatedly by the handler. In order to @@ -319,168 +283,83 @@ handle_cast(repeated_filesync, {_,HS} = Module:filesync(Name, async, HandlerState), State#{handler_state => HS, last_op => sync} end, - {noreply,set_repeated_filesync(State1)}. + {noreply,set_repeated_filesync(State1)}; + +handle_cast({set_repeated_filesync,FSyncInt},State) -> + State1 = State#{filesync_repeat_interval=>FSyncInt}, + State2 = set_repeated_filesync(cancel_repeated_filesync(State1)), + {noreply, State2}. handle_info(Info, #{id := Name, module := Module, handler_state := HandlerState} = State) -> {noreply,State#{handler_state => Module:handle_info(Name,Info,HandlerState)}}. -terminate(Reason, State = #{id := Name, - module := Module, - handler_state := HandlerState}) -> +terminate(overloaded=Reason, #{id:=Name}=State) -> + log_handler_info(Name, "Handler ~p overloaded and stopping", [Name], State), + do_terminate(Reason,State), + ConfigResult = logger:get_handler_config(Name), + case ConfigResult of + {ok,#{module:=Module}=HConfig0} -> + spawn(fun() -> logger:remove_handler(Name) end), + HConfig = try Module:filter_config(HConfig0) + catch _:_ -> HConfig0 + end, + {ok,fun() -> logger:add_handler(Name,Module,HConfig) end}; + Error -> + error_notify({Name,restart_impossible,Error}), + Error + end; +terminate(Reason, State) -> + do_terminate(Reason, State). + +do_terminate(Reason, State = #{id := Name, + module := Module, + handler_state := HandlerState}) -> _ = cancel_repeated_filesync(State), _ = Module:terminate(Name, Reason, HandlerState), - ok = stop_or_restart(Name, Reason, State), - unregister(?name_to_reg_name(Module, Name)), ok. code_change(_OldVsn, State, _Extra) -> {ok, State}. +reset_state(#{id:=Name, module:=Module, handler_state:=HandlerState} = State) -> + State#{handler_state=>Module:reset_state(Name, HandlerState)}. %%%----------------------------------------------------------------- %%% Internal functions call(Module, Name, Op) when is_atom(Name) -> - call(?name_to_reg_name(Module,Name), Op); + case logger_olp:call(?name_to_reg_name(Module,Name), Op) of + {error,busy} -> {error,handler_busy}; + Other -> Other + end; call(_, Name, Op) -> {error,{badarg,{Op,[Name]}}}. -call(Server, Msg) -> - try - gen_server:call(Server, Msg, ?DEFAULT_CALL_TIMEOUT) - catch - _:{timeout,_} -> {error,handler_busy} - end. - -%% check for overload between every event (and set Mode to async, -%% sync or drop accordingly), but never flush the whole mailbox -%% before LogWindowSize events have been handled -do_log(Bin, CallOrCast, State = #{id:=Name, mode:=Mode0}) -> - T1 = ?timestamp(), - - %% check if the handler is getting overloaded, or if it's - %% recovering from overload (the check must be done for each - %% event to react quickly to large bursts of events and - %% to ensure that the handler can never end up in drop mode - %% with an empty mailbox, which would stop operation) - {Mode1,QLen,Mem,State1} = check_load(State), - - if (Mode1 == drop) andalso (Mode0 =/= drop) -> - log_handler_info(Name, "Handler ~p switched to drop mode", - [Name], State); - (Mode0 == drop) andalso ((Mode1 == async) orelse (Mode1 == sync)) -> - log_handler_info(Name, "Handler ~p switched to ~w mode", - [Name,Mode1], State); - true -> - ok - end, - - %% kill the handler if it can't keep up with the load - kill_if_choked(Name, QLen, Mem, State), - - if Mode1 == flush -> - flush(Name, QLen, T1, State1); - true -> - write(Name, Mode1, T1, Bin, CallOrCast, State1) - end. - -%% this clause is called by do_log/3 after an overload check -%% has been performed, where QLen > FlushQLen -flush(Name, _QLen0, T1, State=#{last_log_ts := _T0, mode_tab := ModeTab}) -> - %% flush messages in the mailbox (a limited number in - %% order to not cause long delays) - NewFlushed = flush_log_events(?FLUSH_MAX_N), - - %% write info in log about flushed messages +notify({mode_change,Mode0,Mode1},#{id:=Name}=State) -> + log_handler_info(Name,"Handler ~p switched from ~p to ~p mode", + [Name,Mode0,Mode1], State); +notify({flushed,Flushed},#{id:=Name}=State) -> log_handler_info(Name, "Handler ~p flushed ~w log events", - [Name,NewFlushed], State), - - %% because of the receive loop when flushing messages, the - %% handler will be scheduled out often and the mailbox could - %% grow very large, so we'd better check the queue again here - {_,_QLen1} = process_info(self(), message_queue_len), - ?observe(Name,{max_qlen,_QLen1}), - - %% Add 1 for the current log event - ?observe(Name,{flushed,NewFlushed+1}), - - State1 = ?update_max_time(?diff_time(T1,_T0),State), - State2 = ?update_max_qlen(_QLen1,State1), - {dropped,?update_other(flushed,FLUSHED,NewFlushed, - State2#{mode => ?set_mode(ModeTab,async), - last_qlen => 0, - last_log_ts => T1})}. - -%% this clause is called to write to file -write(Name, Mode, T1, Bin, _CallOrCast, - State = #{module := Module, - handler_state := HandlerState, - mode_tab := ModeTab, - ctrl_sync_count := CtrlSync, - last_qlen := LastQLen, - last_log_ts := T0}) -> - %% check if we need to limit the number of writes - %% during a burst of log events - {DoWrite,State1} = limit_burst(State), - - %% only log synhrounously every ?CONTROLLER_SYNC_INTERVAL time, to - %% give the handler time between writes so it can keep up with - %% incoming messages - {Result,LastQLen1,HandlerState1} = - if DoWrite, CtrlSync == 0 -> - ?observe(Name,{_CallOrCast,1}), - {_,HS1} = Module:write(Name, sync, Bin, HandlerState), - {ok,element(2, process_info(self(), message_queue_len)),HS1}; - DoWrite -> - ?observe(Name,{_CallOrCast,1}), - {_,HS1} = Module:write(Name, async, Bin, HandlerState), - {ok,LastQLen,HS1}; - not DoWrite -> - ?observe(Name,{flushed,1}), - {dropped,LastQLen,HandlerState} - end, - - %% Check if the time since the previous log event is long enough - - %% and the queue length small enough - to assume the mailbox has - %% been emptied, and if so, do filesync operation and reset mode to - %% async. Note that this is the best we can do to detect an idle - %% handler without setting a timer after each log call/cast. If the - %% time between two consecutive log events is fast and no new - %% event comes in after the last one, idle state won't be detected! - Time = ?diff_time(T1,T0), - State2 = - if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso - (Time > ?IDLE_DETECT_TIME_USEC) -> - {_,HS2} = Module:filesync(Name,async,HandlerState), - State1#{mode => ?change_mode(ModeTab, Mode, async), - burst_msg_count => 0, - handler_state => HS2}; - true -> - State1#{mode => Mode, handler_state => HandlerState1} - end, - State3 = ?update_calls_or_casts(_CallOrCast,1,State2), - State4 = ?update_max_qlen(LastQLen1,State3), - State5 = - ?update_max_time(Time, - State4#{last_qlen := LastQLen1, - last_log_ts => T1, - last_op => write, - ctrl_sync_count => - if CtrlSync==0 -> ?CONTROLLER_SYNC_INTERVAL; - true -> CtrlSync-1 - end}), - {Result,State5}. + [Name,Flushed], State); +notify(restart,#{id:=Name}=State) -> + log_handler_info(Name, "Handler ~p restarted", [Name], State); +notify(idle,#{id:=Name,module:=Module,handler_state:=HandlerState}=State) -> + {_,HS} = Module:filesync(Name,async,HandlerState), + State#{handler_state=>HS, last_op=>sync}. log_handler_info(Name, Format, Args, #{module:=Module, - formatter:=Formatter, - handler_state:=HandlerState}) -> - Config = #{formatter=>Formatter}, + handler_state:=HandlerState}=State) -> + Config = + case logger:get_handler_config(Name) of + {ok,Conf} -> Conf; + _ -> #{formatter=>{?DEFAULT_FORMATTER,?DEFAULT_FORMAT_CONFIG}} + end, Meta = #{time=>erlang:system_time(microsecond)}, Bin = log_to_binary(#{level => notice, msg => {Format,Args}, meta => Meta}, Config), - _ = Module:write(Name, async, Bin, HandlerState), - ok. + {_,HS} = Module:write(Name, async, Bin, HandlerState), + State#{handler_state=>HS, last_op=>write}. %%%----------------------------------------------------------------- %%% Convert log data on any form to binary @@ -540,42 +419,8 @@ string_to_binary(String) -> %%%----------------------------------------------------------------- %%% Check that the configuration term is valid check_config(Config) when is_map(Config) -> - case check_common_config(maps:to_list(Config)) of - ok -> - case overload_levels_ok(Config) of - true -> - ok; - false -> - Faulty = maps:with([sync_mode_qlen, - drop_mode_qlen, - flush_qlen],Config), - {error,{invalid_levels,Faulty}} - end; - Error -> - Error - end. + check_common_config(maps:to_list(Config)). -check_common_config([{sync_mode_qlen,N}|Config]) when is_integer(N) -> - check_common_config(Config); -check_common_config([{drop_mode_qlen,N}|Config]) when is_integer(N) -> - check_common_config(Config); -check_common_config([{flush_qlen,N}|Config]) when is_integer(N) -> - check_common_config(Config); -check_common_config([{burst_limit_enable,Bool}|Config]) when is_boolean(Bool) -> - check_common_config(Config); -check_common_config([{burst_limit_max_count,N}|Config]) when is_integer(N) -> - check_common_config(Config); -check_common_config([{burst_limit_window_time,N}|Config]) when is_integer(N) -> - check_common_config(Config); -check_common_config([{overload_kill_enable,Bool}|Config]) when is_boolean(Bool) -> - check_common_config(Config); -check_common_config([{overload_kill_qlen,N}|Config]) when is_integer(N) -> - check_common_config(Config); -check_common_config([{overload_kill_mem_size,N}|Config]) when is_integer(N) -> - check_common_config(Config); -check_common_config([{overload_kill_restart_after,NorA}|Config]) - when is_integer(NorA); NorA == infinity -> - check_common_config(Config); check_common_config([{filesync_repeat_interval,NorA}|Config]) when is_integer(NorA); NorA == no_repeat -> check_common_config(Config); @@ -585,156 +430,7 @@ check_common_config([]) -> ok. get_default_config() -> - #{sync_mode_qlen => ?SYNC_MODE_QLEN, - drop_mode_qlen => ?DROP_MODE_QLEN, - flush_qlen => ?FLUSH_QLEN, - burst_limit_enable => ?BURST_LIMIT_ENABLE, - burst_limit_max_count => ?BURST_LIMIT_MAX_COUNT, - burst_limit_window_time => ?BURST_LIMIT_WINDOW_TIME, - overload_kill_enable => ?OVERLOAD_KILL_ENABLE, - overload_kill_qlen => ?OVERLOAD_KILL_QLEN, - overload_kill_mem_size => ?OVERLOAD_KILL_MEM_SIZE, - overload_kill_restart_after => ?OVERLOAD_KILL_RESTART_AFTER, - filesync_repeat_interval => ?FILESYNC_REPEAT_INTERVAL}. - -%%%----------------------------------------------------------------- -%%% Overload Protection -call_cast_or_drop(_Name, HandlerPid, ModeTab, Bin) -> - %% If the handler process is getting overloaded, the log event - %% will be synchronous instead of asynchronous (slows down the - %% logging tempo of a process doing lots of logging. If the - %% handler is choked, drop mode is set and no event will be sent. - try ?get_mode(ModeTab) of - async -> - gen_server:cast(HandlerPid, {log,Bin}); - sync -> - case call(HandlerPid, {log,Bin}) of - ok -> - ok; - _Other -> - %% dropped or {error,handler_busy} - ?observe(_Name,{dropped,1}), - ok - end; - drop -> - ?observe(_Name,{dropped,1}) - catch - %% if the ETS table doesn't exist (maybe because of a - %% handler restart), we can only drop the event - _:_ -> ?observe(_Name,{dropped,1}) - end, - ok. - -set_restart_flag(#{id := Name, module := Module} = State) -> - log_handler_info(Name, "Handler ~p overloaded and stopping", [Name], State), - Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])), - spawn(fun() -> - register(Flag, self()), - timer:sleep(infinity) - end), - ok. - -unset_restart_flag(#{id := Name, module := Module} = State) -> - Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])), - case whereis(Flag) of - undefined -> - ok; - Pid -> - exit(Pid, kill), - log_handler_info(Name, "Handler ~p restarted", [Name], State) - end. - -check_load(State = #{id:=_Name, mode_tab := ModeTab, mode := Mode, - sync_mode_qlen := SyncModeQLen, - drop_mode_qlen := DropModeQLen, - flush_qlen := FlushQLen}) -> - {_,Mem} = process_info(self(), memory), - ?observe(_Name,{max_mem,Mem}), - {_,QLen} = process_info(self(), message_queue_len), - ?observe(_Name,{max_qlen,QLen}), - %% When the handler process gets scheduled in, it's impossible - %% to predict the QLen. We could jump "up" arbitrarily from say - %% async to sync, async to drop, sync to flush, etc. However, when - %% the handler process manages the log events (without flushing), - %% one after the other, we will move "down" from drop to sync and - %% from sync to async. This way we don't risk getting stuck in - %% drop or sync mode with an empty mailbox. - {Mode1,_NewDrops,_NewFlushes} = - if - QLen >= FlushQLen -> - {flush, 0,1}; - QLen >= DropModeQLen -> - %% Note that drop mode will force log events to - %% be dropped on the client side (never sent get to - %% the handler). - IncDrops = if Mode == drop -> 0; true -> 1 end, - {?change_mode(ModeTab, Mode, drop), IncDrops,0}; - QLen >= SyncModeQLen -> - {?change_mode(ModeTab, Mode, sync), 0,0}; - true -> - {?change_mode(ModeTab, Mode, async), 0,0} - end, - State1 = ?update_other(drops,DROPS,_NewDrops,State), - {Mode1, QLen, Mem, - ?update_other(flushes,FLUSHES,_NewFlushes, - State1#{last_qlen => QLen})}. - -limit_burst(#{burst_limit_enable := false}=State) -> - {true,State}; -limit_burst(#{burst_win_ts := BurstWinT0, - burst_msg_count := BurstMsgCount, - burst_limit_window_time := BurstLimitWinTime, - burst_limit_max_count := BurstLimitMaxCnt} = State) -> - if (BurstMsgCount >= BurstLimitMaxCnt) -> - %% the limit for allowed messages has been reached - BurstWinT1 = ?timestamp(), - case ?diff_time(BurstWinT1,BurstWinT0) of - BurstCheckTime when BurstCheckTime < (BurstLimitWinTime*1000) -> - %% we're still within the burst time frame - {false,?update_other(burst_drops,BURSTS,1,State)}; - _BurstCheckTime -> - %% burst time frame passed, reset counters - {true,State#{burst_win_ts => BurstWinT1, - burst_msg_count => 0}} - end; - true -> - %% the limit for allowed messages not yet reached - {true,State#{burst_win_ts => BurstWinT0, - burst_msg_count => BurstMsgCount+1}} - end. - -kill_if_choked(Name, QLen, Mem, State = #{overload_kill_enable := KillIfOL, - overload_kill_qlen := OLKillQLen, - overload_kill_mem_size := OLKillMem}) -> - if KillIfOL andalso - ((QLen > OLKillQLen) orelse (Mem > OLKillMem)) -> - set_restart_flag(State), - exit({shutdown,{overloaded,Name,QLen,Mem}}); - true -> - ok - end. - -flush_log_events(Limit) -> - process_flag(priority, high), - Flushed = flush_log_events(0, Limit), - process_flag(priority, normal), - Flushed. - -flush_log_events(Limit, Limit) -> - Limit; -flush_log_events(N, Limit) -> - %% flush log events but leave other events, such as - %% filesync, info and change_config, so that these - %% have a chance to be processed even under heavy load - receive - {'$gen_cast',{log,_}} -> - flush_log_events(N+1, Limit); - {'$gen_call',{Pid,MRef},{log,_}} -> - Pid ! {MRef, dropped}, - flush_log_events(N+1, Limit) - after - 0 -> N - end. + #{filesync_repeat_interval => ?FILESYNC_REPEAT_INTERVAL}. set_repeated_filesync(#{filesync_repeat_interval:=FSyncInt} = State) when is_integer(FSyncInt) -> @@ -752,51 +448,12 @@ cancel_repeated_filesync(State) -> error -> State end. - -stop_or_restart(Name, {shutdown,Reason={overloaded,_Name,_QLen,_Mem}}, - #{overload_kill_restart_after := RestartAfter}) -> - %% If we're terminating because of an overload situation (see - %% kill_if_choked/4), we need to remove the handler and set a - %% restart timer. A separate process must perform this in order to - %% avoid deadlock. - HandlerPid = self(), - ConfigResult = logger:get_handler_config(Name), - RemoveAndRestart = - fun() -> - MRef = erlang:monitor(process, HandlerPid), - receive - {'DOWN',MRef,_,_,_} -> - ok - after 30000 -> - error_notify(Reason), - exit(HandlerPid, kill) - end, - case ConfigResult of - {ok,#{module:=HMod}=HConfig0} when is_integer(RestartAfter) -> - _ = logger:remove_handler(Name), - HConfig = try HMod:filter_config(HConfig0) - catch _:_ -> HConfig0 - end, - _ = timer:apply_after(RestartAfter, logger, add_handler, - [Name,HMod,HConfig]); - {ok,_} -> - _ = logger:remove_handler(Name); - {error,CfgReason} when is_integer(RestartAfter) -> - error_notify({Name,restart_impossible,CfgReason}); - {error,_} -> - ok - end - end, - spawn(RemoveAndRestart), - ok; -stop_or_restart(_Name, _Reason, _State) -> - ok. - -overload_levels_ok(HandlerConfig) -> - SMQL = maps:get(sync_mode_qlen, HandlerConfig, ?SYNC_MODE_QLEN), - DMQL = maps:get(drop_mode_qlen, HandlerConfig, ?DROP_MODE_QLEN), - FQL = maps:get(flush_qlen, HandlerConfig, ?FLUSH_QLEN), - (DMQL > 1) andalso (SMQL =< DMQL) andalso (DMQL =< FQL). - error_notify(Term) -> ?internal_log(error, Term). + +maybe_set_repeated_filesync(_Olp, + #{filesync_repeat_interval:=FSyncInt}, + #{filesync_repeat_interval:=FSyncInt}) -> + ok; +maybe_set_repeated_filesync(Olp,_,#{filesync_repeat_interval:=FSyncInt}) -> + logger_olp:cast(Olp,{set_repeated_filesync,FSyncInt}). |