diff options
Diffstat (limited to 'lib/kernel/src/logger_disk_log_h.erl')
-rw-r--r-- | lib/kernel/src/logger_disk_log_h.erl | 694 |
1 files changed, 694 insertions, 0 deletions
diff --git a/lib/kernel/src/logger_disk_log_h.erl b/lib/kernel/src/logger_disk_log_h.erl new file mode 100644 index 0000000000..3b71f936d8 --- /dev/null +++ b/lib/kernel/src/logger_disk_log_h.erl @@ -0,0 +1,694 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2017. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% +-module(logger_disk_log_h). + +-behaviour(gen_server). + +-include("logger.hrl"). +-include("logger_internal.hrl"). +-include("logger_h_common.hrl"). + +%%% API +-export([start_link/3, info/1, disk_log_sync/1, reset/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%% logger callbacks +-export([log/2, + adding_handler/2, removing_handler/1, + changing_config/3, swap_buffer/2]). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%%%----------------------------------------------------------------- +%%% Start a disk_log handler process and link to caller. +%%% This function is called by the kernel supervisor when this +%%% handler process gets added (as a result of calling add/3). +-spec start_link(Name, Config, HandlerState) -> {ok,Pid} | {error,Reason} when + Name :: atom(), + Config :: logger:config(), + HandlerState :: map(), + Pid :: pid(), + Reason :: term(). + +start_link(Name, Config, HandlerState) -> + proc_lib:start_link(?MODULE,init,[[Name,Config,HandlerState]]). + +%%%----------------------------------------------------------------- +%%% +-spec disk_log_sync(Name) -> ok | {error,Reason} when + Name :: atom(), + Reason :: handler_busy | {badarg,term()}. + +disk_log_sync(Name) when is_atom(Name) -> + try + gen_server:call(Name, disk_log_sync, ?DEFAULT_CALL_TIMEOUT) + catch + _:{timeout,_} -> {error,handler_busy} + end; +disk_log_sync(Name) -> + {error,{badarg,{disk_log_sync,[Name]}}}. + +%%%----------------------------------------------------------------- +%%% +-spec info(Name) -> Info | {error,Reason} when + Name :: atom(), + Info :: term(), + Reason :: handler_busy | {badarg,term()}. + +info(Name) when is_atom(Name) -> + try + gen_server:call(Name, info, ?DEFAULT_CALL_TIMEOUT) + catch + _:{timeout,_} -> {error,handler_busy} + end; +info(Name) -> + {error,{badarg,{info,[Name]}}}. + +%%%----------------------------------------------------------------- +%%% +-spec reset(Name) -> ok | {error,Reason} when + Name :: atom(), + Reason :: handler_busy | {badarg,term()}. + +reset(Name) when is_atom(Name) -> + try + gen_server:call(Name, reset, ?DEFAULT_CALL_TIMEOUT) + catch + _:{timeout,_} -> {error,handler_busy} + end; +reset(Name) -> + {error,{badarg,{reset,[Name]}}}. + + +%%%=================================================================== +%%% logger callbacks +%%%=================================================================== + +%%%----------------------------------------------------------------- +%%% Handler being added +adding_handler(Name, Config) -> + case check_config(adding, Name, Config) of + {ok, Config1} -> + %% create initial handler state by merging defaults with config + HConfig = maps:get(?MODULE, Config1, #{}), + HState = maps:merge(get_init_state(), HConfig), + case logger_h_common:overload_levels_ok(HState) of + true -> + case start(Name, Config1, HState) of + ok -> + %% Make sure wait_for_buffer is not stored, so we + %% won't hang and wait for buffer on a restart + {ok, maps:remove(wait_for_buffer,Config1)}; + Error -> + Error + end; + false -> + #{toggle_sync_qlen := TSQL, + drop_new_reqs_qlen := DNRQL, + flush_reqs_qlen := FRQL} = HState, + {error,{invalid_levels,{TSQL,DNRQL,FRQL}}} + end; + Error -> + Error + end. + +%%%----------------------------------------------------------------- +%%% Updating handler config +changing_config(Name, + OldConfig=#{id:=Id, disk_log_opts:=DLOpts}, + NewConfig=#{id:=Id, disk_log_opts:=DLOpts}) -> + case check_config(changing, Name, NewConfig) of + Result = {ok,NewConfig1} -> + try gen_server:call(Name, {change_config,OldConfig,NewConfig1}, + ?DEFAULT_CALL_TIMEOUT) of + ok -> Result; + HError -> HError + catch + _:{timeout,_} -> {error,handler_busy} + end; + Error -> + Error + end; +changing_config(_Name, OldConfig, NewConfig) -> + {error,{illegal_config_change,OldConfig,NewConfig}}. + +check_config(adding, Name, Config0) -> + %% Merge in defaults on top level + Config = maps:merge(#{id => Name}, Config0), + %% Merge in defaults on handler level + LogOpts0 = maps:get(disk_log_opts, Config, #{}), + LogOpts = merge_default_logopts(Name, LogOpts0), + case check_log_opts(maps:to_list(LogOpts)) of + ok -> + MyConfig = maps:get(?MODULE, Config, #{}), + case check_my_config(maps:to_list(MyConfig)) of + ok -> + {ok,Config#{disk_log_opts=>LogOpts, + ?MODULE=>MyConfig}}; + Error -> + Error + end; + Error -> + Error + end; +check_config(changing, _Name, Config) -> + MyConfig = maps:get(?MODULE, Config, #{}), + case check_my_config(maps:to_list(MyConfig)) of + ok -> {ok,Config}; + Error -> Error + end. + +merge_default_logopts(Name, LogOpts) -> + Type = maps:get(type, LogOpts, wrap), + {DefaultNoFiles,DefaultNoBytes} = + case Type of + halt -> {undefined,infinity}; + _wrap -> {10,1048576} + end, + {ok,Dir} = file:get_cwd(), + Default = #{file => filename:join(Dir,Name), + max_no_files => DefaultNoFiles, + max_no_bytes => DefaultNoBytes, + type => Type}, + maps:merge(Default,LogOpts). + +check_log_opts([{file,File}|Opts]) when is_list(File) -> + check_log_opts(Opts); +check_log_opts([{max_no_files,undefined}|Opts]) -> + check_log_opts(Opts); +check_log_opts([{max_no_files,N}|Opts]) when is_integer(N), N>0 -> + check_log_opts(Opts); +check_log_opts([{max_no_bytes,infinity}|Opts]) -> + check_log_opts(Opts); +check_log_opts([{max_no_bytes,N}|Opts]) when is_integer(N), N>0 -> + check_log_opts(Opts); +check_log_opts([{type,Type}|Opts]) when Type==wrap; Type==halt -> + check_log_opts(Opts); +check_log_opts([Invalid|_]) -> + {error,{invalid_config,disk_log_opt,Invalid}}; +check_log_opts([]) -> + ok. + +check_my_config([Other | Config]) -> + case logger_h_common:check_common_config(Other) of + valid -> + check_my_config(Config); + invalid -> + {error,{invalid_config,?MODULE,Other}} + end; +check_my_config([]) -> + ok. + +%%%----------------------------------------------------------------- +%%% Handler being removed +removing_handler(Name) -> + stop(Name). + +%%%----------------------------------------------------------------- +%%% Get buffer when swapping from simple handler +swap_buffer(Name,Buffer) -> + case whereis(Name) of + undefined -> + ok; + _ -> + Name ! {buffer,Buffer} + end. + +%%%----------------------------------------------------------------- +%%% Log a string or report +-spec log(Log, Config) -> ok | dropped when + Log :: logger:log(), + Config :: logger:config(). + +log(Log,Config=#{id:=Name}) -> + %% if the handler has crashed, we must drop this request + %% and hope the handler restarts so we can try again + true = is_pid(whereis(Name)), + Bin = logger_h_common:log_to_binary(Log,Config), + logger_h_common:call_cast_or_drop(Name, Bin). + + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([Name, Config = #{disk_log_opts := LogOpts}, + State = #{dl_sync_int := DLSyncInt}]) -> + register(Name, self()), + process_flag(trap_exit, true), + process_flag(message_queue_data, off_heap), + + ?init_test_hooks(), + ?start_observation(Name), + + case open_disk_log(Name, LogOpts) of + ok -> + catch ets:new(Name, [public, named_table]), + ?set_mode(Name, async), + proc_lib:init_ack({ok,self()}), + T0 = ?timestamp(), + State1 = + ?merge_with_stats(State#{id => Name, + mode => async, + dl_sync => DLSyncInt, + log_opts => LogOpts, + last_qlen => 0, + last_log_ts => T0, + burst_win_ts => T0, + burst_msg_count => 0, + prev_log_result => ok, + prev_sync_result => ok, + prev_disk_log_info => undefined}), + gen_server:cast(self(), {repeated_disk_log_sync,T0}), + enter_loop(Config, State1); + Error -> + logger_h_common:error_notify({open_disk_log,Name,Error}), + proc_lib:init_ack(Error) + end. + +enter_loop(#{wait_for_buffer:=true}=Config,State) -> + State1 = + receive + {buffer,Buffer} -> + lists:foldl( + fun(Log,S) -> + Bin = logger_h_common:log_to_binary(Log,Config), + {_,S1} = do_log(Bin,cast,S), + S1 + end, + State, + Buffer) + end, + gen_server:enter_loop(?MODULE,[],State1); +enter_loop(_Config,State) -> + gen_server:enter_loop(?MODULE,[],State). + +%% This is the synchronous log request. +handle_call({log, Bin}, _From, State) -> + {Result,State1} = do_log(Bin, call, State), + %% Result == ok | dropped + {reply, Result, State1}; + +handle_call(disk_log_sync, _From, State = #{id := Name}) -> + State1 = #{prev_sync_result := Result} = disk_log_sync(Name, State), + {reply, Result, State1}; + +handle_call({change_config,_OldConfig,NewConfig}, _From, + State = #{filesync_repeat_interval := FSyncInt0, + last_log_ts := LastLogTS}) -> + HConfig = maps:get(?MODULE, NewConfig, #{}), + State1 = #{toggle_sync_qlen := TSQL, + drop_new_reqs_qlen := DNRQL, + flush_reqs_qlen := FRQL} = maps:merge(State, HConfig), + case logger_h_common:overload_levels_ok(State1) of + true -> + _ = + case maps:get(filesync_repeat_interval, HConfig, undefined) of + undefined -> + ok; + no_repeat -> + _ = logger_h_common:cancel_timer(maps:get(rep_sync_tref, + State, + undefined)); + FSyncInt0 -> + ok; + _FSyncInt1 -> + _ = logger_h_common:cancel_timer(maps:get(rep_sync_tref, + State, + undefined)), + _ = gen_server:cast(self(), {repeated_disk_log_sync, + LastLogTS}) + end, + {reply, ok, State1}; + false -> + {reply, {error,{invalid_levels,{TSQL,DNRQL,FRQL}}}, State} + end; + +handle_call(info, _From, State) -> + {reply, State, State}; + +handle_call(reset, _From, State) -> + State1 = ?merge_with_stats(State), + {reply, ok, State1#{last_qlen => 0, + last_log_ts => ?timestamp(), + prev_log_result => ok, + prev_sync_result => ok, + prev_disk_log_info => undefined}}; + +handle_call(stop, _From, State) -> + {stop, {shutdown,stopped}, ok, State}. + + +%% This is the asynchronous log request. +handle_cast({log, Bin}, State) -> + {_,State1} = do_log(Bin, cast, State), + {noreply, State1}; + +%% If FILESYNC_REPEAT_INTERVAL is set to a millisec value, this +%% clause gets called repeatedly by the handler. In order to +%% guarantee that a filesync *always* happens after the last log +%% request, the repeat operation must be active! +handle_cast({repeated_disk_log_sync,LastLogTS0}, + State = #{id := Name, + filesync_repeat_interval := FSyncInt, + last_log_ts := LastLogTS1}) -> + State1 = + if is_integer(FSyncInt) -> + %% only do filesync if something has been + %% written since last time we checked + NewState = if LastLogTS1 == LastLogTS0 -> + State; + true -> + disk_log_sync(Name, State) + end, + {ok,TRef} = + timer:apply_after(FSyncInt, gen_server,cast, + [self(), + {repeated_disk_log_sync,LastLogTS1}]), + NewState#{rep_sync_tref => TRef}; + true -> + State + end, + {noreply,State1}. + +%% The disk log owner must handle status messages from disk_log. +handle_info({disk_log, _Node, _Log, {wrap,_NoLostItems}}, State) -> + {noreply, State}; +handle_info({disk_log, _Node, Log, Info = {truncated,_NoLostItems}}, + State = #{id := Name, prev_disk_log_info := PrevInfo}) -> + error_notify_new(Info, PrevInfo, {disk_log,Name,Log,Info}), + {noreply, State#{prev_disk_log_info => Info}}; +handle_info({disk_log, _Node, Log, Info = {blocked_log,_Items}}, + State = #{id := Name, prev_disk_log_info := PrevInfo}) -> + error_notify_new(Info, PrevInfo, {disk_log,Name,Log,Info}), + {noreply, State#{prev_disk_log_info => Info}}; +handle_info({disk_log, _Node, Log, full}, + State = #{id := Name, prev_disk_log_info := PrevInfo}) -> + error_notify_new(full, PrevInfo, {disk_log,Name,Log,full}), + {noreply, State#{prev_disk_log_info => full}}; +handle_info({disk_log, _Node, Log, Info = {error_status,_Status}}, + State = #{id := Name, prev_disk_log_info := PrevInfo}) -> + error_notify_new(Info, PrevInfo, {disk_log,Name,Log,Info}), + {noreply, State#{prev_disk_log_info => Info}}; + +handle_info({'EXIT',_Pid,_Why}, State = #{id := _Name}) -> + {noreply, State}; + +handle_info(_, State) -> + {noreply, State}. + +terminate(Reason, State = #{id := Name}) -> + _ = logger_h_common:cancel_timer(maps:get(rep_sync_tref, State, + undefined)), + _ = close_disk_log(Name, normal), + logger_h_common:stop_or_restart(Name, Reason, State). + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%%----------------------------------------------------------------- +%%% Internal functions + +%%%----------------------------------------------------------------- +%%% +get_init_state() -> + #{toggle_sync_qlen => ?TOGGLE_SYNC_QLEN, + drop_new_reqs_qlen => ?DROP_NEW_REQS_QLEN, + flush_reqs_qlen => ?FLUSH_REQS_QLEN, + enable_burst_limit => ?ENABLE_BURST_LIMIT, + burst_limit_size => ?BURST_LIMIT_SIZE, + burst_window_time => ?BURST_WINDOW_TIME, + enable_kill_overloaded => ?ENABLE_KILL_OVERLOADED, + handler_overloaded_qlen => ?HANDLER_OVERLOADED_QLEN, + handler_overloaded_mem => ?HANDLER_OVERLOADED_MEM, + handler_restart_after => ?HANDLER_RESTART_AFTER, + dl_sync_int => ?CONTROLLER_SYNC_INTERVAL, + filesync_ok_qlen => ?FILESYNC_OK_QLEN, + filesync_repeat_interval => ?FILESYNC_REPEAT_INTERVAL}. + +%%%----------------------------------------------------------------- +%%% Add a disk_log handler to the logger. +%%% This starts a dedicated handler process which should always +%%% exist if the handler is registered with logger (and should not +%%% exist if the handler is not registered). +%%% +%%% Config is the logger:config() map containing a sub map with any of +%%% the following associations: +%%% +%%% Config = #{disk_log_opts => #{file => file:filename(), +%%% max_no_bytes => integer(), +%%% max_no_files => integer(), +%%% type => wrap | halt}}. +%%% +%%% This map will be merged with the logger configuration data for +%%% the disk_log LogName. If type == halt, then max_no_files is +%%% ignored. +%%% +%%% Handler specific config should be provided with a sub map associated +%%% with a key named the same as this module, e.g: +%%% +%%% Config = #{logger_disk_log_h => #{toggle_sync_qlen => 50} +%%% +%%% The disk_log handler process is linked to logger_sup, which is +%%% part of the kernel application's supervision tree. +start(Name, Config, HandlerState) -> + LoggerDLH = + #{id => Name, + start => {?MODULE, start_link, [Name,Config,HandlerState]}, + restart => temporary, + shutdown => 2000, + type => worker, + modules => [?MODULE]}, + case supervisor:start_child(logger_sup, LoggerDLH) of + {ok,_} -> + ok; + Error -> + Error + end. + +%%%----------------------------------------------------------------- +%%% Stop and remove the handler. +stop(Name) -> + case whereis(Name) of + undefined -> + ok; + _ -> + %% We don't want to do supervisor:terminate_child here + %% since we need to distinguish this explicit stop from a + %% system termination in order to avoid circular attempts + %% at removing the handler (implying deadlocks and + %% timeouts). + _ = gen_server:call(Name,stop), + _ = supervisor:delete_child(logger_sup, Name), + ok + end. + +%%%----------------------------------------------------------------- +%%% Logging and overload control. +-define(update_dl_sync(C, Interval), + if C == 0 -> Interval; + true -> C-1 end). + +%% check for overload between every request (and set Mode to async, +%% sync or drop accordingly), but never flush the whole mailbox +%% before LogWindowSize requests have been handled +do_log(Bin, CallOrCast, State = #{id:=Name, mode := _Mode0}) -> + T1 = ?timestamp(), + + %% check if the handler is getting overloaded, or if it's + %% recovering from overload (the check must be done for each + %% request to react quickly to large bursts of requests and + %% to ensure that the handler can never end up in drop mode + %% with an empty mailbox, which would stop operation) + {Mode1,QLen,Mem,State1} = logger_h_common:check_load(State), + + %% kill the handler if it can't keep up with the load + logger_h_common:kill_if_choked(Name, QLen, Mem, State), + + if Mode1 == flush -> + flush(Name, QLen, T1, State1); + true -> + write(Name, Mode1, T1, Bin, CallOrCast, State1) + end. + +%% this function is called by do_log/3 after an overload check +%% has been performed, where QLen > FlushQLen +flush(Name, _QLen0, T1, State=#{last_log_ts := _T0}) -> + %% flush messages in the mailbox (a limited number in + %% order to not cause long delays) + _NewFlushed = logger_h_common:flush_log_requests(?FLUSH_MAX_N), + + %% because of the receive loop when flushing messages, the + %% handler will be scheduled out often and the mailbox could + %% grow very large, so we'd better check the queue again here + {_,_QLen1} = process_info(self(), message_queue_len), + ?observe(Name,{max_qlen,_QLen1}), + + %% Add 1 for the current log request + ?observe(Name,{flushed,_NewFlushed+1}), + + State1 = ?update_max_time(?diff_time(T1,_T0),State), + {dropped,?update_other(flushed,FLUSHED,_NewFlushed, + State1#{mode => ?set_mode(Name,async), + last_qlen => 0, + last_log_ts => T1})}. + +%% this function is called to write to disk_log +write(Name, Mode, T1, Bin, _CallOrCast, + State = #{dl_sync := DLSync, + dl_sync_int := DLSyncInt, + last_qlen := LastQLen, + last_log_ts := T0}) -> + %% check if we need to limit the number of writes + %% during a burst of log requests + {DoWrite,BurstWinT,BurstMsgCount} = logger_h_common:limit_burst(State), + + %% only send a synhrounous request to the disk_log process + %% every DLSyncInt time, to give the handler time between + %% writes so it can keep up with incoming messages + {Status,LastQLen1,State1} = + if DoWrite, DLSync == 0 -> + ?observe(Name,{_CallOrCast,1}), + NewState = disk_log_write(Name, Bin, State), + {ok, element(2,process_info(self(),message_queue_len)), + NewState}; + DoWrite -> + ?observe(Name,{_CallOrCast,1}), + NewState = disk_log_write(Name, Bin, State), + {ok, LastQLen, NewState}; + not DoWrite -> + ?observe(Name,{flushed,1}), + {dropped, LastQLen, State} + end, + + %% Check if the time since the previous log request is long enough - + %% and the queue length small enough - to assume the mailbox has + %% been emptied, and if so, do filesync operation and reset mode to + %% async. Note that this is the best we can do to detect an idle + %% handler without setting a timer after each log call/cast. If the + %% time between two consecutive log requests is fast and no new + %% request comes in after the last one, idle state won't be detected! + Time = ?diff_time(T1,T0), + {Mode1,BurstMsgCount1,State2} = + if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso + (Time > ?IDLE_DETECT_TIME_USEC) -> + {?change_mode(Name,Mode,async), 0, disk_log_sync(Name,State1)}; + true -> + {Mode, BurstMsgCount,State1} + end, + + State3 = + ?update_calls_or_casts(_CallOrCast,1,State2), + State4 = + ?update_max_time(Time, + State3#{mode => Mode1, + last_qlen := LastQLen1, + last_log_ts => T1, + burst_win_ts => BurstWinT, + burst_msg_count => BurstMsgCount1, + dl_sync => ?update_dl_sync(DLSync,DLSyncInt)}), + {Status,State4}. + + +open_disk_log(Name, LogOpts) -> + #{file := File, + max_no_bytes := MaxNoBytes, + max_no_files := MaxNoFiles, + type := Type} = LogOpts, + case filelib:ensure_dir(File) of + ok -> + Size = + if Type == halt -> MaxNoBytes; + Type == wrap -> {MaxNoBytes,MaxNoFiles} + end, + Opts = [{name, Name}, + {file, File}, + {size, Size}, + {type, Type}, + {linkto, self()}, + {repair, false}, + {format, external}, + {notify, true}, + {quiet, true}, + {mode, read_write}], + case disk_log:open(Opts) of + {ok,Name} -> + ok; + Error = {error,_Reason} -> + Error + end; + Error -> + Error + end. + +close_disk_log(Name, _) -> + _ = ?disk_log_sync(Name), + _ = disk_log:lclose(Name), + ok. + +disk_log_write(Name, Bin, State) -> + Result = + case ?disk_log_blog(Name, Bin) of + ok -> + ok; + LogError -> + _ = case maps:get(prev_log_result, State) of + LogError -> + %% don't report same error twice + ok; + _ -> + LogOpts = maps:get(log_opts, State), + logger_h_common:error_notify({Name,log, + LogOpts, + LogError}) + end, + LogError + end, + State#{prev_log_result => Result}. + +disk_log_sync(Name, State) -> + Result = + case ?disk_log_sync(Name) of + ok -> + ok; + SyncError -> + _ = case maps:get(prev_sync_result, State) of + SyncError -> + %% don't report same error twice + ok; + _ -> + LogOpts = maps:get(log_opts, State), + logger_h_common:error_notify({Name,sync, + LogOpts, + SyncError}) + end, + SyncError + end, + State#{prev_sync_result => Result}. + +error_notify_new(Info,Info, _Term) -> + ok; +error_notify_new(_Info0,_Info1, Term) -> + logger_h_common:error_notify(Term). |