From 2929e79806b0e8ffdd4be5c7eaed0cea04bce850 Mon Sep 17 00:00:00 2001 From: Siri Hansen Date: Wed, 31 Oct 2018 15:37:04 +0100 Subject: [logger] Split overload protection functionality to own module --- lib/kernel/src/logger_olp.erl | 558 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 558 insertions(+) create mode 100644 lib/kernel/src/logger_olp.erl (limited to 'lib/kernel/src/logger_olp.erl') diff --git a/lib/kernel/src/logger_olp.erl b/lib/kernel/src/logger_olp.erl new file mode 100644 index 0000000000..6b76c78c73 --- /dev/null +++ b/lib/kernel/src/logger_olp.erl @@ -0,0 +1,558 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2017-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% +-module(logger_olp). +-behaviour(gen_server). + +-include("logger_h_common.hrl"). +-include("logger_internal.hrl"). + +%% API +-export([start_link/4, load/2, info/1, reset/1, stop/1, restart/1, + set_opts/2, get_opts/1, get_default_opts/0, is_alive/1, + call/2, cast/2]). + +%% gen_server and proc_lib callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3]). + +%%%----------------------------------------------------------------- +%% -define(CONFIG_KEYS,[sync_mode_qlen, +%% drop_mode_qlen, +%% flush_qlen, +%% burst_limit_enable, +%% burst_limit_max_count, +%% burst_limit_window_time, +%% overload_kill_enable, +%% overload_kill_qlen, +%% overload_kill_mem_size, +%% overload_kill_restart_after]). + +%%%----------------------------------------------------------------- +%%% API + +%-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason}. +start_link(Name,Module,Args,Options0) when is_map(Options0) -> + Options = maps:merge(get_default_opts(),Options0), + case check_opts(Options) of + ok -> + case proc_lib:start_link(?MODULE,init, + [[Name,Module,Args,Options]]) of + {ok,Pid,Olp} -> + {ok,Pid,{Olp,Options}}; + Error -> + Error + end; + Error -> + Error + end. + +is_alive({_Name,Pid,_ModeRef}) -> + is_process_alive(Pid). + +load({_Name,Pid,ModeRef},Msg) -> + %% If the process is getting overloaded, the message will be + %% synchronous instead of asynchronous (slows down the tempo of a + %% process causing much load). If the process is choked, drop mode + %% is set and no message is sent. + try ?get_mode(ModeRef) of + async -> + gen_server:cast(Pid, {'$olp_load',Msg}); + sync -> + case call(Pid, {'$olp_load',Msg}) of + ok -> + ok; + _Other -> + %% dropped or {error,busy} + ?observe(_Name,{dropped,1}), + ok + end; + drop -> + ?observe(_Name,{dropped,1}) + catch + %% if the ETS table doesn't exist (maybe because of a + %% process restart), we can only drop the event + _:_ -> ?observe(_Name,{dropped,1}) + end, + ok. + +info(Olp) -> + call(Olp, info). + +reset(Olp) -> + call(Olp, reset). + +stop({_Name,Pid,_ModRef}) -> + _ = gen_server:call(Pid, stop), + ok. + +set_opts({_Name,Pid,_ModRef}, Opts) -> + gen_server:call(Pid, {set_opts,Opts}). + +get_opts({_Name,Pid,_ModRef}) -> + gen_server:call(Pid, get_opts). + +get_default_opts() -> + #{sync_mode_qlen => ?SYNC_MODE_QLEN, + drop_mode_qlen => ?DROP_MODE_QLEN, + flush_qlen => ?FLUSH_QLEN, + burst_limit_enable => ?BURST_LIMIT_ENABLE, + burst_limit_max_count => ?BURST_LIMIT_MAX_COUNT, + burst_limit_window_time => ?BURST_LIMIT_WINDOW_TIME, + overload_kill_enable => ?OVERLOAD_KILL_ENABLE, + overload_kill_qlen => ?OVERLOAD_KILL_QLEN, + overload_kill_mem_size => ?OVERLOAD_KILL_MEM_SIZE, + overload_kill_restart_after => ?OVERLOAD_KILL_RESTART_AFTER}. + +restart(Fun) -> + erlang:display(restarting), + erlang:display(_ = Fun()), + ok. + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([Name,Module,Args,Options]) -> + register(Name, self()), + process_flag(message_queue_data, off_heap), + + ?init_test_hooks(), + ?start_observation(Name), + + try Module:init(Args) of + {ok,CBState} -> + try ets:new(Name, [public]) of + ModeRef -> + ?set_mode(ModeRef, async), + T0 = ?timestamp(), + proc_lib:init_ack({ok,self(),{Name,self(),ModeRef}}), + %% Storing options in state to avoid copying + %% (sending) the option data with each message + State0 = ?merge_with_stats( + Options#{id => Name, + module => Module, + mode_ref => ModeRef, + mode => async, + last_qlen => 0, + last_load_ts => T0, + burst_win_ts => T0, + burst_msg_count => 0, + cb_state => CBState}), + State = reset_restart_flag(State0), + gen_server:enter_loop(?MODULE, [], State) + catch + _:Error -> + unregister(Name), + proc_lib:init_ack(Error) + end; + Error -> + unregister(Name), + proc_lib:init_ack(Error) + catch + _:Error -> + unregister(Name), + proc_lib:init_ack(Error) + end. + +%% This is the synchronous load event. +handle_call({'$olp_load', Msg}, _From, State) -> + {Result,State1} = do_load(Msg, call, State), + %% Result == ok | dropped + {reply,Result, State1}; + +handle_call({set_opts,Opts0},_From,State) -> + Opts = maps:merge(get_default_opts(),Opts0), + case check_opts(Opts) of + ok -> + {reply, ok, maps:merge(State,Opts)}; + Error -> + {reply, Error, State} + end; + +handle_call(info, _From, State) -> + {reply, State, State}; + +handle_call(reset, _From, #{module:=Module,cb_state:=CBState}=State) -> + State1 = ?merge_with_stats(State), + CBState1 = try_callback_call(Module,reset_state,[CBState],CBState), + {reply, ok, State1#{last_qlen => 0, + last_load_ts => ?timestamp(), + cb_state => CBState1}}; + +handle_call(stop, _From, State) -> + {stop, {shutdown,stopped}, ok, State}; + +handle_call(Msg, From, #{module:=Module,cb_state:=CBState}=State) -> + case try_callback_call(Module,handle_call,[Msg, From, CBState]) of + {reply,Reply,CBState1} -> + {reply,Reply,State#{cb_state=>CBState1}}; + {reply,Reply,CBState1,Timeout}-> + {reply,Reply,State#{cb_state=>CBState1},Timeout}; + {noreply,CBState1} -> + {noreply,State#{cb_state=>CBState1}}; + {noreply,CBState1,Timeout} -> + {noreply,State#{cb_state=>CBState1},Timeout} + end. + +%% This is the asynchronous load event. +handle_cast({'$olp_load', Msg}, State) -> + {_Result,State1} = do_load(Msg, cast, State), + %% Result == ok | dropped + {noreply,State1}; + +handle_cast(Msg, #{module:=Module, cb_state:=CBState} = State) -> + case try_callback_call(Module,handle_cast,[Msg, CBState]) of + {noreply,CBState1} -> + {noreply,State#{cb_state=>CBState1}}; + {noreply,CBState1,Timeout} -> + {noreply,State#{cb_state=>CBState1},Timeout} + end. + +handle_info(Msg, #{module := Module, cb_state := CBState} = State) -> + case try_callback_call(Module,handle_info,[Msg, CBState]) of + {noreply,CBState1} -> + {noreply,State#{cb_state=>CBState1}}; + {noreply,CBState1,Timeout} -> + {noreply,State#{cb_state=>CBState1},Timeout} + end. + +terminate({shutdown,{overloaded,_QLen,_Mem}}, + #{id:=Name, module := Module, cb_state := CBState, + overload_kill_restart_after := RestartAfter} = State) -> + %% We're terminating because of an overload situation (see + %% kill_if_choked/3). + unregister(Name), %%!!!! to avoid error printout of callback crashed on stop + case try_callback_call(Module,terminate,[overloaded,CBState],ok) of + {ok,Fun} when is_function(Fun,0), is_integer(RestartAfter) -> + set_restart_flag(State), + timer:apply_after(RestartAfter,?MODULE,restart,[Fun]), + ok; + _ -> + ok + end, + ok; +terminate(Reason, #{id:=Name, module:=Module, cb_state:=CBState}) -> + _ = try_callback_call(Module,terminate,[Reason,CBState],ok), + unregister(Name), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + + +%%%----------------------------------------------------------------- +%%% Internal functions +call({_Name, Pid, _ModeRef},Msg) -> + call(Pid, Msg); +call(Server, Msg) -> + try + gen_server:call(Server, Msg, ?DEFAULT_CALL_TIMEOUT) + catch + _:{timeout,_} -> {error,busy} + end. + +cast({_Name, Pid, _ModeRef},Msg) -> + gen_server:cast(Pid, Msg). + +%% check for overload between every event (and set Mode to async, +%% sync or drop accordingly), but never flush the whole mailbox +%% before LogWindowSize events have been handled +do_load(Msg, CallOrCast, State) -> + T1 = ?timestamp(), + + %% check if the process is getting overloaded, or if it's + %% recovering from overload (the check must be done for each + %% event to react quickly to large bursts of events and + %% to ensure that the handler can never end up in drop mode + %% with an empty mailbox, which would stop operation) + {Mode1,QLen,Mem,State1} = check_load(State), + + %% kill the handler if it can't keep up with the load + kill_if_choked(QLen, Mem, State1), + + if Mode1 == flush -> + flush(T1, State1); + true -> + handle_load(Mode1, T1, Msg, CallOrCast, State1) + end. + +%% this function is called by do_load/3 after an overload check +%% has been performed, where QLen > FlushQLen +flush(T1, State=#{id := _Name, last_load_ts := T0, mode_ref := ModeRef}) -> + %% flush load messages in the mailbox (a limited number in order + %% to not cause long delays) + NewFlushed = flush_load(?FLUSH_MAX_N), + + %% write info in log about flushed messages + State1=notify({flushed,NewFlushed},State), + + %% because of the receive loop when flushing messages, the + %% handler will be scheduled out often and the mailbox could + %% grow very large, so we'd better check the queue again here + {_,_QLen1} = process_info(self(), message_queue_len), + ?observe(_Name,{max_qlen,_QLen1}), + + %% Add 1 for the current log event + ?observe(_Name,{flushed,NewFlushed+1}), + + State2 = ?update_max_time(?diff_time(T1,T0),State1), + State3 = ?update_max_qlen(_QLen1,State2), + {dropped,?update_other(flushed,FLUSHED,NewFlushed, + State3#{mode => ?set_mode(ModeRef,async), + last_qlen => 0, + last_load_ts => T1})}. + +%% this function is called to actually handle the message +handle_load(Mode, T1, Msg, _CallOrCast, + State = #{id := _Name, + module := Module, + cb_state := CBState, + mode_ref := ModeRef, + last_qlen := LastQLen, + last_load_ts := T0}) -> + %% check if we need to limit the number of writes + %% during a burst of log events + {DoWrite,State1} = limit_burst(State), + + {Result,LastQLen1,CBState1} = + if DoWrite -> + ?observe(_Name,{_CallOrCast,1}), + {ok,CBS} = try_callback_call(Module,handle_load,[Msg,CBState]), + {ok,element(2, process_info(self(), message_queue_len)),CBS}; + true -> + ?observe(_Name,{flushed,1}), + {dropped,LastQLen,CBState} + end, + State2 = State1#{cb_state=>CBState1}, + + %% Check if the time since the previous load message is long + %% enough - and the queue length small enough - to assume the + %% mailbox has been emptied, and if so, reset mode to async. Note + %% that this is the best we can do to detect an idle handler + %% without setting a timer after each log call/cast. If the time + %% between two consecutive log events is fast and no new event + %% comes in after the last one, idle state won't be detected! + Time = ?diff_time(T1,T0), + State3 = + if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso + (Time > ?IDLE_DETECT_TIME_USEC) -> + S = notify(idle,State2), + S#{mode => ?change_mode(ModeRef, Mode, async), + burst_msg_count => 0}; + true -> + State2#{mode => Mode} + end, + State4 = ?update_calls_or_casts(_CallOrCast,1,State3), + State5 = ?update_max_qlen(LastQLen1,State4), + State6 = + ?update_max_time(Time, + State5#{last_qlen := LastQLen1, + last_load_ts => T1}), + {Result,State6}. + + +%%%----------------------------------------------------------------- +%%% Check that the options are valid +check_opts(Options) when is_map(Options) -> + case do_check_opts(maps:to_list(Options)) of + ok -> + case overload_levels_ok(Options) of + true -> + ok; + false -> + Faulty = maps:with([sync_mode_qlen, + drop_mode_qlen, + flush_qlen],Options), + {error,{invalid_olp_levels,Faulty}} + end; + {error,Key,Value} -> + {error,{invalid_olp_config,#{Key=>Value}}} + end. + +do_check_opts([{sync_mode_qlen,N}|Options]) when is_integer(N) -> + do_check_opts(Options); +do_check_opts([{drop_mode_qlen,N}|Options]) when is_integer(N) -> + do_check_opts(Options); +do_check_opts([{flush_qlen,N}|Options]) when is_integer(N) -> + do_check_opts(Options); +do_check_opts([{burst_limit_enable,Bool}|Options]) when is_boolean(Bool) -> + do_check_opts(Options); +do_check_opts([{burst_limit_max_count,N}|Options]) when is_integer(N) -> + do_check_opts(Options); +do_check_opts([{burst_limit_window_time,N}|Options]) when is_integer(N) -> + do_check_opts(Options); +do_check_opts([{overload_kill_enable,Bool}|Options]) when is_boolean(Bool) -> + do_check_opts(Options); +do_check_opts([{overload_kill_qlen,N}|Options]) when is_integer(N) -> + do_check_opts(Options); +do_check_opts([{overload_kill_mem_size,N}|Options]) when is_integer(N) -> + do_check_opts(Options); +do_check_opts([{overload_kill_restart_after,NorA}|Options]) + when is_integer(NorA); NorA == infinity -> + do_check_opts(Options); +do_check_opts([{Key,Value}|_]) -> + {error,Key,Value}; +do_check_opts([]) -> + ok. + +set_restart_flag(#{id := Name, module := Module}) -> + Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])), + spawn(fun() -> + register(Flag, self()), + timer:sleep(infinity) + end), + ok. + +reset_restart_flag(#{id := Name, module := Module} = State) -> + Flag = list_to_atom(lists:concat([Module,"_",Name,"_restarting"])), + case whereis(Flag) of + undefined -> + State; + Pid -> + exit(Pid, kill), + notify(restart,State) + end. + +check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode, + sync_mode_qlen := SyncModeQLen, + drop_mode_qlen := DropModeQLen, + flush_qlen := FlushQLen}) -> + {_,Mem} = process_info(self(), memory), + ?observe(_Name,{max_mem,Mem}), + {_,QLen} = process_info(self(), message_queue_len), + ?observe(_Name,{max_qlen,QLen}), + %% When the handler process gets scheduled in, it's impossible + %% to predict the QLen. We could jump "up" arbitrarily from say + %% async to sync, async to drop, sync to flush, etc. However, when + %% the handler process manages the log events (without flushing), + %% one after the other, we will move "down" from drop to sync and + %% from sync to async. This way we don't risk getting stuck in + %% drop or sync mode with an empty mailbox. + {Mode1,_NewDrops,_NewFlushes} = + if + QLen >= FlushQLen -> + {flush, 0,1}; + QLen >= DropModeQLen -> + %% Note that drop mode will force load messages to + %% be dropped on the client side (never sent to + %% the handler). + IncDrops = if Mode == drop -> 0; true -> 1 end, + {?change_mode(ModeRef, Mode, drop), IncDrops,0}; + QLen >= SyncModeQLen -> + {?change_mode(ModeRef, Mode, sync), 0,0}; + true -> + {?change_mode(ModeRef, Mode, async), 0,0} + end, + State1 = ?update_other(drops,DROPS,_NewDrops,State), + State2 = maybe_notify_mode_change(Mode1,State1), + {Mode1, QLen, Mem, + ?update_other(flushes,FLUSHES,_NewFlushes, + State2#{last_qlen => QLen})}. + +limit_burst(#{burst_limit_enable := false}=State) -> + {true,State}; +limit_burst(#{burst_win_ts := BurstWinT0, + burst_msg_count := BurstMsgCount, + burst_limit_window_time := BurstLimitWinTime, + burst_limit_max_count := BurstLimitMaxCnt} = State) -> + if (BurstMsgCount >= BurstLimitMaxCnt) -> + %% the limit for allowed messages has been reached + BurstWinT1 = ?timestamp(), + case ?diff_time(BurstWinT1,BurstWinT0) of + BurstCheckTime when BurstCheckTime < (BurstLimitWinTime*1000) -> + %% we're still within the burst time frame + {false,?update_other(burst_drops,BURSTS,1,State)}; + _BurstCheckTime -> + %% burst time frame passed, reset counters + {true,State#{burst_win_ts => BurstWinT1, + burst_msg_count => 0}} + end; + true -> + %% the limit for allowed messages not yet reached + {true,State#{burst_win_ts => BurstWinT0, + burst_msg_count => BurstMsgCount+1}} + end. + +kill_if_choked(QLen, Mem, #{overload_kill_enable := KillIfOL, + overload_kill_qlen := OLKillQLen, + overload_kill_mem_size := OLKillMem}) -> + if KillIfOL andalso + ((QLen > OLKillQLen) orelse (Mem > OLKillMem)) -> + exit({shutdown,{overloaded,QLen,Mem}}); + true -> + ok + end. + +flush_load(Limit) -> + process_flag(priority, high), + Flushed = flush_load(0, Limit), + process_flag(priority, normal), + Flushed. + +flush_load(Limit, Limit) -> + Limit; +flush_load(N, Limit) -> + %% flush log events but leave other events, such as info, reset + %% and stop, so that these have a chance to be processed even + %% under heavy load + receive + {'$gen_cast',{'$olp_load',_}} -> + flush_load(N+1, Limit); + {'$gen_call',{Pid,MRef},{'$olp_load',_}} -> + Pid ! {MRef, dropped}, + flush_load(N+1, Limit) + after + 0 -> N + end. + +overload_levels_ok(Options) -> + SMQL = maps:get(sync_mode_qlen, Options, ?SYNC_MODE_QLEN), + DMQL = maps:get(drop_mode_qlen, Options, ?DROP_MODE_QLEN), + FQL = maps:get(flush_qlen, Options, ?FLUSH_QLEN), + (DMQL > 1) andalso (SMQL =< DMQL) andalso (DMQL =< FQL). + +maybe_notify_mode_change(drop,#{mode:=Mode0}=State) + when Mode0=/=drop -> + notify({mode_change,Mode0,drop},State); +maybe_notify_mode_change(Mode1,#{mode:=drop}=State) + when Mode1==async; Mode1==sync -> + notify({mode_change,drop,Mode1},State); +maybe_notify_mode_change(_,State) -> + State. + +notify(Note,#{module:=Module,cb_state:=CBState}=State) -> + CBState1 = try_callback_call(Module,notify,[Note,CBState],CBState), + State#{cb_state=>CBState1}. + +try_callback_call(Module, Function, Args) -> + try_callback_call(Module, Function, Args, '$no_default_return'). + +try_callback_call(Module, Function, Args, DefRet) -> + try apply(Module, Function, Args) + catch + throw:R -> R; + error:undef:S when DefRet=/='$no_default_return' -> + case S of + [{Module,Function,Args,_}|_] -> + DefRet; + _ -> + erlang:raise(error,undef,S) + end + end. -- cgit v1.2.3 From 2e4dbedd90b61d72dc841c5bee99564d0ad2f531 Mon Sep 17 00:00:00 2001 From: Siri Hansen Date: Fri, 2 Nov 2018 16:39:13 +0100 Subject: [logger] Overload protect logging from erts and remote nodes --- lib/kernel/src/logger_olp.erl | 213 ++++++++++++++++++++++++++++-------------- 1 file changed, 145 insertions(+), 68 deletions(-) (limited to 'lib/kernel/src/logger_olp.erl') diff --git a/lib/kernel/src/logger_olp.erl b/lib/kernel/src/logger_olp.erl index 6b76c78c73..7c6a1a8547 100644 --- a/lib/kernel/src/logger_olp.erl +++ b/lib/kernel/src/logger_olp.erl @@ -25,47 +25,62 @@ %% API -export([start_link/4, load/2, info/1, reset/1, stop/1, restart/1, - set_opts/2, get_opts/1, get_default_opts/0, is_alive/1, - call/2, cast/2]). + set_opts/2, get_opts/1, get_default_opts/0, get_pid/1, + call/2, cast/2, get_ref/0, get_ref/1]). %% gen_server and proc_lib callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%%%----------------------------------------------------------------- -%% -define(CONFIG_KEYS,[sync_mode_qlen, -%% drop_mode_qlen, -%% flush_qlen, -%% burst_limit_enable, -%% burst_limit_max_count, -%% burst_limit_window_time, -%% overload_kill_enable, -%% overload_kill_qlen, -%% overload_kill_mem_size, -%% overload_kill_restart_after]). +-define(OPT_KEYS,[sync_mode_qlen, + drop_mode_qlen, + flush_qlen, + burst_limit_enable, + burst_limit_max_count, + burst_limit_window_time, + overload_kill_enable, + overload_kill_qlen, + overload_kill_mem_size, + overload_kill_restart_after]). + +-export_type([olp_ref/0, options/0]). + +-opaque olp_ref() :: {atom(),pid(),ets:tid()}. + +-type options() :: #{sync_mode_qlen => integer(), + drop_mode_qlen => integer(), + flush_qlen => integer(), + burst_limit_enable => boolean(), + burst_limit_max_count => integer(), + burst_limit_window_time => integer(), + overload_kill_enable => boolean(), + overload_kill_qlen => integer(), + overload_kill_mem_size => integer(), + overload_kill_restart_after => integer()}. %%%----------------------------------------------------------------- %%% API -%-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason}. +-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason} when + Name :: atom(), + Module :: module(), + Args :: term(), + Options :: options(), + Pid :: pid(), + Olp :: olp_ref(), + Reason :: term(). start_link(Name,Module,Args,Options0) when is_map(Options0) -> Options = maps:merge(get_default_opts(),Options0), case check_opts(Options) of ok -> - case proc_lib:start_link(?MODULE,init, - [[Name,Module,Args,Options]]) of - {ok,Pid,Olp} -> - {ok,Pid,{Olp,Options}}; - Error -> - Error - end; + proc_lib:start_link(?MODULE,init,[[Name,Module,Args,Options]]); Error -> Error end. -is_alive({_Name,Pid,_ModeRef}) -> - is_process_alive(Pid). - +-spec load(Olp, Msg) -> ok when + Olp :: olp_ref(), + Msg :: term(). load({_Name,Pid,ModeRef},Msg) -> %% If the process is getting overloaded, the message will be %% synchronous instead of asynchronous (slows down the tempo of a @@ -92,22 +107,36 @@ load({_Name,Pid,ModeRef},Msg) -> end, ok. +-spec info(Olp) -> map() | {error, busy} when + Olp :: atom() | pid() | olp_ref(). info(Olp) -> call(Olp, info). +-spec reset(Olp) -> ok | {error, busy} when + Olp :: atom() | pid() | olp_ref(). reset(Olp) -> call(Olp, reset). +-spec stop(Olp) -> ok when + Olp :: atom() | pid() | olp_ref(). stop({_Name,Pid,_ModRef}) -> + stop(Pid); +stop(Pid) -> _ = gen_server:call(Pid, stop), ok. -set_opts({_Name,Pid,_ModRef}, Opts) -> - gen_server:call(Pid, {set_opts,Opts}). +-spec set_opts(Olp, Opts) -> ok | {error,term()} | {error, busy} when + Olp :: atom() | pid() | olp_ref(), + Opts :: options(). +set_opts(Olp, Opts) -> + call(Olp, {set_opts,Opts}). -get_opts({_Name,Pid,_ModRef}) -> - gen_server:call(Pid, get_opts). +-spec get_opts(Olp) -> options() | {error, busy} when + Olp :: atom() | pid() | olp_ref(). +get_opts(Olp) -> + call(Olp, get_opts). +-spec get_default_opts() -> options(). get_default_opts() -> #{sync_mode_qlen => ?SYNC_MODE_QLEN, drop_mode_qlen => ?DROP_MODE_QLEN, @@ -120,11 +149,30 @@ get_default_opts() -> overload_kill_mem_size => ?OVERLOAD_KILL_MEM_SIZE, overload_kill_restart_after => ?OVERLOAD_KILL_RESTART_AFTER}. +-spec restart(fun(() -> any())) -> ok. restart(Fun) -> - erlang:display(restarting), - erlang:display(_ = Fun()), + Result = + try Fun() + catch C:R:S -> + {error,{restart_failed,Fun,C,R,S}} + end, + ?LOG_INTERNAL(debug,[{logger_olp,restart}, + {result,Result}]), ok. +-spec get_ref() -> olp_ref(). +get_ref() -> + get(olp_ref). + +-spec get_ref(PidOrName) -> olp_ref() | {error, busy} when + PidOrName :: pid() | atom(). +get_ref(PidOrName) -> + call(PidOrName,get_ref). + +-spec get_pid(olp_ref()) -> pid(). +get_pid({_Name,Pid,_ModeRef}) -> + Pid. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -136,13 +184,15 @@ init([Name,Module,Args,Options]) -> ?init_test_hooks(), ?start_observation(Name), - try Module:init(Args) of - {ok,CBState} -> - try ets:new(Name, [public]) of - ModeRef -> + try ets:new(Name, [public]) of + ModeRef -> + OlpRef = {Name,self(),ModeRef}, + put(olp_ref,OlpRef), + try Module:init(Args) of + {ok,CBState} -> ?set_mode(ModeRef, async), T0 = ?timestamp(), - proc_lib:init_ack({ok,self(),{Name,self(),ModeRef}}), + proc_lib:init_ack({ok,self(),OlpRef}), %% Storing options in state to avoid copying %% (sending) the option data with each message State0 = ?merge_with_stats( @@ -156,15 +206,17 @@ init([Name,Module,Args,Options]) -> burst_msg_count => 0, cb_state => CBState}), State = reset_restart_flag(State0), - gen_server:enter_loop(?MODULE, [], State) + gen_server:enter_loop(?MODULE, [], State); + Error -> + _ = ets:delete(ModeRef), + unregister(Name), + proc_lib:init_ack(Error) catch _:Error -> + _ = ets:delete(ModeRef), unregister(Name), proc_lib:init_ack(Error) - end; - Error -> - unregister(Name), - proc_lib:init_ack(Error) + end catch _:Error -> unregister(Name), @@ -177,8 +229,11 @@ handle_call({'$olp_load', Msg}, _From, State) -> %% Result == ok | dropped {reply,Result, State1}; +handle_call(get_ref,_From,#{id:=Name,mode_ref:=ModeRef}=State) -> + {reply,{Name,self(),ModeRef},State}; + handle_call({set_opts,Opts0},_From,State) -> - Opts = maps:merge(get_default_opts(),Opts0), + Opts = maps:merge(maps:with(?OPT_KEYS,State),Opts0), case check_opts(Opts) of ok -> {reply, ok, maps:merge(State,Opts)}; @@ -186,6 +241,9 @@ handle_call({set_opts,Opts0},_From,State) -> {reply, Error, State} end; +handle_call(get_opts,_From,State) -> + {reply, maps:with(?OPT_KEYS,State), State}; + handle_call(info, _From, State) -> {reply, State, State}; @@ -214,7 +272,6 @@ handle_call(Msg, From, #{module:=Module,cb_state:=CBState}=State) -> %% This is the asynchronous load event. handle_cast({'$olp_load', Msg}, State) -> {_Result,State1} = do_load(Msg, cast, State), - %% Result == ok | dropped {noreply,State1}; handle_cast(Msg, #{module:=Module, cb_state:=CBState} = State) -> @@ -230,7 +287,10 @@ handle_info(Msg, #{module := Module, cb_state := CBState} = State) -> {noreply,CBState1} -> {noreply,State#{cb_state=>CBState1}}; {noreply,CBState1,Timeout} -> - {noreply,State#{cb_state=>CBState1},Timeout} + {noreply,State#{cb_state=>CBState1},Timeout}; + {load,CBState1} -> + {_,State1} = do_load(Msg, cast, State#{cb_state=>CBState1}), + {noreply,State1} end. terminate({shutdown,{overloaded,_QLen,_Mem}}, @@ -242,12 +302,11 @@ terminate({shutdown,{overloaded,_QLen,_Mem}}, case try_callback_call(Module,terminate,[overloaded,CBState],ok) of {ok,Fun} when is_function(Fun,0), is_integer(RestartAfter) -> set_restart_flag(State), - timer:apply_after(RestartAfter,?MODULE,restart,[Fun]), + _ = timer:apply_after(RestartAfter,?MODULE,restart,[Fun]), ok; _ -> ok - end, - ok; + end; terminate(Reason, #{id:=Name, module:=Module, cb_state:=CBState}) -> _ = try_callback_call(Module,terminate,[Reason,CBState],ok), unregister(Name), @@ -259,6 +318,8 @@ code_change(_OldVsn, State, _Extra) -> %%%----------------------------------------------------------------- %%% Internal functions +-spec call(Olp, term()) -> term() | {error,busy} when + Olp :: atom() | pid() | olp_ref(). call({_Name, Pid, _ModeRef},Msg) -> call(Pid, Msg); call(Server, Msg) -> @@ -268,6 +329,7 @@ call(Server, Msg) -> _:{timeout,_} -> {error,busy} end. +-spec cast(olp_ref(),term()) -> ok. cast({_Name, Pid, _ModeRef},Msg) -> gen_server:cast(Pid, Msg). @@ -276,26 +338,27 @@ cast({_Name, Pid, _ModeRef},Msg) -> %% before LogWindowSize events have been handled do_load(Msg, CallOrCast, State) -> T1 = ?timestamp(), + State1 = ?update_time(T1,State), %% check if the process is getting overloaded, or if it's %% recovering from overload (the check must be done for each %% event to react quickly to large bursts of events and %% to ensure that the handler can never end up in drop mode %% with an empty mailbox, which would stop operation) - {Mode1,QLen,Mem,State1} = check_load(State), + {Mode1,QLen,Mem,State2} = check_load(State1), %% kill the handler if it can't keep up with the load - kill_if_choked(QLen, Mem, State1), + kill_if_choked(QLen, Mem, State2), if Mode1 == flush -> - flush(T1, State1); + flush(T1, State2); true -> - handle_load(Mode1, T1, Msg, CallOrCast, State1) + handle_load(Mode1, T1, Msg, CallOrCast, State2) end. %% this function is called by do_load/3 after an overload check %% has been performed, where QLen > FlushQLen -flush(T1, State=#{id := _Name, last_load_ts := T0, mode_ref := ModeRef}) -> +flush(T1, State=#{id := _Name, mode := Mode, last_load_ts := T0, mode_ref := ModeRef}) -> %% flush load messages in the mailbox (a limited number in order %% to not cause long delays) NewFlushed = flush_load(?FLUSH_MAX_N), @@ -306,17 +369,18 @@ flush(T1, State=#{id := _Name, last_load_ts := T0, mode_ref := ModeRef}) -> %% because of the receive loop when flushing messages, the %% handler will be scheduled out often and the mailbox could %% grow very large, so we'd better check the queue again here - {_,_QLen1} = process_info(self(), message_queue_len), - ?observe(_Name,{max_qlen,_QLen1}), + {_,QLen1} = process_info(self(), message_queue_len), + ?observe(_Name,{max_qlen,QLen1}), %% Add 1 for the current log event ?observe(_Name,{flushed,NewFlushed+1}), State2 = ?update_max_time(?diff_time(T1,T0),State1), - State3 = ?update_max_qlen(_QLen1,State2), + State3 = ?update_max_qlen(QLen1,State2), + State4 = maybe_notify_mode_change(async,QLen1,State3), {dropped,?update_other(flushed,FLUSHED,NewFlushed, - State3#{mode => ?set_mode(ModeRef,async), - last_qlen => 0, + State4#{mode => ?change_mode(ModeRef,Mode,async), + last_qlen => QLen1, last_load_ts => T1})}. %% this function is called to actually handle the message @@ -334,7 +398,7 @@ handle_load(Mode, T1, Msg, _CallOrCast, {Result,LastQLen1,CBState1} = if DoWrite -> ?observe(_Name,{_CallOrCast,1}), - {ok,CBS} = try_callback_call(Module,handle_load,[Msg,CBState]), + CBS = try_callback_call(Module,handle_load,[Msg,CBState]), {ok,element(2, process_info(self(), message_queue_len)),CBS}; true -> ?observe(_Name,{flushed,1}), @@ -353,9 +417,10 @@ handle_load(Mode, T1, Msg, _CallOrCast, State3 = if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso (Time > ?IDLE_DETECT_TIME_USEC) -> - S = notify(idle,State2), - S#{mode => ?change_mode(ModeRef, Mode, async), - burst_msg_count => 0}; + S1 = notify(idle,State2), + S2 = maybe_notify_mode_change(async,LastQLen1,S1), + S2#{mode => ?change_mode(ModeRef, Mode, async), + burst_msg_count => 0}; true -> State2#{mode => Mode} end, @@ -365,7 +430,14 @@ handle_load(Mode, T1, Msg, _CallOrCast, ?update_max_time(Time, State5#{last_qlen := LastQLen1, last_load_ts => T1}), - {Result,State6}. + State7 = case Result of + ok -> + S = ?update_freq(T1,State6), + ?update_other(writes,WRITES,1,S); + _ -> + State6 + end, + {Result,State7}. %%%----------------------------------------------------------------- @@ -452,7 +524,7 @@ check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode, QLen >= DropModeQLen -> %% Note that drop mode will force load messages to %% be dropped on the client side (never sent to - %% the handler). + %% the olp process). IncDrops = if Mode == drop -> 0; true -> 1 end, {?change_mode(ModeRef, Mode, drop), IncDrops,0}; QLen >= SyncModeQLen -> @@ -461,10 +533,11 @@ check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode, {?change_mode(ModeRef, Mode, async), 0,0} end, State1 = ?update_other(drops,DROPS,_NewDrops,State), - State2 = maybe_notify_mode_change(Mode1,State1), + State2 = ?update_max_qlen(QLen,State1), + State3 = maybe_notify_mode_change(Mode1,QLen,State2), {Mode1, QLen, Mem, ?update_other(flushes,FLUSHES,_NewFlushes, - State2#{last_qlen => QLen})}. + State3#{last_qlen => QLen})}. limit_burst(#{burst_limit_enable := false}=State) -> {true,State}; @@ -517,7 +590,11 @@ flush_load(N, Limit) -> flush_load(N+1, Limit); {'$gen_call',{Pid,MRef},{'$olp_load',_}} -> Pid ! {MRef, dropped}, - flush_load(N+1, Limit) + flush_load(N+1, Limit); + {log,_,_,_,_} -> + flush_load(N+1, Limit); + {log,_,_,_} -> + flush_load(N+1, Limit) after 0 -> N end. @@ -528,13 +605,13 @@ overload_levels_ok(Options) -> FQL = maps:get(flush_qlen, Options, ?FLUSH_QLEN), (DMQL > 1) andalso (SMQL =< DMQL) andalso (DMQL =< FQL). -maybe_notify_mode_change(drop,#{mode:=Mode0}=State) +maybe_notify_mode_change(drop,_QLen,#{mode:=Mode0}=State) when Mode0=/=drop -> notify({mode_change,Mode0,drop},State); -maybe_notify_mode_change(Mode1,#{mode:=drop}=State) +maybe_notify_mode_change(Mode1,_QLen,#{mode:=drop}=State) when Mode1==async; Mode1==sync -> notify({mode_change,drop,Mode1},State); -maybe_notify_mode_change(_,State) -> +maybe_notify_mode_change(_,_,State) -> State. notify(Note,#{module:=Module,cb_state:=CBState}=State) -> -- cgit v1.2.3 From 2c63eda781ccf12c2f35a94bc07b1b1013232483 Mon Sep 17 00:00:00 2001 From: Siri Hansen Date: Tue, 11 Dec 2018 12:03:28 +0100 Subject: [logger] Add logger_stress_SUITE --- lib/kernel/src/logger_olp.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/kernel/src/logger_olp.erl') diff --git a/lib/kernel/src/logger_olp.erl b/lib/kernel/src/logger_olp.erl index 7c6a1a8547..013cc6ce37 100644 --- a/lib/kernel/src/logger_olp.erl +++ b/lib/kernel/src/logger_olp.erl @@ -358,7 +358,7 @@ do_load(Msg, CallOrCast, State) -> %% this function is called by do_load/3 after an overload check %% has been performed, where QLen > FlushQLen -flush(T1, State=#{id := _Name, mode := Mode, last_load_ts := T0, mode_ref := ModeRef}) -> +flush(T1, State=#{id := _Name, mode := Mode, last_load_ts := _T0, mode_ref := ModeRef}) -> %% flush load messages in the mailbox (a limited number in order %% to not cause long delays) NewFlushed = flush_load(?FLUSH_MAX_N), @@ -375,7 +375,7 @@ flush(T1, State=#{id := _Name, mode := Mode, last_load_ts := T0, mode_ref := Mod %% Add 1 for the current log event ?observe(_Name,{flushed,NewFlushed+1}), - State2 = ?update_max_time(?diff_time(T1,T0),State1), + State2 = ?update_max_time(?diff_time(T1,_T0),State1), State3 = ?update_max_qlen(QLen1,State2), State4 = maybe_notify_mode_change(async,QLen1,State3), {dropped,?update_other(flushed,FLUSHED,NewFlushed, -- cgit v1.2.3 From 6211cead7be6f0871cfcccccdfc00dbeb466bcf2 Mon Sep 17 00:00:00 2001 From: Siri Hansen Date: Fri, 14 Dec 2018 13:05:32 +0100 Subject: [logger] Add API function for configuring logger proxy --- lib/kernel/src/logger_olp.erl | 11 +---------- 1 file changed, 1 insertion(+), 10 deletions(-) (limited to 'lib/kernel/src/logger_olp.erl') diff --git a/lib/kernel/src/logger_olp.erl b/lib/kernel/src/logger_olp.erl index 013cc6ce37..fbc2e81882 100644 --- a/lib/kernel/src/logger_olp.erl +++ b/lib/kernel/src/logger_olp.erl @@ -47,16 +47,7 @@ -opaque olp_ref() :: {atom(),pid(),ets:tid()}. --type options() :: #{sync_mode_qlen => integer(), - drop_mode_qlen => integer(), - flush_qlen => integer(), - burst_limit_enable => boolean(), - burst_limit_max_count => integer(), - burst_limit_window_time => integer(), - overload_kill_enable => boolean(), - overload_kill_qlen => integer(), - overload_kill_mem_size => integer(), - overload_kill_restart_after => integer()}. +-type options() :: logger:olp_config(). %%%----------------------------------------------------------------- %%% API -- cgit v1.2.3 From f68e91a318912a34f38ea437365c8843af54f66f Mon Sep 17 00:00:00 2001 From: Siri Hansen Date: Thu, 20 Dec 2018 16:49:45 +0100 Subject: [logger] Add idle timer in logger_olp --- lib/kernel/src/logger_olp.erl | 97 ++++++++++++++++++++----------------------- 1 file changed, 45 insertions(+), 52 deletions(-) (limited to 'lib/kernel/src/logger_olp.erl') diff --git a/lib/kernel/src/logger_olp.erl b/lib/kernel/src/logger_olp.erl index fbc2e81882..f35577d43a 100644 --- a/lib/kernel/src/logger_olp.erl +++ b/lib/kernel/src/logger_olp.erl @@ -188,6 +188,7 @@ init([Name,Module,Args,Options]) -> %% (sending) the option data with each message State0 = ?merge_with_stats( Options#{id => Name, + idle=> true, module => Module, mode_ref => ModeRef, mode => async, @@ -216,34 +217,35 @@ init([Name,Module,Args,Options]) -> %% This is the synchronous load event. handle_call({'$olp_load', Msg}, _From, State) -> - {Result,State1} = do_load(Msg, call, State), + {Result,State1} = do_load(Msg, call, State#{idle=>false}), %% Result == ok | dropped - {reply,Result, State1}; + reply_return(Result,State1); handle_call(get_ref,_From,#{id:=Name,mode_ref:=ModeRef}=State) -> - {reply,{Name,self(),ModeRef},State}; + reply_return({Name,self(),ModeRef},State); handle_call({set_opts,Opts0},_From,State) -> Opts = maps:merge(maps:with(?OPT_KEYS,State),Opts0), case check_opts(Opts) of ok -> - {reply, ok, maps:merge(State,Opts)}; + reply_return(ok, maps:merge(State,Opts)); Error -> - {reply, Error, State} + reply_return(Error, State) end; handle_call(get_opts,_From,State) -> - {reply, maps:with(?OPT_KEYS,State), State}; + reply_return(maps:with(?OPT_KEYS,State), State); handle_call(info, _From, State) -> - {reply, State, State}; + reply_return(State, State); handle_call(reset, _From, #{module:=Module,cb_state:=CBState}=State) -> State1 = ?merge_with_stats(State), CBState1 = try_callback_call(Module,reset_state,[CBState],CBState), - {reply, ok, State1#{last_qlen => 0, - last_load_ts => ?timestamp(), - cb_state => CBState1}}; + reply_return(ok, State1#{idle => true, + last_qlen => 0, + last_load_ts => ?timestamp(), + cb_state => CBState1}); handle_call(stop, _From, State) -> {stop, {shutdown,stopped}, ok, State}; @@ -251,37 +253,36 @@ handle_call(stop, _From, State) -> handle_call(Msg, From, #{module:=Module,cb_state:=CBState}=State) -> case try_callback_call(Module,handle_call,[Msg, From, CBState]) of {reply,Reply,CBState1} -> - {reply,Reply,State#{cb_state=>CBState1}}; - {reply,Reply,CBState1,Timeout}-> - {reply,Reply,State#{cb_state=>CBState1},Timeout}; + reply_return(Reply,State#{cb_state=>CBState1}); {noreply,CBState1} -> - {noreply,State#{cb_state=>CBState1}}; - {noreply,CBState1,Timeout} -> - {noreply,State#{cb_state=>CBState1},Timeout} + noreply_return(State#{cb_state=>CBState1}) end. %% This is the asynchronous load event. handle_cast({'$olp_load', Msg}, State) -> - {_Result,State1} = do_load(Msg, cast, State), - {noreply,State1}; + {_Result,State1} = do_load(Msg, cast, State#{idle=>false}), + noreply_return(State1); handle_cast(Msg, #{module:=Module, cb_state:=CBState} = State) -> case try_callback_call(Module,handle_cast,[Msg, CBState]) of {noreply,CBState1} -> - {noreply,State#{cb_state=>CBState1}}; - {noreply,CBState1,Timeout} -> - {noreply,State#{cb_state=>CBState1},Timeout} + noreply_return(State#{cb_state=>CBState1}) end. +handle_info(timeout, #{mode_ref:=_ModeRef, mode:=Mode} = State) -> + State1 = notify(idle,State), + State2 = maybe_notify_mode_change(async,State1), + {noreply, State2#{idle => true, + mode => ?change_mode(_ModeRef, Mode, async), + burst_msg_count => 0}}; handle_info(Msg, #{module := Module, cb_state := CBState} = State) -> case try_callback_call(Module,handle_info,[Msg, CBState]) of {noreply,CBState1} -> - {noreply,State#{cb_state=>CBState1}}; - {noreply,CBState1,Timeout} -> - {noreply,State#{cb_state=>CBState1},Timeout}; + noreply_return(State#{cb_state=>CBState1}); {load,CBState1} -> - {_,State1} = do_load(Msg, cast, State#{cb_state=>CBState1}), - {noreply,State1} + {_,State1} = do_load(Msg, cast, State#{idle=>false, + cb_state=>CBState1}), + noreply_return(State1) end. terminate({shutdown,{overloaded,_QLen,_Mem}}, @@ -368,7 +369,7 @@ flush(T1, State=#{id := _Name, mode := Mode, last_load_ts := _T0, mode_ref := Mo State2 = ?update_max_time(?diff_time(T1,_T0),State1), State3 = ?update_max_qlen(QLen1,State2), - State4 = maybe_notify_mode_change(async,QLen1,State3), + State4 = maybe_notify_mode_change(async,State3), {dropped,?update_other(flushed,FLUSHED,NewFlushed, State4#{mode => ?change_mode(ModeRef,Mode,async), last_qlen => QLen1, @@ -379,9 +380,8 @@ handle_load(Mode, T1, Msg, _CallOrCast, State = #{id := _Name, module := Module, cb_state := CBState, - mode_ref := ModeRef, last_qlen := LastQLen, - last_load_ts := T0}) -> + last_load_ts := _T0}) -> %% check if we need to limit the number of writes %% during a burst of log events {DoWrite,State1} = limit_burst(State), @@ -397,28 +397,11 @@ handle_load(Mode, T1, Msg, _CallOrCast, end, State2 = State1#{cb_state=>CBState1}, - %% Check if the time since the previous load message is long - %% enough - and the queue length small enough - to assume the - %% mailbox has been emptied, and if so, reset mode to async. Note - %% that this is the best we can do to detect an idle handler - %% without setting a timer after each log call/cast. If the time - %% between two consecutive log events is fast and no new event - %% comes in after the last one, idle state won't be detected! - Time = ?diff_time(T1,T0), - State3 = - if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso - (Time > ?IDLE_DETECT_TIME_USEC) -> - S1 = notify(idle,State2), - S2 = maybe_notify_mode_change(async,LastQLen1,S1), - S2#{mode => ?change_mode(ModeRef, Mode, async), - burst_msg_count => 0}; - true -> - State2#{mode => Mode} - end, + State3 = State2#{mode => Mode}, State4 = ?update_calls_or_casts(_CallOrCast,1,State3), State5 = ?update_max_qlen(LastQLen1,State4), State6 = - ?update_max_time(Time, + ?update_max_time(?diff_time(T1,_T0), State5#{last_qlen := LastQLen1, last_load_ts => T1}), State7 = case Result of @@ -525,7 +508,7 @@ check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode, end, State1 = ?update_other(drops,DROPS,_NewDrops,State), State2 = ?update_max_qlen(QLen,State1), - State3 = maybe_notify_mode_change(Mode1,QLen,State2), + State3 = maybe_notify_mode_change(Mode1,State2), {Mode1, QLen, Mem, ?update_other(flushes,FLUSHES,_NewFlushes, State3#{last_qlen => QLen})}. @@ -596,13 +579,13 @@ overload_levels_ok(Options) -> FQL = maps:get(flush_qlen, Options, ?FLUSH_QLEN), (DMQL > 1) andalso (SMQL =< DMQL) andalso (DMQL =< FQL). -maybe_notify_mode_change(drop,_QLen,#{mode:=Mode0}=State) +maybe_notify_mode_change(drop,#{mode:=Mode0}=State) when Mode0=/=drop -> notify({mode_change,Mode0,drop},State); -maybe_notify_mode_change(Mode1,_QLen,#{mode:=drop}=State) +maybe_notify_mode_change(Mode1,#{mode:=drop}=State) when Mode1==async; Mode1==sync -> notify({mode_change,drop,Mode1},State); -maybe_notify_mode_change(_,_,State) -> +maybe_notify_mode_change(_,State) -> State. notify(Note,#{module:=Module,cb_state:=CBState}=State) -> @@ -624,3 +607,13 @@ try_callback_call(Module, Function, Args, DefRet) -> erlang:raise(error,undef,S) end end. + +noreply_return(#{idle:=true}=State) -> + {noreply,State}; +noreply_return(#{idle:=false}=State) -> + {noreply,State,?IDLE_DETECT_TIME}. + +reply_return(Reply,#{idle:=true}=State) -> + {reply,Reply,State}; +reply_return(Reply,#{idle:=false}=State) -> + {reply,Reply,State,?IDLE_DETECT_TIME}. -- cgit v1.2.3 From 4ec6d337da40599b5153c7878d329ca1045b6a35 Mon Sep 17 00:00:00 2001 From: Siri Hansen Date: Fri, 21 Dec 2018 12:35:07 +0100 Subject: [logger] Allow logger_olp callbacks to return {stop,...} --- lib/kernel/src/logger_olp.erl | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) (limited to 'lib/kernel/src/logger_olp.erl') diff --git a/lib/kernel/src/logger_olp.erl b/lib/kernel/src/logger_olp.erl index f35577d43a..0f9314b4a3 100644 --- a/lib/kernel/src/logger_olp.erl +++ b/lib/kernel/src/logger_olp.erl @@ -255,7 +255,11 @@ handle_call(Msg, From, #{module:=Module,cb_state:=CBState}=State) -> {reply,Reply,CBState1} -> reply_return(Reply,State#{cb_state=>CBState1}); {noreply,CBState1} -> - noreply_return(State#{cb_state=>CBState1}) + noreply_return(State#{cb_state=>CBState1}); + {stop, Reason, Reply, CBState1} -> + {stop, Reason, Reply, State#{cb_state=>CBState1}}; + {stop, Reason, CBState1} -> + {stop, Reason, State#{cb_state=>CBState1}} end. %% This is the asynchronous load event. @@ -266,7 +270,9 @@ handle_cast({'$olp_load', Msg}, State) -> handle_cast(Msg, #{module:=Module, cb_state:=CBState} = State) -> case try_callback_call(Module,handle_cast,[Msg, CBState]) of {noreply,CBState1} -> - noreply_return(State#{cb_state=>CBState1}) + noreply_return(State#{cb_state=>CBState1}); + {stop, Reason, CBState1} -> + {stop, Reason, State#{cb_state=>CBState1}} end. handle_info(timeout, #{mode_ref:=_ModeRef, mode:=Mode} = State) -> @@ -279,6 +285,8 @@ handle_info(Msg, #{module := Module, cb_state := CBState} = State) -> case try_callback_call(Module,handle_info,[Msg, CBState]) of {noreply,CBState1} -> noreply_return(State#{cb_state=>CBState1}); + {stop, Reason, CBState1} -> + {stop, Reason, State#{cb_state=>CBState1}}; {load,CBState1} -> {_,State1} = do_load(Msg, cast, State#{idle=>false, cb_state=>CBState1}), -- cgit v1.2.3 From 06b9756aceb200c359015a088f86e8afc097dc97 Mon Sep 17 00:00:00 2001 From: Siri Hansen Date: Fri, 21 Dec 2018 12:35:39 +0100 Subject: [logger] Move out overload protection macros from logger_h_common.hrl The new file logger_olp.hrl is added. --- lib/kernel/src/logger_olp.erl | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'lib/kernel/src/logger_olp.erl') diff --git a/lib/kernel/src/logger_olp.erl b/lib/kernel/src/logger_olp.erl index 0f9314b4a3..009280a9c9 100644 --- a/lib/kernel/src/logger_olp.erl +++ b/lib/kernel/src/logger_olp.erl @@ -20,7 +20,7 @@ -module(logger_olp). -behaviour(gen_server). --include("logger_h_common.hrl"). +-include("logger_olp.hrl"). -include("logger_internal.hrl"). %% API @@ -172,7 +172,6 @@ init([Name,Module,Args,Options]) -> register(Name, self()), process_flag(message_queue_data, off_heap), - ?init_test_hooks(), ?start_observation(Name), try ets:new(Name, [public]) of @@ -324,7 +323,7 @@ call({_Name, Pid, _ModeRef},Msg) -> call(Pid, Msg); call(Server, Msg) -> try - gen_server:call(Server, Msg, ?DEFAULT_CALL_TIMEOUT) + gen_server:call(Server, Msg) catch _:{timeout,_} -> {error,busy} end. -- cgit v1.2.3