%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(logger_disk_log_h).
-behaviour(gen_server).
-include("logger.hrl").
-include("logger_internal.hrl").
-include("logger_h_common.hrl").
%%% API
-export([start_link/3, info/1, disk_log_sync/1, reset/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
%% logger callbacks
-export([log/2,
adding_handler/2, removing_handler/1,
changing_config/3, swap_buffer/2]).
%%%===================================================================
%%% API
%%%===================================================================
%%%-----------------------------------------------------------------
%%% Start a disk_log handler process and link to caller.
%%% This function is called by the kernel supervisor when this
%%% handler process gets added (as a result of calling add/3).
-spec start_link(Name, Config, HandlerState) -> {ok,Pid} | {error,Reason} when
Name :: atom(),
Config :: logger:config(),
HandlerState :: map(),
Pid :: pid(),
Reason :: term().
start_link(Name, Config, HandlerState) ->
proc_lib:start_link(?MODULE,init,[[Name,Config,HandlerState]]).
%%%-----------------------------------------------------------------
%%%
-spec disk_log_sync(Name) -> ok | {error,Reason} when
Name :: atom(),
Reason :: handler_busy | {badarg,term()}.
disk_log_sync(Name) when is_atom(Name) ->
try
gen_server:call(Name, disk_log_sync, ?DEFAULT_CALL_TIMEOUT)
catch
_:{timeout,_} -> {error,handler_busy}
end;
disk_log_sync(Name) ->
{error,{badarg,{disk_log_sync,[Name]}}}.
%%%-----------------------------------------------------------------
%%%
-spec info(Name) -> Info | {error,Reason} when
Name :: atom(),
Info :: term(),
Reason :: handler_busy | {badarg,term()}.
info(Name) when is_atom(Name) ->
try
gen_server:call(Name, info, ?DEFAULT_CALL_TIMEOUT)
catch
_:{timeout,_} -> {error,handler_busy}
end;
info(Name) ->
{error,{badarg,{info,[Name]}}}.
%%%-----------------------------------------------------------------
%%%
-spec reset(Name) -> ok | {error,Reason} when
Name :: atom(),
Reason :: handler_busy | {badarg,term()}.
reset(Name) when is_atom(Name) ->
try
gen_server:call(Name, reset, ?DEFAULT_CALL_TIMEOUT)
catch
_:{timeout,_} -> {error,handler_busy}
end;
reset(Name) ->
{error,{badarg,{reset,[Name]}}}.
%%%===================================================================
%%% logger callbacks
%%%===================================================================
%%%-----------------------------------------------------------------
%%% Handler being added
adding_handler(Name, Config) ->
case check_config(adding, Name, Config) of
{ok, Config1} ->
%% create initial handler state by merging defaults with config
HConfig = maps:get(?MODULE, Config1, #{}),
HState = maps:merge(get_init_state(), HConfig),
case logger_h_common:overload_levels_ok(HState) of
true ->
case start(Name, Config1, HState) of
ok ->
%% Make sure wait_for_buffer is not stored, so we
%% won't hang and wait for buffer on a restart
{ok, maps:remove(wait_for_buffer,Config1)};
Error ->
Error
end;
false ->
#{toggle_sync_qlen := TSQL,
drop_new_reqs_qlen := DNRQL,
flush_reqs_qlen := FRQL} = HState,
{error,{invalid_levels,{TSQL,DNRQL,FRQL}}}
end;
Error ->
Error
end.
%%%-----------------------------------------------------------------
%%% Updating handler config
changing_config(Name,
OldConfig=#{id:=Id, disk_log_opts:=DLOpts},
NewConfig=#{id:=Id, disk_log_opts:=DLOpts}) ->
case check_config(changing, Name, NewConfig) of
Result = {ok,NewConfig1} ->
try gen_server:call(Name, {change_config,OldConfig,NewConfig1},
?DEFAULT_CALL_TIMEOUT) of
ok -> Result;
HError -> HError
catch
_:{timeout,_} -> {error,handler_busy}
end;
Error ->
Error
end;
changing_config(_Name, OldConfig, NewConfig) ->
{error,{illegal_config_change,OldConfig,NewConfig}}.
check_config(adding, Name, Config0) ->
%% Merge in defaults on top level
Config = maps:merge(#{id => Name}, Config0),
%% Merge in defaults on handler level
LogOpts0 = maps:get(disk_log_opts, Config, #{}),
LogOpts = merge_default_logopts(Name, LogOpts0),
case check_log_opts(maps:to_list(LogOpts)) of
ok ->
MyConfig = maps:get(?MODULE, Config, #{}),
case check_my_config(maps:to_list(MyConfig)) of
ok ->
{ok,Config#{disk_log_opts=>LogOpts,
?MODULE=>MyConfig}};
Error ->
Error
end;
Error ->
Error
end;
check_config(changing, _Name, Config) ->
MyConfig = maps:get(?MODULE, Config, #{}),
case check_my_config(maps:to_list(MyConfig)) of
ok -> {ok,Config};
Error -> Error
end.
merge_default_logopts(Name, LogOpts) ->
Type = maps:get(type, LogOpts, wrap),
{DefaultNoFiles,DefaultNoBytes} =
case Type of
halt -> {undefined,infinity};
_wrap -> {10,1048576}
end,
{ok,Dir} = file:get_cwd(),
Default = #{file => filename:join(Dir,Name),
max_no_files => DefaultNoFiles,
max_no_bytes => DefaultNoBytes,
type => Type},
maps:merge(Default,LogOpts).
check_log_opts([{file,File}|Opts]) when is_list(File) ->
check_log_opts(Opts);
check_log_opts([{max_no_files,undefined}|Opts]) ->
check_log_opts(Opts);
check_log_opts([{max_no_files,N}|Opts]) when is_integer(N), N>0 ->
check_log_opts(Opts);
check_log_opts([{max_no_bytes,infinity}|Opts]) ->
check_log_opts(Opts);
check_log_opts([{max_no_bytes,N}|Opts]) when is_integer(N), N>0 ->
check_log_opts(Opts);
check_log_opts([{type,Type}|Opts]) when Type==wrap; Type==halt ->
check_log_opts(Opts);
check_log_opts([Invalid|_]) ->
{error,{invalid_config,disk_log_opt,Invalid}};
check_log_opts([]) ->
ok.
check_my_config([Other | Config]) ->
case logger_h_common:check_common_config(Other) of
valid ->
check_my_config(Config);
invalid ->
{error,{invalid_config,?MODULE,Other}}
end;
check_my_config([]) ->
ok.
%%%-----------------------------------------------------------------
%%% Handler being removed
removing_handler(Name) ->
stop(Name).
%%%-----------------------------------------------------------------
%%% Get buffer when swapping from simple handler
swap_buffer(Name,Buffer) ->
case whereis(Name) of
undefined ->
ok;
_ ->
Name ! {buffer,Buffer}
end.
%%%-----------------------------------------------------------------
%%% Log a string or report
-spec log(Log, Config) -> ok | dropped when
Log :: logger:log(),
Config :: logger:config().
log(Log,Config=#{id:=Name}) ->
%% if the handler has crashed, we must drop this request
%% and hope the handler restarts so we can try again
true = is_pid(whereis(Name)),
Bin = logger_h_common:log_to_binary(Log,Config),
logger_h_common:call_cast_or_drop(Name, Bin).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
init([Name, Config = #{disk_log_opts := LogOpts},
State = #{dl_sync_int := DLSyncInt}]) ->
register(Name, self()),
process_flag(trap_exit, true),
process_flag(message_queue_data, off_heap),
?init_test_hooks(),
?start_observation(Name),
case open_disk_log(Name, LogOpts) of
ok ->
catch ets:new(Name, [public, named_table]),
?set_mode(Name, async),
proc_lib:init_ack({ok,self()}),
T0 = ?timestamp(),
State1 =
?merge_with_stats(State#{id => Name,
mode => async,
dl_sync => DLSyncInt,
log_opts => LogOpts,
last_qlen => 0,
last_log_ts => T0,
burst_win_ts => T0,
burst_msg_count => 0,
prev_log_result => ok,
prev_sync_result => ok,
prev_disk_log_info => undefined}),
gen_server:cast(self(), {repeated_disk_log_sync,T0}),
enter_loop(Config, State1);
Error ->
logger_h_common:error_notify({open_disk_log,Name,Error}),
proc_lib:init_ack(Error)
end.
enter_loop(#{wait_for_buffer:=true}=Config,State) ->
State1 =
receive
{buffer,Buffer} ->
lists:foldl(
fun(Log,S) ->
Bin = logger_h_common:log_to_binary(Log,Config),
{_,S1} = do_log(Bin,cast,S),
S1
end,
State,
Buffer)
end,
gen_server:enter_loop(?MODULE,[],State1);
enter_loop(_Config,State) ->
gen_server:enter_loop(?MODULE,[],State).
%% This is the synchronous log request.
handle_call({log, Bin}, _From, State) ->
{Result,State1} = do_log(Bin, call, State),
%% Result == ok | dropped
{reply, Result, State1};
handle_call(disk_log_sync, _From, State = #{id := Name}) ->
State1 = #{prev_sync_result := Result} = disk_log_sync(Name, State),
{reply, Result, State1};
handle_call({change_config,_OldConfig,NewConfig}, _From,
State = #{filesync_repeat_interval := FSyncInt0,
last_log_ts := LastLogTS}) ->
HConfig = maps:get(?MODULE, NewConfig, #{}),
State1 = #{toggle_sync_qlen := TSQL,
drop_new_reqs_qlen := DNRQL,
flush_reqs_qlen := FRQL} = maps:merge(State, HConfig),
case logger_h_common:overload_levels_ok(State1) of
true ->
_ =
case maps:get(filesync_repeat_interval, HConfig, undefined) of
undefined ->
ok;
no_repeat ->
_ = logger_h_common:cancel_timer(maps:get(rep_sync_tref,
State,
undefined));
FSyncInt0 ->
ok;
_FSyncInt1 ->
_ = logger_h_common:cancel_timer(maps:get(rep_sync_tref,
State,
undefined)),
_ = gen_server:cast(self(), {repeated_disk_log_sync,
LastLogTS})
end,
{reply, ok, State1};
false ->
{reply, {error,{invalid_levels,{TSQL,DNRQL,FRQL}}}, State}
end;
handle_call(info, _From, State) ->
{reply, State, State};
handle_call(reset, _From, State) ->
State1 = ?merge_with_stats(State),
{reply, ok, State1#{last_qlen => 0,
last_log_ts => ?timestamp(),
prev_log_result => ok,
prev_sync_result => ok,
prev_disk_log_info => undefined}};
handle_call(stop, _From, State) ->
{stop, {shutdown,stopped}, ok, State}.
%% This is the asynchronous log request.
handle_cast({log, Bin}, State) ->
{_,State1} = do_log(Bin, cast, State),
{noreply, State1};
%% If FILESYNC_REPEAT_INTERVAL is set to a millisec value, this
%% clause gets called repeatedly by the handler. In order to
%% guarantee that a filesync *always* happens after the last log
%% request, the repeat operation must be active!
handle_cast({repeated_disk_log_sync,LastLogTS0},
State = #{id := Name,
filesync_repeat_interval := FSyncInt,
last_log_ts := LastLogTS1}) ->
State1 =
if is_integer(FSyncInt) ->
%% only do filesync if something has been
%% written since last time we checked
NewState = if LastLogTS1 == LastLogTS0 ->
State;
true ->
disk_log_sync(Name, State)
end,
{ok,TRef} =
timer:apply_after(FSyncInt, gen_server,cast,
[self(),
{repeated_disk_log_sync,LastLogTS1}]),
NewState#{rep_sync_tref => TRef};
true ->
State
end,
{noreply,State1}.
%% The disk log owner must handle status messages from disk_log.
handle_info({disk_log, _Node, _Log, {wrap,_NoLostItems}}, State) ->
{noreply, State};
handle_info({disk_log, _Node, Log, Info = {truncated,_NoLostItems}},
State = #{id := Name, prev_disk_log_info := PrevInfo}) ->
error_notify_new(Info, PrevInfo, {disk_log,Name,Log,Info}),
{noreply, State#{prev_disk_log_info => Info}};
handle_info({disk_log, _Node, Log, Info = {blocked_log,_Items}},
State = #{id := Name, prev_disk_log_info := PrevInfo}) ->
error_notify_new(Info, PrevInfo, {disk_log,Name,Log,Info}),
{noreply, State#{prev_disk_log_info => Info}};
handle_info({disk_log, _Node, Log, full},
State = #{id := Name, prev_disk_log_info := PrevInfo}) ->
error_notify_new(full, PrevInfo, {disk_log,Name,Log,full}),
{noreply, State#{prev_disk_log_info => full}};
handle_info({disk_log, _Node, Log, Info = {error_status,_Status}},
State = #{id := Name, prev_disk_log_info := PrevInfo}) ->
error_notify_new(Info, PrevInfo, {disk_log,Name,Log,Info}),
{noreply, State#{prev_disk_log_info => Info}};
handle_info({'EXIT',_Pid,_Why}, State = #{id := _Name}) ->
{noreply, State};
handle_info(_, State) ->
{noreply, State}.
terminate(Reason, State = #{id := Name}) ->
_ = logger_h_common:cancel_timer(maps:get(rep_sync_tref, State,
undefined)),
_ = close_disk_log(Name, normal),
logger_h_common:stop_or_restart(Name, Reason, State).
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%-----------------------------------------------------------------
%%% Internal functions
%%%-----------------------------------------------------------------
%%%
get_init_state() ->
#{toggle_sync_qlen => ?TOGGLE_SYNC_QLEN,
drop_new_reqs_qlen => ?DROP_NEW_REQS_QLEN,
flush_reqs_qlen => ?FLUSH_REQS_QLEN,
enable_burst_limit => ?ENABLE_BURST_LIMIT,
burst_limit_size => ?BURST_LIMIT_SIZE,
burst_window_time => ?BURST_WINDOW_TIME,
enable_kill_overloaded => ?ENABLE_KILL_OVERLOADED,
handler_overloaded_qlen => ?HANDLER_OVERLOADED_QLEN,
handler_overloaded_mem => ?HANDLER_OVERLOADED_MEM,
handler_restart_after => ?HANDLER_RESTART_AFTER,
dl_sync_int => ?CONTROLLER_SYNC_INTERVAL,
filesync_ok_qlen => ?FILESYNC_OK_QLEN,
filesync_repeat_interval => ?FILESYNC_REPEAT_INTERVAL}.
%%%-----------------------------------------------------------------
%%% Add a disk_log handler to the logger.
%%% This starts a dedicated handler process which should always
%%% exist if the handler is registered with logger (and should not
%%% exist if the handler is not registered).
%%%
%%% Config is the logger:config() map containing a sub map with any of
%%% the following associations:
%%%
%%% Config = #{disk_log_opts => #{file => file:filename(),
%%% max_no_bytes => integer(),
%%% max_no_files => integer(),
%%% type => wrap | halt}}.
%%%
%%% This map will be merged with the logger configuration data for
%%% the disk_log LogName. If type == halt, then max_no_files is
%%% ignored.
%%%
%%% Handler specific config should be provided with a sub map associated
%%% with a key named the same as this module, e.g:
%%%
%%% Config = #{logger_disk_log_h => #{toggle_sync_qlen => 50}
%%%
%%% The disk_log handler process is linked to logger_sup, which is
%%% part of the kernel application's supervision tree.
start(Name, Config, HandlerState) ->
LoggerDLH =
#{id => Name,
start => {?MODULE, start_link, [Name,Config,HandlerState]},
restart => temporary,
shutdown => 2000,
type => worker,
modules => [?MODULE]},
case supervisor:start_child(logger_sup, LoggerDLH) of
{ok,_} ->
ok;
Error ->
Error
end.
%%%-----------------------------------------------------------------
%%% Stop and remove the handler.
stop(Name) ->
case whereis(Name) of
undefined ->
ok;
_ ->
%% We don't want to do supervisor:terminate_child here
%% since we need to distinguish this explicit stop from a
%% system termination in order to avoid circular attempts
%% at removing the handler (implying deadlocks and
%% timeouts).
_ = gen_server:call(Name,stop),
_ = supervisor:delete_child(logger_sup, Name),
ok
end.
%%%-----------------------------------------------------------------
%%% Logging and overload control.
-define(update_dl_sync(C, Interval),
if C == 0 -> Interval;
true -> C-1 end).
%% check for overload between every request (and set Mode to async,
%% sync or drop accordingly), but never flush the whole mailbox
%% before LogWindowSize requests have been handled
do_log(Bin, CallOrCast, State = #{id:=Name, mode := _Mode0}) ->
T1 = ?timestamp(),
%% check if the handler is getting overloaded, or if it's
%% recovering from overload (the check must be done for each
%% request to react quickly to large bursts of requests and
%% to ensure that the handler can never end up in drop mode
%% with an empty mailbox, which would stop operation)
{Mode1,QLen,Mem,State1} = logger_h_common:check_load(State),
%% kill the handler if it can't keep up with the load
logger_h_common:kill_if_choked(Name, QLen, Mem, State),
if Mode1 == flush ->
flush(Name, QLen, T1, State1);
true ->
write(Name, Mode1, T1, Bin, CallOrCast, State1)
end.
%% this function is called by do_log/3 after an overload check
%% has been performed, where QLen > FlushQLen
flush(Name, _QLen0, T1, State=#{last_log_ts := _T0}) ->
%% flush messages in the mailbox (a limited number in
%% order to not cause long delays)
_NewFlushed = logger_h_common:flush_log_requests(?FLUSH_MAX_N),
%% because of the receive loop when flushing messages, the
%% handler will be scheduled out often and the mailbox could
%% grow very large, so we'd better check the queue again here
{_,_QLen1} = process_info(self(), message_queue_len),
?observe(Name,{max_qlen,_QLen1}),
%% Add 1 for the current log request
?observe(Name,{flushed,_NewFlushed+1}),
State1 = ?update_max_time(?diff_time(T1,_T0),State),
{dropped,?update_other(flushed,FLUSHED,_NewFlushed,
State1#{mode => ?set_mode(Name,async),
last_qlen => 0,
last_log_ts => T1})}.
%% this function is called to write to disk_log
write(Name, Mode, T1, Bin, _CallOrCast,
State = #{dl_sync := DLSync,
dl_sync_int := DLSyncInt,
last_qlen := LastQLen,
last_log_ts := T0}) ->
%% check if we need to limit the number of writes
%% during a burst of log requests
{DoWrite,BurstWinT,BurstMsgCount} = logger_h_common:limit_burst(State),
%% only send a synhrounous request to the disk_log process
%% every DLSyncInt time, to give the handler time between
%% writes so it can keep up with incoming messages
{Status,LastQLen1,State1} =
if DoWrite, DLSync == 0 ->
?observe(Name,{_CallOrCast,1}),
NewState = disk_log_write(Name, Bin, State),
{ok, element(2,process_info(self(),message_queue_len)),
NewState};
DoWrite ->
?observe(Name,{_CallOrCast,1}),
NewState = disk_log_write(Name, Bin, State),
{ok, LastQLen, NewState};
not DoWrite ->
?observe(Name,{flushed,1}),
{dropped, LastQLen, State}
end,
%% Check if the time since the previous log request is long enough -
%% and the queue length small enough - to assume the mailbox has
%% been emptied, and if so, do filesync operation and reset mode to
%% async. Note that this is the best we can do to detect an idle
%% handler without setting a timer after each log call/cast. If the
%% time between two consecutive log requests is fast and no new
%% request comes in after the last one, idle state won't be detected!
Time = ?diff_time(T1,T0),
{Mode1,BurstMsgCount1,State2} =
if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso
(Time > ?IDLE_DETECT_TIME_USEC) ->
{?change_mode(Name,Mode,async), 0, disk_log_sync(Name,State1)};
true ->
{Mode, BurstMsgCount,State1}
end,
State3 =
?update_calls_or_casts(_CallOrCast,1,State2),
State4 =
?update_max_time(Time,
State3#{mode => Mode1,
last_qlen := LastQLen1,
last_log_ts => T1,
burst_win_ts => BurstWinT,
burst_msg_count => BurstMsgCount1,
dl_sync => ?update_dl_sync(DLSync,DLSyncInt)}),
{Status,State4}.
open_disk_log(Name, LogOpts) ->
#{file := File,
max_no_bytes := MaxNoBytes,
max_no_files := MaxNoFiles,
type := Type} = LogOpts,
case filelib:ensure_dir(File) of
ok ->
Size =
if Type == halt -> MaxNoBytes;
Type == wrap -> {MaxNoBytes,MaxNoFiles}
end,
Opts = [{name, Name},
{file, File},
{size, Size},
{type, Type},
{linkto, self()},
{repair, false},
{format, external},
{notify, true},
{quiet, true},
{mode, read_write}],
case disk_log:open(Opts) of
{ok,Name} ->
ok;
Error = {error,_Reason} ->
Error
end;
Error ->
Error
end.
close_disk_log(Name, _) ->
_ = ?disk_log_sync(Name),
_ = disk_log:lclose(Name),
ok.
disk_log_write(Name, Bin, State) ->
Result =
case ?disk_log_blog(Name, Bin) of
ok ->
ok;
LogError ->
_ = case maps:get(prev_log_result, State) of
LogError ->
%% don't report same error twice
ok;
_ ->
LogOpts = maps:get(log_opts, State),
logger_h_common:error_notify({Name,log,
LogOpts,
LogError})
end,
LogError
end,
State#{prev_log_result => Result}.
disk_log_sync(Name, State) ->
Result =
case ?disk_log_sync(Name) of
ok ->
ok;
SyncError ->
_ = case maps:get(prev_sync_result, State) of
SyncError ->
%% don't report same error twice
ok;
_ ->
LogOpts = maps:get(log_opts, State),
logger_h_common:error_notify({Name,sync,
LogOpts,
SyncError})
end,
SyncError
end,
State#{prev_sync_result => Result}.
error_notify_new(Info,Info, _Term) ->
ok;
error_notify_new(_Info0,_Info1, Term) ->
logger_h_common:error_notify(Term).