diff options
Diffstat (limited to 'lib/kernel/src/logger_h_common.erl')
-rw-r--r-- | lib/kernel/src/logger_h_common.erl | 389 |
1 files changed, 167 insertions, 222 deletions
diff --git a/lib/kernel/src/logger_h_common.erl b/lib/kernel/src/logger_h_common.erl index d01851b2b1..d290f51e34 100644 --- a/lib/kernel/src/logger_h_common.erl +++ b/lib/kernel/src/logger_h_common.erl @@ -24,7 +24,7 @@ -include("logger_internal.hrl"). %% API --export([start_link/3, info/2, filesync/2, reset/2]). +-export([start_link/1, info/2, filesync/2, reset/2]). %% gen_server and proc_lib callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -52,65 +52,37 @@ -define(READ_ONLY_KEYS,[handler_pid,mode_tab]). %%%----------------------------------------------------------------- -%%% -filesync(Module, Name) when is_atom(Name) -> - try - gen_server:call(?name_to_reg_name(Module,Name), - filesync, ?DEFAULT_CALL_TIMEOUT) - catch - _:{timeout,_} -> {error,handler_busy} - end; -filesync(_, Name) -> - {error,{badarg,{filesync,[Name]}}}. +%%% API -info(Module, Name) when is_atom(Name) -> - try - gen_server:call(?name_to_reg_name(Module,Name), - info, ?DEFAULT_CALL_TIMEOUT) - catch - _:{timeout,_} -> {error,handler_busy} - end; -info(_, Name) -> - {error,{badarg,{info,[Name]}}}. +%% This function is called by the logger_sup supervisor +start_link(Args) -> + proc_lib:start_link(?MODULE,init,[Args]). -reset(Module, Name) when is_atom(Name) -> - try - gen_server:call(?name_to_reg_name(Module,Name), - reset, ?DEFAULT_CALL_TIMEOUT) - catch - _:{timeout,_} -> {error,handler_busy} - end; -reset(_, Name) -> - {error,{badarg,{reset,[Name]}}}. +filesync(Module, Name) -> + call(Module, Name, filesync). +info(Module, Name) -> + call(Module, Name, info). +reset(Module, Name) -> + call(Module, Name, reset). %%%----------------------------------------------------------------- %%% Handler being added adding_handler(#{id:=Name,module:=Module}=Config) -> - HConfig = maps:get(config, Config, #{}), - HandlerConfig0 = maps:without(?CONFIG_KEYS,HConfig), + HConfig0 = maps:get(config, Config, #{}), + HandlerConfig0 = maps:without(?CONFIG_KEYS,HConfig0), case Module:check_config(Name,set,undefined,HandlerConfig0) of {ok,HandlerConfig} -> ModifiedCommon = maps:with(?CONFIG_KEYS,HandlerConfig), - CommonConfig0 = maps:with(?CONFIG_KEYS,HConfig), + CommonConfig0 = maps:with(?CONFIG_KEYS,HConfig0), CommonConfig = maps:merge( maps:merge(get_default_config(), CommonConfig0), ModifiedCommon), case check_config(CommonConfig) of ok -> - State = maps:merge(get_init_state(), CommonConfig), - HConfig1 = maps:merge(CommonConfig,HandlerConfig), - Config1 = Config#{config=>HConfig1}, - case overload_levels_ok(State) of - true -> - start(Name, Config1, State); - false -> - #{sync_mode_qlen := SMQL, - drop_mode_qlen := DMQL, - flush_qlen := FQL} = State, - {error,{invalid_levels,{SMQL,DMQL,FQL}}} - end; + HConfig = maps:merge(CommonConfig,HandlerConfig), + start(Config#{config => HConfig}); {error,Faulty} -> {error,{invalid_config,Module,Faulty}} end; @@ -141,7 +113,7 @@ removing_handler(#{id:=Name, module:=Module}) -> %%%----------------------------------------------------------------- %%% Updating handler config changing_config(SetOrUpdate, - OldConfig=#{id:=Name,config:=OldHConfig,module:=Module}, + #{id:=Name,config:=OldHConfig,module:=Module}, NewConfig0) -> NewHConfig0 = maps:get(config, NewConfig0, #{}), OldHandlerConfig = maps:without(?CONFIG_KEYS++?READ_ONLY_KEYS,OldHConfig), @@ -169,12 +141,9 @@ changing_config(SetOrUpdate, ReadOnly), NewConfig = NewConfig0#{config=>NewHConfig}, HPid = maps:get(handler_pid,OldHConfig), - try gen_server:call(HPid, {change_config,OldConfig,NewConfig}, - ?DEFAULT_CALL_TIMEOUT) of + case call(HPid, {change_config,NewConfig}) of ok -> {ok,NewConfig}; Error -> Error - catch - _:{timeout,_} -> {error,handler_busy} end; {error,Faulty} -> {error,{invalid_config,Module,Faulty}} @@ -204,61 +173,35 @@ filter_config(#{config:=HConfig}=Config) -> Config#{config=>maps:without(?READ_ONLY_KEYS,HConfig)}. %%%----------------------------------------------------------------- -%%% Add a standard handler to the logger. -%%% This starts a dedicated handler process which should always -%%% exist if the handler is registered with logger (and should not -%%% exist if the handler is not registered). -%%% -%%% Handler specific config should be provided with a sub map associated -%%% with a key named 'config', e.g: +%%% Start the handler process %%% -%%% Config = #{config => #{sync_mode_qlen => 50} +%%% The process must always exist if the handler is registered with +%%% logger (and must not exist if the handler is not registered). %%% -%%% The standard handler process is linked to logger_sup, which is -%%% part of the kernel application's supervision tree. -start(Name, Config, HandlerState) -> +%%% The handler process is linked to logger_sup, which is part of the +%%% kernel application's supervision tree. +start(#{id := Name} = Config0) -> ChildSpec = #{id => Name, - start => {?MODULE, start_link, [Name,Config,HandlerState]}, + start => {?MODULE, start_link, [Config0]}, restart => temporary, shutdown => 2000, type => worker, modules => [?MODULE]}, case supervisor:start_child(logger_sup, ChildSpec) of - {ok,Pid,Config1} -> + {ok,Pid,Config} -> ok = logger_handler_watcher:register_handler(Name,Pid), - {ok,Config1}; + {ok,Config}; Error -> Error end. -%%%----------------------------------------------------------------- -%%% Start a standard handler process and link to caller. -%%% This function is called by the kernel supervisor when this -%%% handler process gets added --spec start_link(Name, Config, HandlerState) -> {ok,Pid} | {error,Reason} when - Name :: atom(), - Config :: logger:handler_config(), - HandlerState :: map(), - Pid :: pid(), - Reason :: term(). - -start_link(Name, Config, HandlerState) -> - proc_lib:start_link(?MODULE,init,[[Name,Config,HandlerState]]). - -%%%----------------------------------------------------------------- -%%% -get_init_state() -> - #{ctrl_sync_int => ?CONTROLLER_SYNC_INTERVAL, - filesync_ok_qlen => ?FILESYNC_OK_QLEN}. - %%%=================================================================== %%% gen_server callbacks %%%=================================================================== -init([Name, Config = #{config := HConfig, module := Module}, - State = #{filesync_repeat_interval := FSyncInt, - ctrl_sync_int := CtrlSyncInt}]) -> +init(#{id := Name, module := Module, + formatter := Formatter, config := HConfig0} = Config0) -> RegName = ?name_to_reg_name(Module,Name), register(RegName, self()), process_flag(trap_exit, true), @@ -267,43 +210,36 @@ init([Name, Config = #{config := HConfig, module := Module}, ?init_test_hooks(), ?start_observation(Name), - case Module:init(Name, HConfig) of + case Module:init(Name, HConfig0) of {ok,HState} -> try ets:new(Name, [public]) of ModeTab -> ?set_mode(ModeTab, async), T0 = ?timestamp(), - State1 = - ?merge_with_stats(State#{id => Name, - module => Module, - mode_tab => ModeTab, - mode => async, - ctrl_sync_count => CtrlSyncInt, - last_qlen => 0, - last_log_ts => T0, - last_op => sync, - burst_win_ts => T0, - burst_msg_count => 0, - handler_state => HState}), - Config1 = - Config#{config => HConfig#{handler_pid => self(), - mode_tab => ModeTab}}, - proc_lib:init_ack({ok,self(),Config1}), - if is_integer(FSyncInt) -> - gen_server:cast(self(), repeated_filesync); - true -> - ok - end, - case unset_restart_flag(Name, Module) of - true -> - %% inform about restart - gen_server:cast(self(), {log_handler_info, - "Handler ~p restarted", - [Name]}); - false -> - %% initial start - ok - end, + HConfig = HConfig0#{handler_pid => self(), + mode_tab => ModeTab}, + Config = Config0#{config => HConfig}, + proc_lib:init_ack({ok,self(),Config}), + %% Storing common config in state to avoid copying + %% (sending) the config data for each log message + CommonConfig = maps:with(?CONFIG_KEYS,HConfig), + State = + ?merge_with_stats( + CommonConfig#{id => Name, + module => Module, + mode_tab => ModeTab, + mode => async, + ctrl_sync_count => + ?CONTROLLER_SYNC_INTERVAL, + last_qlen => 0, + last_log_ts => T0, + last_op => sync, + burst_win_ts => T0, + burst_msg_count => 0, + formatter => Formatter, + handler_state => HState}), + State1 = set_repeated_filesync(State), + unset_restart_flag(State1), gen_server:enter_loop(?MODULE, [], State1) catch _:Error -> @@ -326,47 +262,33 @@ handle_call({log, Bin}, _From, State) -> handle_call(filesync, _From, State = #{id := Name, module := Module, handler_state := HandlerState}) -> - {Result,HandlerState1} = Module:sync_filesync(Name,HandlerState), + {Result,HandlerState1} = Module:filesync(Name,sync,HandlerState), {reply, Result, State#{handler_state=>HandlerState1, last_op=>sync}}; -handle_call({change_config,_OldConfig,NewConfig}, _From, +handle_call({change_config, #{formatter:=Formatter, config:=NewHConfig}}, _From, State = #{filesync_repeat_interval := FSyncInt0}) -> - HConfig = maps:get(config, NewConfig, #{}), - State1 = maps:merge(State, HConfig), - case overload_levels_ok(State1) of - true -> - _ = - case maps:get(filesync_repeat_interval, HConfig, undefined) of - undefined -> - ok; - no_repeat -> - _ = logger_h_common:cancel_timer(maps:get(rep_sync_tref, - State, - undefined)); - FSyncInt0 -> - ok; - _FSyncInt1 -> - _ = logger_h_common:cancel_timer(maps:get(rep_sync_tref, - State, - undefined)), - gen_server:cast(self(), repeated_filesync) - end, - {reply, ok, State1}; - false -> - #{sync_mode_qlen := SMQL, - drop_mode_qlen := DMQL, - flush_qlen := FQL} = State1, - {reply, {error,{invalid_levels,{SMQL,DMQL,FQL}}}, State} - end; + %% In the future, if handler_state must be updated due to config + %% change, then we need to add a callback to Module here. + CommonConfig = maps:with(?CONFIG_KEYS,NewHConfig), + State1 = maps:merge(State, CommonConfig), + State2 = + case maps:get(filesync_repeat_interval, NewHConfig) of + FSyncInt0 -> + State1; + _FSyncInt1 -> + set_repeated_filesync(cancel_repeated_filesync(State1)) + end, + {reply, ok, State2#{formatter:=Formatter}}; handle_call(info, _From, State) -> {reply, State, State}; -handle_call(reset, _From, #{module:=Module,handler_state:=HandlerState}=State) -> +handle_call(reset, _From, + #{id:=Name,module:=Module,handler_state:=HandlerState}=State) -> State1 = ?merge_with_stats(State), {reply, ok, State1#{last_qlen => 0, last_log_ts => ?timestamp(), - handler_state => Module:reset_state(HandlerState)}}; + handler_state => Module:reset_state(Name,HandlerState)}}; handle_call(stop, _From, State) -> {stop, {shutdown,stopped}, ok, State}. @@ -376,41 +298,37 @@ handle_cast({log, Bin}, State) -> {_,State1} = do_log(Bin, cast, State), {noreply, State1}; -handle_cast({log_handler_info, Format, Args}, State = #{id:=Name}) -> - log_handler_info(Name, Format, Args, State), - {noreply, State}; - %% If FILESYNC_REPEAT_INTERVAL is set to a millisec value, this %% clause gets called repeatedly by the handler. In order to %% guarantee that a filesync *always* happens after the last log %% event, the repeat operation must be active! handle_cast(repeated_filesync,State = #{filesync_repeat_interval := no_repeat}) -> + %% This clause handles a race condition which may occur when + %% config changes filesync_repeat_interval from an integer value + %% to no_repeat. {noreply,State}; handle_cast(repeated_filesync, State = #{id := Name, module := Module, handler_state := HandlerState, - filesync_repeat_interval := FSyncInt, last_op := LastOp}) -> - HandlerState1 = + State1 = if LastOp == sync -> - HandlerState; + State; true -> - Module:async_filesync(Name,HandlerState) + {_,HS} = Module:filesync(Name, async, HandlerState), + State#{handler_state => HS, last_op => sync} end, - {ok,TRef} = timer:apply_after(FSyncInt, gen_server,cast, - [self(),repeated_filesync]), - {noreply,State#{handler_state=>HandlerState1, - rep_sync_tref => TRef, - last_op => sync}}. + {noreply,set_repeated_filesync(State1)}. -handle_info(Info, #{module := Module, handler_state := HandlerState} = State) -> - {noreply,State#{handler_state => Module:handle_info(Info,HandlerState)}}. +handle_info(Info, #{id := Name, module := Module, + handler_state := HandlerState} = State) -> + {noreply,State#{handler_state => Module:handle_info(Name,Info,HandlerState)}}. terminate(Reason, State = #{id := Name, module := Module, handler_state := HandlerState}) -> - _ = cancel_timer(maps:get(rep_sync_tref, State, undefined)), + _ = cancel_repeated_filesync(State), _ = Module:terminate(Name, Reason, HandlerState), ok = stop_or_restart(Name, Reason, State), unregister(?name_to_reg_name(Module, Name)), @@ -420,10 +338,24 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%%----------------------------------------------------------------- +%%% Internal functions +call(Module, Name, Op) when is_atom(Name) -> + call(?name_to_reg_name(Module,Name), Op); +call(_, Name, Op) -> + {error,{badarg,{Op,[Name]}}}. + +call(Server, Msg) -> + try + gen_server:call(Server, Msg, ?DEFAULT_CALL_TIMEOUT) + catch + _:{timeout,_} -> {error,handler_busy} + end. + %% check for overload between every event (and set Mode to async, %% sync or drop accordingly), but never flush the whole mailbox %% before LogWindowSize events have been handled -do_log(Bin, CallOrCast, State = #{id:=Name, module:=Module, mode:=Mode0}) -> +do_log(Bin, CallOrCast, State = #{id:=Name, mode:=Mode0}) -> T1 = ?timestamp(), %% check if the handler is getting overloaded, or if it's @@ -444,7 +376,7 @@ do_log(Bin, CallOrCast, State = #{id:=Name, module:=Module, mode:=Mode0}) -> end, %% kill the handler if it can't keep up with the load - kill_if_choked(Name, Module, QLen, Mem, State), + kill_if_choked(Name, QLen, Mem, State), if Mode1 == flush -> flush(Name, QLen, T1, State1); @@ -473,8 +405,9 @@ flush(Name, _QLen0, T1, State=#{last_log_ts := _T0, mode_tab := ModeTab}) -> ?observe(Name,{flushed,NewFlushed+1}), State1 = ?update_max_time(?diff_time(T1,_T0),State), + State2 = ?update_max_qlen(_QLen1,State1), {dropped,?update_other(flushed,FLUSHED,NewFlushed, - State1#{mode => ?set_mode(ModeTab,async), + State2#{mode => ?set_mode(ModeTab,async), last_qlen => 0, last_log_ts => T1})}. @@ -484,24 +417,23 @@ write(Name, Mode, T1, Bin, _CallOrCast, handler_state := HandlerState, mode_tab := ModeTab, ctrl_sync_count := CtrlSync, - ctrl_sync_int := CtrlSyncInt, last_qlen := LastQLen, last_log_ts := T0}) -> %% check if we need to limit the number of writes %% during a burst of log events {DoWrite,BurstWinT,BurstMsgCount} = limit_burst(State), - %% only log synhrounously every CtrlSyncInt time, to give the - %% handler time between writes so it can keep up with incoming - %% messages + %% only log synhrounously every ?CONTROLLER_SYNC_INTERVAL time, to + %% give the handler time between writes so it can keep up with + %% incoming messages {Result,LastQLen1,HandlerState1} = if DoWrite, CtrlSync == 0 -> ?observe(Name,{_CallOrCast,1}), - {_,HS1} = Module:sync_write(Name, Bin, HandlerState), + {_,HS1} = Module:write(Name, sync, Bin, HandlerState), {ok,element(2, process_info(self(), message_queue_len)),HS1}; DoWrite -> ?observe(Name,{_CallOrCast,1}), - HS1 = Module:async_write(Name, Bin, HandlerState), + {_,HS1} = Module:write(Name, async, Bin, HandlerState), {ok,LastQLen,HS1}; not DoWrite -> ?observe(Name,{flushed,1}), @@ -519,39 +451,37 @@ write(Name, Mode, T1, Bin, _CallOrCast, {Mode1,BurstMsgCount1,HandlerState2} = if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso (Time > ?IDLE_DETECT_TIME_USEC) -> - HS2 = Module:async_filesync(Name,HandlerState), + {_,HS2} = Module:filesync(Name,async,HandlerState), {?change_mode(ModeTab, Mode, async),0,HS2}; true -> {Mode,BurstMsgCount,HandlerState1} end, - State1 = - ?update_calls_or_casts(_CallOrCast,1,State), - State2 = + State1 = ?update_calls_or_casts(_CallOrCast,1,State), + State2 = ?update_max_qlen(LastQLen1,State1), + State3 = ?update_max_time(Time, - State1#{mode => Mode1, + State2#{mode => Mode1, last_qlen := LastQLen1, last_log_ts => T1, last_op => write, burst_win_ts => BurstWinT, burst_msg_count => BurstMsgCount1, - ctrl_sync_count => if CtrlSync==0 -> CtrlSyncInt; - true -> CtrlSync-1 - end, + ctrl_sync_count => + if CtrlSync==0 -> ?CONTROLLER_SYNC_INTERVAL; + true -> CtrlSync-1 + end, handler_state => HandlerState2}), - {Result,State2}. + {Result,State3}. log_handler_info(Name, Format, Args, #{module:=Module, + formatter:=Formatter, handler_state:=HandlerState}) -> - Config = - case logger:get_handler_config(Name) of - {ok,Conf} -> Conf; - _ -> #{formatter=>{?DEFAULT_FORMATTER,?DEFAULT_FORMAT_CONFIG}} - end, + Config = #{formatter=>Formatter}, Meta = #{time=>erlang:system_time(microsecond)}, Bin = log_to_binary(#{level => notice, msg => {Format,Args}, meta => Meta}, Config), - _ = Module:async_write(Name, Bin, HandlerState), + _ = Module:write(Name, async, Bin, HandlerState), ok. %%%----------------------------------------------------------------- @@ -612,7 +542,20 @@ string_to_binary(String) -> %%%----------------------------------------------------------------- %%% Check that the configuration term is valid check_config(Config) when is_map(Config) -> - check_common_config(maps:to_list(Config)). + case check_common_config(maps:to_list(Config)) of + ok -> + case overload_levels_ok(Config) of + true -> + ok; + false -> + Faulty = maps:with([sync_mode_qlen, + drop_mode_qlen, + flush_qlen],Config), + {error,{invalid_levels,Faulty}} + end; + Error -> + Error + end. check_common_config([{sync_mode_qlen,N}|Config]) when is_integer(N) -> check_common_config(Config); @@ -667,15 +610,13 @@ call_cast_or_drop(_Name, HandlerPid, ModeTab, Bin) -> async -> gen_server:cast(HandlerPid, {log,Bin}); sync -> - try gen_server:call(HandlerPid, {log,Bin}, ?DEFAULT_CALL_TIMEOUT) of - %% if return value from call == dropped, the - %% message has been flushed by handler and should - %% therefore not be counted as dropped in stats - ok -> ok; - dropped -> ok - catch - _:{timeout,_} -> - ?observe(_Name,{dropped,1}) + case call(HandlerPid, {log,Bin}) of + ok -> + ok; + _Other -> + %% dropped or {error,handler_busy} + ?observe(_Name,{dropped,1}), + ok end; drop -> ?observe(_Name,{dropped,1}) @@ -686,10 +627,8 @@ call_cast_or_drop(_Name, HandlerPid, ModeTab, Bin) -> end, ok. -handler_exit(_Name, Reason) -> - exit(Reason). - -set_restart_flag(Name, Module) -> +set_restart_flag(#{id := Name, module := Module} = State) -> + log_handler_info(Name, "Handler ~p overloaded and stopping", [Name], State), Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])), spawn(fun() -> register(Flag, self()), @@ -697,14 +636,14 @@ set_restart_flag(Name, Module) -> end), ok. -unset_restart_flag(Name, Module) -> +unset_restart_flag(#{id := Name, module := Module} = State) -> Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])), case whereis(Flag) of undefined -> - false; + ok; Pid -> exit(Pid, kill), - true + log_handler_info(Name, "Handler ~p restarted", [Name], State) end. check_load(State = #{id:=_Name, mode_tab := ModeTab, mode := Mode, @@ -764,24 +703,17 @@ limit_burst(#{burst_win_ts := BurstWinT0, {true,BurstWinT0,BurstMsgCount+1} end. -kill_if_choked(Name, Module, QLen, Mem, - State = #{overload_kill_enable := KillIfOL, - overload_kill_qlen := OLKillQLen, - overload_kill_mem_size := OLKillMem}) -> +kill_if_choked(Name, QLen, Mem, State = #{overload_kill_enable := KillIfOL, + overload_kill_qlen := OLKillQLen, + overload_kill_mem_size := OLKillMem}) -> if KillIfOL andalso ((QLen > OLKillQLen) orelse (Mem > OLKillMem)) -> - log_handler_info(Name, - "Handler ~p overloaded and stopping", - [Name], State), - set_restart_flag(Name, Module), - handler_exit(Name, {shutdown,{overloaded,Name,QLen,Mem}}); + set_restart_flag(State), + exit({shutdown,{overloaded,Name,QLen,Mem}}); true -> ok end. -flush_log_events() -> - flush_log_events(-1). - flush_log_events(Limit) -> process_flag(priority, high), Flushed = flush_log_events(0, Limit), @@ -804,14 +736,27 @@ flush_log_events(N, Limit) -> 0 -> N end. -cancel_timer(TRef) when is_atom(TRef) -> ok; -cancel_timer(TRef) -> timer:cancel(TRef). - +set_repeated_filesync(#{filesync_repeat_interval:=FSyncInt} = State) + when is_integer(FSyncInt) -> + {ok,TRef} = timer:apply_after(FSyncInt, gen_server, cast, + [self(),repeated_filesync]), + State#{rep_sync_tref=>TRef}; +set_repeated_filesync(State) -> + State. + +cancel_repeated_filesync(State) -> + case maps:take(rep_sync_tref,State) of + {TRef,State1} -> + _ = timer:cancel(TRef), + State1; + error -> + State + end. stop_or_restart(Name, {shutdown,Reason={overloaded,_Name,_QLen,_Mem}}, #{overload_kill_restart_after := RestartAfter}) -> %% If we're terminating because of an overload situation (see - %% kill_if_choked/5), we need to remove the handler and set a + %% kill_if_choked/4), we need to remove the handler and set a %% restart timer. A separate process must perform this in order to %% avoid deadlock. HandlerPid = self(), |