diff options
-rw-r--r-- | lib/kernel/src/logger_disk_log_h.erl | 102 | ||||
-rw-r--r-- | lib/kernel/src/logger_h_common.erl | 389 | ||||
-rw-r--r-- | lib/kernel/src/logger_std_h.erl | 160 | ||||
-rw-r--r-- | lib/kernel/test/logger_disk_log_h_SUITE.erl | 29 | ||||
-rw-r--r-- | lib/kernel/test/logger_std_h_SUITE.erl | 24 |
5 files changed, 306 insertions, 398 deletions
diff --git a/lib/kernel/src/logger_disk_log_h.erl b/lib/kernel/src/logger_disk_log_h.erl index 8c09dd071f..1e48e5b0a8 100644 --- a/lib/kernel/src/logger_disk_log_h.erl +++ b/lib/kernel/src/logger_disk_log_h.erl @@ -27,10 +27,8 @@ -export([info/1, filesync/1, reset/1]). %% logger_h_common callbacks --export([init/2, check_config/4, reset_state/1, - async_filesync/2, sync_filesync/2, - async_write/3, sync_write/3, - handle_info/2, terminate/3]). +-export([init/2, check_config/4, reset_state/2, + filesync/3, write/4, handle_info/3, terminate/3]). %% logger callbacks -export([log/2, adding_handler/1, removing_handler/1, changing_config/3, @@ -185,45 +183,27 @@ merge_default_logopts(Name, HConfig) -> type => Type}, maps:merge(Defaults, HConfig). -async_filesync(Name,State) -> - {_,State1} = disk_log_sync(Name,State), - State1. - -sync_filesync(Name,State) -> +filesync(Name,_Mode,State) -> disk_log_sync(Name,State). -async_write(Name, Bin, State) -> - {_,State1} = disk_log_write(Name, Bin, State), - State1. - -sync_write(Name, Bin, State) -> +write(Name, _Mode, Bin, State) -> disk_log_write(Name, Bin, State). -reset_state(State) -> +reset_state(_Name, State) -> State#{prev_log_result => ok, prev_sync_result => ok, prev_disk_log_info => undefined}. %% The disk log owner must handle status messages from disk_log. -handle_info({disk_log, _Node, _Log, {wrap,_NoLostItems}}, State) -> - State; -handle_info({disk_log, _Node, Log, Info = {truncated,_NoLostItems}}, - State = #{id := Name, prev_disk_log_info := PrevInfo}) -> - error_notify_new(Info, PrevInfo, {disk_log,Name,Log,Info}), - State#{prev_disk_log_info => Info}; -handle_info({disk_log, _Node, Log, Info = {blocked_log,_Items}}, - State = #{id := Name, prev_disk_log_info := PrevInfo}) -> - error_notify_new(Info, PrevInfo, {disk_log,Name,Log,Info}), - State#{prev_disk_log_info => Info}; -handle_info({disk_log, _Node, Log, full}, - State = #{id := Name, prev_disk_log_info := PrevInfo}) -> - error_notify_new(full, PrevInfo, {disk_log,Name,Log,full}), - State#{prev_disk_log_info => full}; -handle_info({disk_log, _Node, Log, Info = {error_status,_Status}}, - State = #{id := Name, prev_disk_log_info := PrevInfo}) -> - error_notify_new(Info, PrevInfo, {disk_log,Name,Log,Info}), - State#{prev_disk_log_info => Info}; -handle_info(_, State) -> +handle_info(Name, {disk_log, _Node, Log, Info={truncated,_NoLostItems}}, State) -> + maybe_notify_status(Name, Log, Info, prev_disk_log_info, State); +handle_info(Name, {disk_log, _Node, Log, Info = {blocked_log,_Items}}, State) -> + maybe_notify_status(Name, Log, Info, prev_disk_log_info, State); +handle_info(Name, {disk_log, _Node, Log, Info = full}, State) -> + maybe_notify_status(Name, Log, Info, prev_disk_log_info, State); +handle_info(Name, {disk_log, _Node, Log, Info = {error_status,_Status}}, State) -> + maybe_notify_status(Name, Log, Info, prev_disk_log_info, State); +handle_info(_, _, State) -> State. terminate(Name, _Reason, _State) -> @@ -265,42 +245,28 @@ close_disk_log(Name, _) -> ok. disk_log_write(Name, Bin, State) -> - case ?disk_log_blog(Name, Bin) of - ok -> - {ok,State#{prev_log_result => ok, last_op => write}}; - LogError -> - _ = case maps:get(prev_log_result, State) of - LogError -> - %% don't report same error twice - ok; - _ -> - LogOpts = maps:get(log_opts, State), - logger_h_common:error_notify({Name,log, - LogOpts, - LogError}) - end, - {LogError,State#{prev_log_result => LogError}} - end. + Result = ?disk_log_blog(Name, Bin), + maybe_notify_error(Name, log, Result, prev_log_result, State). disk_log_sync(Name, State) -> - case ?disk_log_sync(Name) of - ok -> - {ok,State#{prev_sync_result => ok, last_op => sync}}; - SyncError -> - _ = case maps:get(prev_sync_result, State) of - SyncError -> - %% don't report same error twice - ok; - _ -> - LogOpts = maps:get(log_opts, State), - logger_h_common:error_notify({Name,filesync, - LogOpts, - SyncError}) - end, - {SyncError,State#{prev_sync_result => SyncError}} - end. + Result = ?disk_log_sync(Name), + maybe_notify_error(Name, filesync, Result, prev_sync_result, State). -error_notify_new(Info,Info, _Term) -> +%%%----------------------------------------------------------------- +%%% Print error messages, but don't repeat the same message +maybe_notify_error(Name, Op, Result, Key, #{log_opts:=LogOpts}=State) -> + {Result,error_notify_new({Name, Op, LogOpts, Result}, Result, Key, State)}. + +maybe_notify_status(Name, Log, Info, Key, State) -> + error_notify_new({disk_log, Name, Log, Info}, Info, Key, State). + +error_notify_new(Term, What, Key, State) -> + error_notify_new(What, maps:get(Key,State), Term), + State#{Key => What}. + +error_notify_new(ok,_Prev,_Term) -> + ok; +error_notify_new(Same,Same,_Term) -> ok; -error_notify_new(_Info0,_Info1, Term) -> +error_notify_new(_New,_Prev,Term) -> logger_h_common:error_notify(Term). diff --git a/lib/kernel/src/logger_h_common.erl b/lib/kernel/src/logger_h_common.erl index d01851b2b1..d290f51e34 100644 --- a/lib/kernel/src/logger_h_common.erl +++ b/lib/kernel/src/logger_h_common.erl @@ -24,7 +24,7 @@ -include("logger_internal.hrl"). %% API --export([start_link/3, info/2, filesync/2, reset/2]). +-export([start_link/1, info/2, filesync/2, reset/2]). %% gen_server and proc_lib callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, @@ -52,65 +52,37 @@ -define(READ_ONLY_KEYS,[handler_pid,mode_tab]). %%%----------------------------------------------------------------- -%%% -filesync(Module, Name) when is_atom(Name) -> - try - gen_server:call(?name_to_reg_name(Module,Name), - filesync, ?DEFAULT_CALL_TIMEOUT) - catch - _:{timeout,_} -> {error,handler_busy} - end; -filesync(_, Name) -> - {error,{badarg,{filesync,[Name]}}}. +%%% API -info(Module, Name) when is_atom(Name) -> - try - gen_server:call(?name_to_reg_name(Module,Name), - info, ?DEFAULT_CALL_TIMEOUT) - catch - _:{timeout,_} -> {error,handler_busy} - end; -info(_, Name) -> - {error,{badarg,{info,[Name]}}}. +%% This function is called by the logger_sup supervisor +start_link(Args) -> + proc_lib:start_link(?MODULE,init,[Args]). -reset(Module, Name) when is_atom(Name) -> - try - gen_server:call(?name_to_reg_name(Module,Name), - reset, ?DEFAULT_CALL_TIMEOUT) - catch - _:{timeout,_} -> {error,handler_busy} - end; -reset(_, Name) -> - {error,{badarg,{reset,[Name]}}}. +filesync(Module, Name) -> + call(Module, Name, filesync). +info(Module, Name) -> + call(Module, Name, info). +reset(Module, Name) -> + call(Module, Name, reset). %%%----------------------------------------------------------------- %%% Handler being added adding_handler(#{id:=Name,module:=Module}=Config) -> - HConfig = maps:get(config, Config, #{}), - HandlerConfig0 = maps:without(?CONFIG_KEYS,HConfig), + HConfig0 = maps:get(config, Config, #{}), + HandlerConfig0 = maps:without(?CONFIG_KEYS,HConfig0), case Module:check_config(Name,set,undefined,HandlerConfig0) of {ok,HandlerConfig} -> ModifiedCommon = maps:with(?CONFIG_KEYS,HandlerConfig), - CommonConfig0 = maps:with(?CONFIG_KEYS,HConfig), + CommonConfig0 = maps:with(?CONFIG_KEYS,HConfig0), CommonConfig = maps:merge( maps:merge(get_default_config(), CommonConfig0), ModifiedCommon), case check_config(CommonConfig) of ok -> - State = maps:merge(get_init_state(), CommonConfig), - HConfig1 = maps:merge(CommonConfig,HandlerConfig), - Config1 = Config#{config=>HConfig1}, - case overload_levels_ok(State) of - true -> - start(Name, Config1, State); - false -> - #{sync_mode_qlen := SMQL, - drop_mode_qlen := DMQL, - flush_qlen := FQL} = State, - {error,{invalid_levels,{SMQL,DMQL,FQL}}} - end; + HConfig = maps:merge(CommonConfig,HandlerConfig), + start(Config#{config => HConfig}); {error,Faulty} -> {error,{invalid_config,Module,Faulty}} end; @@ -141,7 +113,7 @@ removing_handler(#{id:=Name, module:=Module}) -> %%%----------------------------------------------------------------- %%% Updating handler config changing_config(SetOrUpdate, - OldConfig=#{id:=Name,config:=OldHConfig,module:=Module}, + #{id:=Name,config:=OldHConfig,module:=Module}, NewConfig0) -> NewHConfig0 = maps:get(config, NewConfig0, #{}), OldHandlerConfig = maps:without(?CONFIG_KEYS++?READ_ONLY_KEYS,OldHConfig), @@ -169,12 +141,9 @@ changing_config(SetOrUpdate, ReadOnly), NewConfig = NewConfig0#{config=>NewHConfig}, HPid = maps:get(handler_pid,OldHConfig), - try gen_server:call(HPid, {change_config,OldConfig,NewConfig}, - ?DEFAULT_CALL_TIMEOUT) of + case call(HPid, {change_config,NewConfig}) of ok -> {ok,NewConfig}; Error -> Error - catch - _:{timeout,_} -> {error,handler_busy} end; {error,Faulty} -> {error,{invalid_config,Module,Faulty}} @@ -204,61 +173,35 @@ filter_config(#{config:=HConfig}=Config) -> Config#{config=>maps:without(?READ_ONLY_KEYS,HConfig)}. %%%----------------------------------------------------------------- -%%% Add a standard handler to the logger. -%%% This starts a dedicated handler process which should always -%%% exist if the handler is registered with logger (and should not -%%% exist if the handler is not registered). -%%% -%%% Handler specific config should be provided with a sub map associated -%%% with a key named 'config', e.g: +%%% Start the handler process %%% -%%% Config = #{config => #{sync_mode_qlen => 50} +%%% The process must always exist if the handler is registered with +%%% logger (and must not exist if the handler is not registered). %%% -%%% The standard handler process is linked to logger_sup, which is -%%% part of the kernel application's supervision tree. -start(Name, Config, HandlerState) -> +%%% The handler process is linked to logger_sup, which is part of the +%%% kernel application's supervision tree. +start(#{id := Name} = Config0) -> ChildSpec = #{id => Name, - start => {?MODULE, start_link, [Name,Config,HandlerState]}, + start => {?MODULE, start_link, [Config0]}, restart => temporary, shutdown => 2000, type => worker, modules => [?MODULE]}, case supervisor:start_child(logger_sup, ChildSpec) of - {ok,Pid,Config1} -> + {ok,Pid,Config} -> ok = logger_handler_watcher:register_handler(Name,Pid), - {ok,Config1}; + {ok,Config}; Error -> Error end. -%%%----------------------------------------------------------------- -%%% Start a standard handler process and link to caller. -%%% This function is called by the kernel supervisor when this -%%% handler process gets added --spec start_link(Name, Config, HandlerState) -> {ok,Pid} | {error,Reason} when - Name :: atom(), - Config :: logger:handler_config(), - HandlerState :: map(), - Pid :: pid(), - Reason :: term(). - -start_link(Name, Config, HandlerState) -> - proc_lib:start_link(?MODULE,init,[[Name,Config,HandlerState]]). - -%%%----------------------------------------------------------------- -%%% -get_init_state() -> - #{ctrl_sync_int => ?CONTROLLER_SYNC_INTERVAL, - filesync_ok_qlen => ?FILESYNC_OK_QLEN}. - %%%=================================================================== %%% gen_server callbacks %%%=================================================================== -init([Name, Config = #{config := HConfig, module := Module}, - State = #{filesync_repeat_interval := FSyncInt, - ctrl_sync_int := CtrlSyncInt}]) -> +init(#{id := Name, module := Module, + formatter := Formatter, config := HConfig0} = Config0) -> RegName = ?name_to_reg_name(Module,Name), register(RegName, self()), process_flag(trap_exit, true), @@ -267,43 +210,36 @@ init([Name, Config = #{config := HConfig, module := Module}, ?init_test_hooks(), ?start_observation(Name), - case Module:init(Name, HConfig) of + case Module:init(Name, HConfig0) of {ok,HState} -> try ets:new(Name, [public]) of ModeTab -> ?set_mode(ModeTab, async), T0 = ?timestamp(), - State1 = - ?merge_with_stats(State#{id => Name, - module => Module, - mode_tab => ModeTab, - mode => async, - ctrl_sync_count => CtrlSyncInt, - last_qlen => 0, - last_log_ts => T0, - last_op => sync, - burst_win_ts => T0, - burst_msg_count => 0, - handler_state => HState}), - Config1 = - Config#{config => HConfig#{handler_pid => self(), - mode_tab => ModeTab}}, - proc_lib:init_ack({ok,self(),Config1}), - if is_integer(FSyncInt) -> - gen_server:cast(self(), repeated_filesync); - true -> - ok - end, - case unset_restart_flag(Name, Module) of - true -> - %% inform about restart - gen_server:cast(self(), {log_handler_info, - "Handler ~p restarted", - [Name]}); - false -> - %% initial start - ok - end, + HConfig = HConfig0#{handler_pid => self(), + mode_tab => ModeTab}, + Config = Config0#{config => HConfig}, + proc_lib:init_ack({ok,self(),Config}), + %% Storing common config in state to avoid copying + %% (sending) the config data for each log message + CommonConfig = maps:with(?CONFIG_KEYS,HConfig), + State = + ?merge_with_stats( + CommonConfig#{id => Name, + module => Module, + mode_tab => ModeTab, + mode => async, + ctrl_sync_count => + ?CONTROLLER_SYNC_INTERVAL, + last_qlen => 0, + last_log_ts => T0, + last_op => sync, + burst_win_ts => T0, + burst_msg_count => 0, + formatter => Formatter, + handler_state => HState}), + State1 = set_repeated_filesync(State), + unset_restart_flag(State1), gen_server:enter_loop(?MODULE, [], State1) catch _:Error -> @@ -326,47 +262,33 @@ handle_call({log, Bin}, _From, State) -> handle_call(filesync, _From, State = #{id := Name, module := Module, handler_state := HandlerState}) -> - {Result,HandlerState1} = Module:sync_filesync(Name,HandlerState), + {Result,HandlerState1} = Module:filesync(Name,sync,HandlerState), {reply, Result, State#{handler_state=>HandlerState1, last_op=>sync}}; -handle_call({change_config,_OldConfig,NewConfig}, _From, +handle_call({change_config, #{formatter:=Formatter, config:=NewHConfig}}, _From, State = #{filesync_repeat_interval := FSyncInt0}) -> - HConfig = maps:get(config, NewConfig, #{}), - State1 = maps:merge(State, HConfig), - case overload_levels_ok(State1) of - true -> - _ = - case maps:get(filesync_repeat_interval, HConfig, undefined) of - undefined -> - ok; - no_repeat -> - _ = logger_h_common:cancel_timer(maps:get(rep_sync_tref, - State, - undefined)); - FSyncInt0 -> - ok; - _FSyncInt1 -> - _ = logger_h_common:cancel_timer(maps:get(rep_sync_tref, - State, - undefined)), - gen_server:cast(self(), repeated_filesync) - end, - {reply, ok, State1}; - false -> - #{sync_mode_qlen := SMQL, - drop_mode_qlen := DMQL, - flush_qlen := FQL} = State1, - {reply, {error,{invalid_levels,{SMQL,DMQL,FQL}}}, State} - end; + %% In the future, if handler_state must be updated due to config + %% change, then we need to add a callback to Module here. + CommonConfig = maps:with(?CONFIG_KEYS,NewHConfig), + State1 = maps:merge(State, CommonConfig), + State2 = + case maps:get(filesync_repeat_interval, NewHConfig) of + FSyncInt0 -> + State1; + _FSyncInt1 -> + set_repeated_filesync(cancel_repeated_filesync(State1)) + end, + {reply, ok, State2#{formatter:=Formatter}}; handle_call(info, _From, State) -> {reply, State, State}; -handle_call(reset, _From, #{module:=Module,handler_state:=HandlerState}=State) -> +handle_call(reset, _From, + #{id:=Name,module:=Module,handler_state:=HandlerState}=State) -> State1 = ?merge_with_stats(State), {reply, ok, State1#{last_qlen => 0, last_log_ts => ?timestamp(), - handler_state => Module:reset_state(HandlerState)}}; + handler_state => Module:reset_state(Name,HandlerState)}}; handle_call(stop, _From, State) -> {stop, {shutdown,stopped}, ok, State}. @@ -376,41 +298,37 @@ handle_cast({log, Bin}, State) -> {_,State1} = do_log(Bin, cast, State), {noreply, State1}; -handle_cast({log_handler_info, Format, Args}, State = #{id:=Name}) -> - log_handler_info(Name, Format, Args, State), - {noreply, State}; - %% If FILESYNC_REPEAT_INTERVAL is set to a millisec value, this %% clause gets called repeatedly by the handler. In order to %% guarantee that a filesync *always* happens after the last log %% event, the repeat operation must be active! handle_cast(repeated_filesync,State = #{filesync_repeat_interval := no_repeat}) -> + %% This clause handles a race condition which may occur when + %% config changes filesync_repeat_interval from an integer value + %% to no_repeat. {noreply,State}; handle_cast(repeated_filesync, State = #{id := Name, module := Module, handler_state := HandlerState, - filesync_repeat_interval := FSyncInt, last_op := LastOp}) -> - HandlerState1 = + State1 = if LastOp == sync -> - HandlerState; + State; true -> - Module:async_filesync(Name,HandlerState) + {_,HS} = Module:filesync(Name, async, HandlerState), + State#{handler_state => HS, last_op => sync} end, - {ok,TRef} = timer:apply_after(FSyncInt, gen_server,cast, - [self(),repeated_filesync]), - {noreply,State#{handler_state=>HandlerState1, - rep_sync_tref => TRef, - last_op => sync}}. + {noreply,set_repeated_filesync(State1)}. -handle_info(Info, #{module := Module, handler_state := HandlerState} = State) -> - {noreply,State#{handler_state => Module:handle_info(Info,HandlerState)}}. +handle_info(Info, #{id := Name, module := Module, + handler_state := HandlerState} = State) -> + {noreply,State#{handler_state => Module:handle_info(Name,Info,HandlerState)}}. terminate(Reason, State = #{id := Name, module := Module, handler_state := HandlerState}) -> - _ = cancel_timer(maps:get(rep_sync_tref, State, undefined)), + _ = cancel_repeated_filesync(State), _ = Module:terminate(Name, Reason, HandlerState), ok = stop_or_restart(Name, Reason, State), unregister(?name_to_reg_name(Module, Name)), @@ -420,10 +338,24 @@ code_change(_OldVsn, State, _Extra) -> {ok, State}. +%%%----------------------------------------------------------------- +%%% Internal functions +call(Module, Name, Op) when is_atom(Name) -> + call(?name_to_reg_name(Module,Name), Op); +call(_, Name, Op) -> + {error,{badarg,{Op,[Name]}}}. + +call(Server, Msg) -> + try + gen_server:call(Server, Msg, ?DEFAULT_CALL_TIMEOUT) + catch + _:{timeout,_} -> {error,handler_busy} + end. + %% check for overload between every event (and set Mode to async, %% sync or drop accordingly), but never flush the whole mailbox %% before LogWindowSize events have been handled -do_log(Bin, CallOrCast, State = #{id:=Name, module:=Module, mode:=Mode0}) -> +do_log(Bin, CallOrCast, State = #{id:=Name, mode:=Mode0}) -> T1 = ?timestamp(), %% check if the handler is getting overloaded, or if it's @@ -444,7 +376,7 @@ do_log(Bin, CallOrCast, State = #{id:=Name, module:=Module, mode:=Mode0}) -> end, %% kill the handler if it can't keep up with the load - kill_if_choked(Name, Module, QLen, Mem, State), + kill_if_choked(Name, QLen, Mem, State), if Mode1 == flush -> flush(Name, QLen, T1, State1); @@ -473,8 +405,9 @@ flush(Name, _QLen0, T1, State=#{last_log_ts := _T0, mode_tab := ModeTab}) -> ?observe(Name,{flushed,NewFlushed+1}), State1 = ?update_max_time(?diff_time(T1,_T0),State), + State2 = ?update_max_qlen(_QLen1,State1), {dropped,?update_other(flushed,FLUSHED,NewFlushed, - State1#{mode => ?set_mode(ModeTab,async), + State2#{mode => ?set_mode(ModeTab,async), last_qlen => 0, last_log_ts => T1})}. @@ -484,24 +417,23 @@ write(Name, Mode, T1, Bin, _CallOrCast, handler_state := HandlerState, mode_tab := ModeTab, ctrl_sync_count := CtrlSync, - ctrl_sync_int := CtrlSyncInt, last_qlen := LastQLen, last_log_ts := T0}) -> %% check if we need to limit the number of writes %% during a burst of log events {DoWrite,BurstWinT,BurstMsgCount} = limit_burst(State), - %% only log synhrounously every CtrlSyncInt time, to give the - %% handler time between writes so it can keep up with incoming - %% messages + %% only log synhrounously every ?CONTROLLER_SYNC_INTERVAL time, to + %% give the handler time between writes so it can keep up with + %% incoming messages {Result,LastQLen1,HandlerState1} = if DoWrite, CtrlSync == 0 -> ?observe(Name,{_CallOrCast,1}), - {_,HS1} = Module:sync_write(Name, Bin, HandlerState), + {_,HS1} = Module:write(Name, sync, Bin, HandlerState), {ok,element(2, process_info(self(), message_queue_len)),HS1}; DoWrite -> ?observe(Name,{_CallOrCast,1}), - HS1 = Module:async_write(Name, Bin, HandlerState), + {_,HS1} = Module:write(Name, async, Bin, HandlerState), {ok,LastQLen,HS1}; not DoWrite -> ?observe(Name,{flushed,1}), @@ -519,39 +451,37 @@ write(Name, Mode, T1, Bin, _CallOrCast, {Mode1,BurstMsgCount1,HandlerState2} = if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso (Time > ?IDLE_DETECT_TIME_USEC) -> - HS2 = Module:async_filesync(Name,HandlerState), + {_,HS2} = Module:filesync(Name,async,HandlerState), {?change_mode(ModeTab, Mode, async),0,HS2}; true -> {Mode,BurstMsgCount,HandlerState1} end, - State1 = - ?update_calls_or_casts(_CallOrCast,1,State), - State2 = + State1 = ?update_calls_or_casts(_CallOrCast,1,State), + State2 = ?update_max_qlen(LastQLen1,State1), + State3 = ?update_max_time(Time, - State1#{mode => Mode1, + State2#{mode => Mode1, last_qlen := LastQLen1, last_log_ts => T1, last_op => write, burst_win_ts => BurstWinT, burst_msg_count => BurstMsgCount1, - ctrl_sync_count => if CtrlSync==0 -> CtrlSyncInt; - true -> CtrlSync-1 - end, + ctrl_sync_count => + if CtrlSync==0 -> ?CONTROLLER_SYNC_INTERVAL; + true -> CtrlSync-1 + end, handler_state => HandlerState2}), - {Result,State2}. + {Result,State3}. log_handler_info(Name, Format, Args, #{module:=Module, + formatter:=Formatter, handler_state:=HandlerState}) -> - Config = - case logger:get_handler_config(Name) of - {ok,Conf} -> Conf; - _ -> #{formatter=>{?DEFAULT_FORMATTER,?DEFAULT_FORMAT_CONFIG}} - end, + Config = #{formatter=>Formatter}, Meta = #{time=>erlang:system_time(microsecond)}, Bin = log_to_binary(#{level => notice, msg => {Format,Args}, meta => Meta}, Config), - _ = Module:async_write(Name, Bin, HandlerState), + _ = Module:write(Name, async, Bin, HandlerState), ok. %%%----------------------------------------------------------------- @@ -612,7 +542,20 @@ string_to_binary(String) -> %%%----------------------------------------------------------------- %%% Check that the configuration term is valid check_config(Config) when is_map(Config) -> - check_common_config(maps:to_list(Config)). + case check_common_config(maps:to_list(Config)) of + ok -> + case overload_levels_ok(Config) of + true -> + ok; + false -> + Faulty = maps:with([sync_mode_qlen, + drop_mode_qlen, + flush_qlen],Config), + {error,{invalid_levels,Faulty}} + end; + Error -> + Error + end. check_common_config([{sync_mode_qlen,N}|Config]) when is_integer(N) -> check_common_config(Config); @@ -667,15 +610,13 @@ call_cast_or_drop(_Name, HandlerPid, ModeTab, Bin) -> async -> gen_server:cast(HandlerPid, {log,Bin}); sync -> - try gen_server:call(HandlerPid, {log,Bin}, ?DEFAULT_CALL_TIMEOUT) of - %% if return value from call == dropped, the - %% message has been flushed by handler and should - %% therefore not be counted as dropped in stats - ok -> ok; - dropped -> ok - catch - _:{timeout,_} -> - ?observe(_Name,{dropped,1}) + case call(HandlerPid, {log,Bin}) of + ok -> + ok; + _Other -> + %% dropped or {error,handler_busy} + ?observe(_Name,{dropped,1}), + ok end; drop -> ?observe(_Name,{dropped,1}) @@ -686,10 +627,8 @@ call_cast_or_drop(_Name, HandlerPid, ModeTab, Bin) -> end, ok. -handler_exit(_Name, Reason) -> - exit(Reason). - -set_restart_flag(Name, Module) -> +set_restart_flag(#{id := Name, module := Module} = State) -> + log_handler_info(Name, "Handler ~p overloaded and stopping", [Name], State), Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])), spawn(fun() -> register(Flag, self()), @@ -697,14 +636,14 @@ set_restart_flag(Name, Module) -> end), ok. -unset_restart_flag(Name, Module) -> +unset_restart_flag(#{id := Name, module := Module} = State) -> Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])), case whereis(Flag) of undefined -> - false; + ok; Pid -> exit(Pid, kill), - true + log_handler_info(Name, "Handler ~p restarted", [Name], State) end. check_load(State = #{id:=_Name, mode_tab := ModeTab, mode := Mode, @@ -764,24 +703,17 @@ limit_burst(#{burst_win_ts := BurstWinT0, {true,BurstWinT0,BurstMsgCount+1} end. -kill_if_choked(Name, Module, QLen, Mem, - State = #{overload_kill_enable := KillIfOL, - overload_kill_qlen := OLKillQLen, - overload_kill_mem_size := OLKillMem}) -> +kill_if_choked(Name, QLen, Mem, State = #{overload_kill_enable := KillIfOL, + overload_kill_qlen := OLKillQLen, + overload_kill_mem_size := OLKillMem}) -> if KillIfOL andalso ((QLen > OLKillQLen) orelse (Mem > OLKillMem)) -> - log_handler_info(Name, - "Handler ~p overloaded and stopping", - [Name], State), - set_restart_flag(Name, Module), - handler_exit(Name, {shutdown,{overloaded,Name,QLen,Mem}}); + set_restart_flag(State), + exit({shutdown,{overloaded,Name,QLen,Mem}}); true -> ok end. -flush_log_events() -> - flush_log_events(-1). - flush_log_events(Limit) -> process_flag(priority, high), Flushed = flush_log_events(0, Limit), @@ -804,14 +736,27 @@ flush_log_events(N, Limit) -> 0 -> N end. -cancel_timer(TRef) when is_atom(TRef) -> ok; -cancel_timer(TRef) -> timer:cancel(TRef). - +set_repeated_filesync(#{filesync_repeat_interval:=FSyncInt} = State) + when is_integer(FSyncInt) -> + {ok,TRef} = timer:apply_after(FSyncInt, gen_server, cast, + [self(),repeated_filesync]), + State#{rep_sync_tref=>TRef}; +set_repeated_filesync(State) -> + State. + +cancel_repeated_filesync(State) -> + case maps:take(rep_sync_tref,State) of + {TRef,State1} -> + _ = timer:cancel(TRef), + State1; + error -> + State + end. stop_or_restart(Name, {shutdown,Reason={overloaded,_Name,_QLen,_Mem}}, #{overload_kill_restart_after := RestartAfter}) -> %% If we're terminating because of an overload situation (see - %% kill_if_choked/5), we need to remove the handler and set a + %% kill_if_choked/4), we need to remove the handler and set a %% restart timer. A separate process must perform this in order to %% avoid deadlock. HandlerPid = self(), diff --git a/lib/kernel/src/logger_std_h.erl b/lib/kernel/src/logger_std_h.erl index 95e7d17c0d..ebe741e331 100644 --- a/lib/kernel/src/logger_std_h.erl +++ b/lib/kernel/src/logger_std_h.erl @@ -29,10 +29,8 @@ -export([info/1, filesync/1, reset/1]). %% logger_h_common callbacks --export([init/2, check_config/4, reset_state/1, - async_filesync/2, sync_filesync/2, - async_write/3, sync_write/3, - handle_info/2, terminate/3]). +-export([init/2, check_config/4, reset_state/2, + filesync/3, write/4, handle_info/3, terminate/3]). %% logger callbacks -export([log/2, adding_handler/1, removing_handler/1, changing_config/3, @@ -71,21 +69,36 @@ reset(Name) -> logger_h_common:reset(?MODULE,Name). %%%=================================================================== -%%% logger callbacks +%%% logger callbacks - just forward to logger_h_common %%%=================================================================== %%%----------------------------------------------------------------- %%% Handler being added +-spec adding_handler(Config) -> {ok,Config} | {error,Reason} when + Config :: logger:handler_config(), + Reason :: term(). + adding_handler(Config) -> logger_h_common:adding_handler(Config). %%%----------------------------------------------------------------- %%% Updating handler config +-spec changing_config(SetOrUpdate, OldConfig, NewConfig) -> + {ok,Config} | {error,Reason} when + SetOrUpdate :: set | update, + OldConfig :: logger:handler_config(), + NewConfig :: logger:handler_config(), + Config :: logger:handler_config(), + Reason :: term(). + changing_config(SetOrUpdate, OldConfig, NewConfig) -> logger_h_common:changing_config(SetOrUpdate, OldConfig, NewConfig). %%%----------------------------------------------------------------- %%% Handler being removed +-spec removing_handler(Config) -> ok when + Config :: logger:handler_config(). + removing_handler(Config) -> logger_h_common:removing_handler(Config). @@ -100,6 +113,9 @@ log(LogEvent, Config) -> %%%----------------------------------------------------------------- %%% Remove internal fields from configuration +-spec filter_config(Config) -> Config when + Config :: logger:handler_config(). + filter_config(Config) -> logger_h_common:filter_config(Config). @@ -163,33 +179,29 @@ check_h_config([]) -> get_default_config() -> #{type => standard_io}. -async_filesync(_Name,#{type := Type}=State) when is_atom(Type) -> - State; -async_filesync(_Name,#{file_ctrl_pid := FileCtrlPid}=State) -> +filesync(_Name, _Mode, #{type := Type}=State) when is_atom(Type) -> + {ok,State}; +filesync(_Name, async, #{file_ctrl_pid := FileCtrlPid} = State) -> ok = file_ctrl_filesync_async(FileCtrlPid), - State. - -sync_filesync(_Name,#{type := Type}=State) when is_atom(Type) -> {ok,State}; -sync_filesync(_Name,#{file_ctrl_pid := FileCtrlPid}=State) -> +filesync(_Name, sync, #{file_ctrl_pid := FileCtrlPid} = State) -> Result = file_ctrl_filesync_sync(FileCtrlPid), {Result,State}. -async_write(_Name, Bin, #{file_ctrl_pid:=FileCtrlPid} = State) -> +write(_Name, async, Bin, #{file_ctrl_pid:=FileCtrlPid} = State) -> ok = file_write_async(FileCtrlPid, Bin), - State. - -sync_write(_Name, Bin, #{file_ctrl_pid:=FileCtrlPid} = State) -> + {ok,State}; +write(_Name, sync, Bin, #{file_ctrl_pid:=FileCtrlPid} = State) -> Result = file_write_sync(FileCtrlPid, Bin), {Result,State}. -reset_state(State) -> +reset_state(_Name, State) -> State. -handle_info({'EXIT',Pid,Why}, #{type := FileInfo, file_ctrl_pid := Pid}) -> +handle_info(_Name, {'EXIT',Pid,Why}, #{type := FileInfo, file_ctrl_pid := Pid}) -> %% file_ctrl_pid died, file error, terminate handler exit({error,{write_failed,FileInfo,Why}}); -handle_info(_, State) -> +handle_info(_, _, State) -> State. terminate(_Name, _Reason, #{file_ctrl_pid:=FWPid}) -> @@ -203,7 +215,8 @@ terminate(_Name, _Reason, #{file_ctrl_pid:=FWPid}) -> ok after ?DEFAULT_CALL_TIMEOUT -> - exit(FWPid, kill) + exit(FWPid, kill), + ok end; false -> ok @@ -273,18 +286,18 @@ file_write_async(Pid, Bin) -> ok. file_write_sync(Pid, Bin) -> - file_ctrl_call(Pid, {log,self(),Bin}). + file_ctrl_call(Pid, {log,Bin}). file_ctrl_filesync_async(Pid) -> Pid ! filesync, ok. file_ctrl_filesync_sync(Pid) -> - file_ctrl_call(Pid, {filesync,self()}). + file_ctrl_call(Pid, filesync). file_ctrl_call(Pid, Msg) -> MRef = monitor(process, Pid), - Pid ! {Msg,MRef}, + Pid ! {Msg,{self(),MRef}}, receive {MRef,Result} -> demonitor(MRef, [flush]), @@ -302,63 +315,40 @@ file_ctrl_init(HandlerName, FileInfo, Starter) when is_tuple(FileInfo) -> case do_open_log_file(FileInfo) of {ok,Fd} -> Starter ! {self(),ok}, - file_ctrl_loop(Fd, file, FileName, false, ok, ok, HandlerName); + file_ctrl_loop(Fd, FileName, false, ok, ok, HandlerName); {error,Reason} -> Starter ! {self(),{error,{open_failed,FileName,Reason}}} end; file_ctrl_init(HandlerName, StdDev, Starter) -> Starter ! {self(),ok}, - file_ctrl_loop(StdDev, standard_io, StdDev, false, ok, ok, HandlerName). + file_ctrl_loop(StdDev, StdDev, false, ok, ok, HandlerName). -file_ctrl_loop(Fd, Type, DevName, Synced, +file_ctrl_loop(Fd, DevName, Synced, PrevWriteResult, PrevSyncResult, HandlerName) -> receive %% asynchronous event {log,Bin} -> - Result = if Type == file -> - write_to_dev(Fd, Bin, DevName, - PrevWriteResult, HandlerName); - true -> - io:put_chars(Fd, Bin) - end, - file_ctrl_loop(Fd, Type, DevName, false, + Result = write_to_dev(Fd, Bin, DevName, PrevWriteResult, HandlerName), + file_ctrl_loop(Fd, DevName, false, Result, PrevSyncResult, HandlerName); %% synchronous event - {{log,From,Bin},MRef} -> - WResult = - if Type == file -> - %% check that file hasn't been deleted - CheckFile = - fun() -> {ok,_} = file:read_file_info(DevName) end, - spawn_link(CheckFile), - write_to_dev(Fd, Bin, DevName, - PrevWriteResult, HandlerName); - true -> - _ = io:put_chars(Fd, Bin), - ok - end, + {{log,Bin},{From,MRef}} -> + check_exist(Fd, DevName), + Result = write_to_dev(Fd, Bin, DevName, PrevWriteResult, HandlerName), From ! {MRef,ok}, - file_ctrl_loop(Fd, Type, DevName, false, - WResult, PrevSyncResult, HandlerName); + file_ctrl_loop(Fd, DevName, false, + Result, PrevSyncResult, HandlerName); - filesync when not Synced -> - Result = sync_dev(Fd, DevName, PrevSyncResult, HandlerName), - file_ctrl_loop(Fd, Type, DevName, true, + filesync -> + Result = sync_dev(Fd, DevName, Synced, PrevSyncResult, HandlerName), + file_ctrl_loop(Fd, DevName, true, PrevWriteResult, Result, HandlerName); - filesync -> - file_ctrl_loop(Fd, Type, DevName, true, - PrevWriteResult, PrevSyncResult, HandlerName); - - {{filesync,From},MRef} -> - Result = if not Synced -> - sync_dev(Fd, DevName, PrevSyncResult, HandlerName); - true -> - ok - end, + {filesync,{From,MRef}} -> + Result = sync_dev(Fd, DevName, Synced, PrevSyncResult, HandlerName), From ! {MRef,ok}, - file_ctrl_loop(Fd, Type, DevName, true, + file_ctrl_loop(Fd, DevName, true, PrevWriteResult, Result, HandlerName); stop -> @@ -366,26 +356,30 @@ file_ctrl_loop(Fd, Type, DevName, Synced, stopped end. +check_exist(DevName, DevName) when is_atom(DevName) -> + ok; +check_exist(_Fd, FileName) -> + _ = spawn_link(fun() -> {ok,_} = file:read_file_info(FileName) end), + ok. + +write_to_dev(DevName, Bin, _DevName, _PrevWriteResult, _HandlerName) + when is_atom(DevName) -> + io:put_chars(DevName, Bin); write_to_dev(Fd, Bin, FileName, PrevWriteResult, HandlerName) -> - case ?file_write(Fd, Bin) of - ok -> - ok; - PrevWriteResult -> - %% don't report same error twice - PrevWriteResult; - Error -> - logger_h_common:error_notify({HandlerName,write,FileName,Error}), - Error - end. + Result = ?file_write(Fd, Bin), + maybe_notify_error(write,Result,PrevWriteResult,FileName,HandlerName). -sync_dev(Fd, DevName, PrevSyncResult, HandlerName) -> - case ?file_datasync(Fd) of - ok -> - ok; - PrevSyncResult -> - %% don't report same error twice - PrevSyncResult; - Error -> - logger_h_common:error_notify({HandlerName,filesync,DevName,Error}), - Error - end. +sync_dev(_Fd, _FileName, true, PrevSyncResult, _HandlerName) -> + PrevSyncResult; +sync_dev(Fd, FileName, false, PrevSyncResult, HandlerName) -> + Result = ?file_datasync(Fd), + maybe_notify_error(filesync,Result,PrevSyncResult,FileName,HandlerName). + +maybe_notify_error(_Op, ok, _PrevResult, _FileName, _HandlerName) -> + ok; +maybe_notify_error(_Op, PrevResult, PrevResult, _FileName, _HandlerName) -> + %% don't report same error twice + PrevResult; +maybe_notify_error(Op, Error, _PrevResult, FileName, HandlerName) -> + logger_h_common:error_notify({HandlerName,Op,FileName,Error}), + Error. diff --git a/lib/kernel/test/logger_disk_log_h_SUITE.erl b/lib/kernel/test/logger_disk_log_h_SUITE.erl index 7bac4c2cb2..2bd49432b2 100644 --- a/lib/kernel/test/logger_disk_log_h_SUITE.erl +++ b/lib/kernel/test/logger_disk_log_h_SUITE.erl @@ -419,14 +419,19 @@ config_fail(_Config) -> filter_default=>log, formatter=>{?MODULE,self()}}), - {error,{handler_not_added,{invalid_levels,{_,1,_}}}} = + {error,{handler_not_added,{invalid_config,logger_disk_log_h, + {invalid_levels,#{drop_mode_qlen:=1}}}}} = logger:add_handler(?MODULE,logger_disk_log_h, #{config => #{drop_mode_qlen=>1}}), - {error,{handler_not_added,{invalid_levels,{43,42,_}}}} = + {error,{handler_not_added,{invalid_config,logger_disk_log_h, + {invalid_levels,#{sync_mode_qlen:=43, + drop_mode_qlen:=42}}}}} = logger:add_handler(?MODULE,logger_disk_log_h, #{config => #{sync_mode_qlen=>43, drop_mode_qlen=>42}}), - {error,{handler_not_added,{invalid_levels,{_,43,42}}}} = + {error,{handler_not_added,{invalid_config,logger_disk_log_h, + {invalid_levels,#{drop_mode_qlen:=43, + flush_qlen:=42}}}}} = logger:add_handler(?MODULE,logger_disk_log_h, #{config => #{drop_mode_qlen=>43, flush_qlen=>42}}), @@ -440,7 +445,7 @@ config_fail(_Config) -> #{max_no_files=>2}), %% incorrect values of OP params {ok,#{config := HConfig}} = logger:get_handler_config(?MODULE), - {error,{invalid_levels,_}} = + {error,{invalid_config,logger_disk_log_h,{invalid_levels,_}}} = logger:update_handler_config(?MODULE,config, HConfig#{sync_mode_qlen=>100, flush_qlen=>99}), @@ -672,9 +677,9 @@ sync(Config) -> SyncInt = 1000, WaitT = 4500, OneSync = {logger_h_common,handle_cast,repeated_filesync}, - %% receive 1 initial repeated_filesync, then 1 per sec + %% receive 1 repeated_filesync per sec start_tracer([{logger_h_common,handle_cast,2}], - [OneSync || _ <- lists:seq(1, 1 + trunc(WaitT/SyncInt))]), + [OneSync || _ <- lists:seq(1, trunc(WaitT/SyncInt))]), HConfig2 = HConfig#{filesync_repeat_interval => SyncInt}, ok = logger:update_handler_config(?MODULE, config, HConfig2), @@ -717,7 +722,7 @@ disk_log_wrap(Config) -> end, {ok,_} = dbg:tracer(process, {TraceFun, Tester}), {ok,_} = dbg:p(whereis(h_proc_name()), [c]), - {ok,_} = dbg:tp(logger_disk_log_h, handle_info, 2, []), + {ok,_} = dbg:tp(logger_disk_log_h, handle_info, 3, []), Text = [34 + rand:uniform(126-34) || _ <- lists:seq(1,MaxBytes)], ct:pal("String = ~p (~w)", [Text, erts_debug:size(Text)]), @@ -735,7 +740,7 @@ disk_log_wrap(Config) -> timer:sleep(1000), dbg:stop_clear(), Received = lists:flatmap(fun({trace,_M,handle_info, - [{disk_log,_Node,_Name,What},_]}) -> + [_,{disk_log,_Node,_Name,What},_]}) -> [{trace,What}]; ({log,_}) -> [] @@ -771,7 +776,7 @@ disk_log_full(Config) -> end, {ok,_} = dbg:tracer(process, {TraceFun, Tester}), {ok,_} = dbg:p(whereis(h_proc_name()), [c]), - {ok,_} = dbg:tp(logger_disk_log_h, handle_info, 2, []), + {ok,_} = dbg:tp(logger_disk_log_h, handle_info, 3, []), NoOfChars = 5, Text = [34 + rand:uniform(126-34) || _ <- lists:seq(1,NoOfChars)], @@ -781,7 +786,7 @@ disk_log_full(Config) -> timer:sleep(2000), dbg:stop_clear(), Received = lists:flatmap(fun({trace,_M,handle_info, - [{disk_log,_Node,_Name,What},_]}) -> + [_,{disk_log,_Node,_Name,What},_]}) -> [{trace,What}]; ({log,_}) -> [] @@ -820,14 +825,14 @@ disk_log_events(Config) -> end, {ok,_} = dbg:tracer(process, {TraceFun, Tester}), {ok,_} = dbg:p(whereis(h_proc_name()), [c]), - {ok,_} = dbg:tp(logger_disk_log_h, handle_info, 2, []), + {ok,_} = dbg:tp(logger_disk_log_h, handle_info, 3, []), [whereis(h_proc_name()) ! E || E <- Events], %% wait for trace messages timer:sleep(2000), dbg:stop_clear(), Received = lists:map(fun({trace,_M,handle_info, - [Got,_]}) -> Got + [_,Got,_]}) -> Got end, test_server:messages_get()), ct:pal("Trace =~n~p", [Received]), NoOfEvents = length(Events), diff --git a/lib/kernel/test/logger_std_h_SUITE.erl b/lib/kernel/test/logger_std_h_SUITE.erl index affa04d410..a1159f280c 100644 --- a/lib/kernel/test/logger_std_h_SUITE.erl +++ b/lib/kernel/test/logger_std_h_SUITE.erl @@ -319,14 +319,19 @@ config_fail(_Config) -> #{config => #{restart_type => bad}, filter_default=>log, formatter=>{?MODULE,self()}}), - {error,{handler_not_added,{invalid_levels,{_,1,_}}}} = + {error,{handler_not_added,{invalid_config,logger_std_h, + {invalid_levels,#{drop_mode_qlen:=1}}}}} = logger:add_handler(?MODULE,logger_std_h, #{config => #{drop_mode_qlen=>1}}), - {error,{handler_not_added,{invalid_levels,{43,42,_}}}} = + {error,{handler_not_added,{invalid_config,logger_std_h, + {invalid_levels,#{sync_mode_qlen:=43, + drop_mode_qlen:=42}}}}} = logger:add_handler(?MODULE,logger_std_h, #{config => #{sync_mode_qlen=>43, drop_mode_qlen=>42}}), - {error,{handler_not_added,{invalid_levels,{_,43,42}}}} = + {error,{handler_not_added,{invalid_config,logger_std_h, + {invalid_levels,#{drop_mode_qlen:=43, + flush_qlen:=42}}}}} = logger:add_handler(?MODULE,logger_std_h, #{config => #{drop_mode_qlen=>43, flush_qlen=>42}}), @@ -338,7 +343,7 @@ config_fail(_Config) -> logger:set_handler_config(?MODULE,config, #{type=>{file,"file"}}), - {error,{invalid_levels,_}} = + {error,{invalid_config,logger_std_h,{invalid_levels,_}}} = logger:set_handler_config(?MODULE,config, #{sync_mode_qlen=>100, flush_qlen=>99}), @@ -643,10 +648,8 @@ sync(Config) -> %% check repeated filesync happens start_tracer([{logger_std_h, write_to_dev, 5}, - {logger_std_h, sync_dev, 4}, {file, datasync, 1}], [{logger_std_h, write_to_dev, <<"first\n">>}, - {logger_std_h, sync_dev}, {file,datasync}]), logger:notice("first", ?domain), @@ -655,10 +658,8 @@ sync(Config) -> %% check that explicit filesync is only done once start_tracer([{logger_std_h, write_to_dev, 5}, - {logger_std_h, sync_dev, 4}, {file, datasync, 1}], [{logger_std_h, write_to_dev, <<"second\n">>}, - {logger_std_h, sync_dev}, {file,datasync}, {no_more,500} ]), @@ -679,13 +680,10 @@ sync(Config) -> %% triggered by the idle timeout between "thrid" and "fourth". timer:sleep(?IDLE_DETECT_TIME_MSEC*2), start_tracer([{logger_std_h, write_to_dev, 5}, - {logger_std_h, sync_dev, 4}, {file, datasync, 1}], [{logger_std_h, write_to_dev, <<"third\n">>}, - {logger_std_h, sync_dev}, {file,datasync}, {logger_std_h, write_to_dev, <<"fourth\n">>}, - {logger_std_h, sync_dev}, {file,datasync}]), logger:notice("third", ?domain), %% wait for automatic filesync @@ -698,9 +696,9 @@ sync(Config) -> SyncInt = 1000, WaitT = 4500, OneSync = {logger_h_common,handle_cast,repeated_filesync}, - %% receive 1 initial repeated_filesync, then 1 per sec + %% receive 1 repeated_filesync per sec start_tracer([{logger_h_common,handle_cast,2}], - [OneSync || _ <- lists:seq(1, 1 + trunc(WaitT/SyncInt))]), + [OneSync || _ <- lists:seq(1, trunc(WaitT/SyncInt))]), ok = logger:update_handler_config(?MODULE, config, #{filesync_repeat_interval => SyncInt}), |