diff options
Diffstat (limited to 'lib/kernel/src/logger_olp.erl')
-rw-r--r-- | lib/kernel/src/logger_olp.erl | 213 |
1 files changed, 145 insertions, 68 deletions
diff --git a/lib/kernel/src/logger_olp.erl b/lib/kernel/src/logger_olp.erl index 6b76c78c73..7c6a1a8547 100644 --- a/lib/kernel/src/logger_olp.erl +++ b/lib/kernel/src/logger_olp.erl @@ -25,47 +25,62 @@ %% API -export([start_link/4, load/2, info/1, reset/1, stop/1, restart/1, - set_opts/2, get_opts/1, get_default_opts/0, is_alive/1, - call/2, cast/2]). + set_opts/2, get_opts/1, get_default_opts/0, get_pid/1, + call/2, cast/2, get_ref/0, get_ref/1]). %% gen_server and proc_lib callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%%%----------------------------------------------------------------- -%% -define(CONFIG_KEYS,[sync_mode_qlen, -%% drop_mode_qlen, -%% flush_qlen, -%% burst_limit_enable, -%% burst_limit_max_count, -%% burst_limit_window_time, -%% overload_kill_enable, -%% overload_kill_qlen, -%% overload_kill_mem_size, -%% overload_kill_restart_after]). +-define(OPT_KEYS,[sync_mode_qlen, + drop_mode_qlen, + flush_qlen, + burst_limit_enable, + burst_limit_max_count, + burst_limit_window_time, + overload_kill_enable, + overload_kill_qlen, + overload_kill_mem_size, + overload_kill_restart_after]). + +-export_type([olp_ref/0, options/0]). + +-opaque olp_ref() :: {atom(),pid(),ets:tid()}. + +-type options() :: #{sync_mode_qlen => integer(), + drop_mode_qlen => integer(), + flush_qlen => integer(), + burst_limit_enable => boolean(), + burst_limit_max_count => integer(), + burst_limit_window_time => integer(), + overload_kill_enable => boolean(), + overload_kill_qlen => integer(), + overload_kill_mem_size => integer(), + overload_kill_restart_after => integer()}. %%%----------------------------------------------------------------- %%% API -%-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason}. +-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason} when + Name :: atom(), + Module :: module(), + Args :: term(), + Options :: options(), + Pid :: pid(), + Olp :: olp_ref(), + Reason :: term(). start_link(Name,Module,Args,Options0) when is_map(Options0) -> Options = maps:merge(get_default_opts(),Options0), case check_opts(Options) of ok -> - case proc_lib:start_link(?MODULE,init, - [[Name,Module,Args,Options]]) of - {ok,Pid,Olp} -> - {ok,Pid,{Olp,Options}}; - Error -> - Error - end; + proc_lib:start_link(?MODULE,init,[[Name,Module,Args,Options]]); Error -> Error end. -is_alive({_Name,Pid,_ModeRef}) -> - is_process_alive(Pid). - +-spec load(Olp, Msg) -> ok when + Olp :: olp_ref(), + Msg :: term(). load({_Name,Pid,ModeRef},Msg) -> %% If the process is getting overloaded, the message will be %% synchronous instead of asynchronous (slows down the tempo of a @@ -92,22 +107,36 @@ load({_Name,Pid,ModeRef},Msg) -> end, ok. +-spec info(Olp) -> map() | {error, busy} when + Olp :: atom() | pid() | olp_ref(). info(Olp) -> call(Olp, info). +-spec reset(Olp) -> ok | {error, busy} when + Olp :: atom() | pid() | olp_ref(). reset(Olp) -> call(Olp, reset). +-spec stop(Olp) -> ok when + Olp :: atom() | pid() | olp_ref(). stop({_Name,Pid,_ModRef}) -> + stop(Pid); +stop(Pid) -> _ = gen_server:call(Pid, stop), ok. -set_opts({_Name,Pid,_ModRef}, Opts) -> - gen_server:call(Pid, {set_opts,Opts}). +-spec set_opts(Olp, Opts) -> ok | {error,term()} | {error, busy} when + Olp :: atom() | pid() | olp_ref(), + Opts :: options(). +set_opts(Olp, Opts) -> + call(Olp, {set_opts,Opts}). -get_opts({_Name,Pid,_ModRef}) -> - gen_server:call(Pid, get_opts). +-spec get_opts(Olp) -> options() | {error, busy} when + Olp :: atom() | pid() | olp_ref(). +get_opts(Olp) -> + call(Olp, get_opts). +-spec get_default_opts() -> options(). get_default_opts() -> #{sync_mode_qlen => ?SYNC_MODE_QLEN, drop_mode_qlen => ?DROP_MODE_QLEN, @@ -120,11 +149,30 @@ get_default_opts() -> overload_kill_mem_size => ?OVERLOAD_KILL_MEM_SIZE, overload_kill_restart_after => ?OVERLOAD_KILL_RESTART_AFTER}. +-spec restart(fun(() -> any())) -> ok. restart(Fun) -> - erlang:display(restarting), - erlang:display(_ = Fun()), + Result = + try Fun() + catch C:R:S -> + {error,{restart_failed,Fun,C,R,S}} + end, + ?LOG_INTERNAL(debug,[{logger_olp,restart}, + {result,Result}]), ok. +-spec get_ref() -> olp_ref(). +get_ref() -> + get(olp_ref). + +-spec get_ref(PidOrName) -> olp_ref() | {error, busy} when + PidOrName :: pid() | atom(). +get_ref(PidOrName) -> + call(PidOrName,get_ref). + +-spec get_pid(olp_ref()) -> pid(). +get_pid({_Name,Pid,_ModeRef}) -> + Pid. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -136,13 +184,15 @@ init([Name,Module,Args,Options]) -> ?init_test_hooks(), ?start_observation(Name), - try Module:init(Args) of - {ok,CBState} -> - try ets:new(Name, [public]) of - ModeRef -> + try ets:new(Name, [public]) of + ModeRef -> + OlpRef = {Name,self(),ModeRef}, + put(olp_ref,OlpRef), + try Module:init(Args) of + {ok,CBState} -> ?set_mode(ModeRef, async), T0 = ?timestamp(), - proc_lib:init_ack({ok,self(),{Name,self(),ModeRef}}), + proc_lib:init_ack({ok,self(),OlpRef}), %% Storing options in state to avoid copying %% (sending) the option data with each message State0 = ?merge_with_stats( @@ -156,15 +206,17 @@ init([Name,Module,Args,Options]) -> burst_msg_count => 0, cb_state => CBState}), State = reset_restart_flag(State0), - gen_server:enter_loop(?MODULE, [], State) + gen_server:enter_loop(?MODULE, [], State); + Error -> + _ = ets:delete(ModeRef), + unregister(Name), + proc_lib:init_ack(Error) catch _:Error -> + _ = ets:delete(ModeRef), unregister(Name), proc_lib:init_ack(Error) - end; - Error -> - unregister(Name), - proc_lib:init_ack(Error) + end catch _:Error -> unregister(Name), @@ -177,8 +229,11 @@ handle_call({'$olp_load', Msg}, _From, State) -> %% Result == ok | dropped {reply,Result, State1}; +handle_call(get_ref,_From,#{id:=Name,mode_ref:=ModeRef}=State) -> + {reply,{Name,self(),ModeRef},State}; + handle_call({set_opts,Opts0},_From,State) -> - Opts = maps:merge(get_default_opts(),Opts0), + Opts = maps:merge(maps:with(?OPT_KEYS,State),Opts0), case check_opts(Opts) of ok -> {reply, ok, maps:merge(State,Opts)}; @@ -186,6 +241,9 @@ handle_call({set_opts,Opts0},_From,State) -> {reply, Error, State} end; +handle_call(get_opts,_From,State) -> + {reply, maps:with(?OPT_KEYS,State), State}; + handle_call(info, _From, State) -> {reply, State, State}; @@ -214,7 +272,6 @@ handle_call(Msg, From, #{module:=Module,cb_state:=CBState}=State) -> %% This is the asynchronous load event. handle_cast({'$olp_load', Msg}, State) -> {_Result,State1} = do_load(Msg, cast, State), - %% Result == ok | dropped {noreply,State1}; handle_cast(Msg, #{module:=Module, cb_state:=CBState} = State) -> @@ -230,7 +287,10 @@ handle_info(Msg, #{module := Module, cb_state := CBState} = State) -> {noreply,CBState1} -> {noreply,State#{cb_state=>CBState1}}; {noreply,CBState1,Timeout} -> - {noreply,State#{cb_state=>CBState1},Timeout} + {noreply,State#{cb_state=>CBState1},Timeout}; + {load,CBState1} -> + {_,State1} = do_load(Msg, cast, State#{cb_state=>CBState1}), + {noreply,State1} end. terminate({shutdown,{overloaded,_QLen,_Mem}}, @@ -242,12 +302,11 @@ terminate({shutdown,{overloaded,_QLen,_Mem}}, case try_callback_call(Module,terminate,[overloaded,CBState],ok) of {ok,Fun} when is_function(Fun,0), is_integer(RestartAfter) -> set_restart_flag(State), - timer:apply_after(RestartAfter,?MODULE,restart,[Fun]), + _ = timer:apply_after(RestartAfter,?MODULE,restart,[Fun]), ok; _ -> ok - end, - ok; + end; terminate(Reason, #{id:=Name, module:=Module, cb_state:=CBState}) -> _ = try_callback_call(Module,terminate,[Reason,CBState],ok), unregister(Name), @@ -259,6 +318,8 @@ code_change(_OldVsn, State, _Extra) -> %%%----------------------------------------------------------------- %%% Internal functions +-spec call(Olp, term()) -> term() | {error,busy} when + Olp :: atom() | pid() | olp_ref(). call({_Name, Pid, _ModeRef},Msg) -> call(Pid, Msg); call(Server, Msg) -> @@ -268,6 +329,7 @@ call(Server, Msg) -> _:{timeout,_} -> {error,busy} end. +-spec cast(olp_ref(),term()) -> ok. cast({_Name, Pid, _ModeRef},Msg) -> gen_server:cast(Pid, Msg). @@ -276,26 +338,27 @@ cast({_Name, Pid, _ModeRef},Msg) -> %% before LogWindowSize events have been handled do_load(Msg, CallOrCast, State) -> T1 = ?timestamp(), + State1 = ?update_time(T1,State), %% check if the process is getting overloaded, or if it's %% recovering from overload (the check must be done for each %% event to react quickly to large bursts of events and %% to ensure that the handler can never end up in drop mode %% with an empty mailbox, which would stop operation) - {Mode1,QLen,Mem,State1} = check_load(State), + {Mode1,QLen,Mem,State2} = check_load(State1), %% kill the handler if it can't keep up with the load - kill_if_choked(QLen, Mem, State1), + kill_if_choked(QLen, Mem, State2), if Mode1 == flush -> - flush(T1, State1); + flush(T1, State2); true -> - handle_load(Mode1, T1, Msg, CallOrCast, State1) + handle_load(Mode1, T1, Msg, CallOrCast, State2) end. %% this function is called by do_load/3 after an overload check %% has been performed, where QLen > FlushQLen -flush(T1, State=#{id := _Name, last_load_ts := T0, mode_ref := ModeRef}) -> +flush(T1, State=#{id := _Name, mode := Mode, last_load_ts := T0, mode_ref := ModeRef}) -> %% flush load messages in the mailbox (a limited number in order %% to not cause long delays) NewFlushed = flush_load(?FLUSH_MAX_N), @@ -306,17 +369,18 @@ flush(T1, State=#{id := _Name, last_load_ts := T0, mode_ref := ModeRef}) -> %% because of the receive loop when flushing messages, the %% handler will be scheduled out often and the mailbox could %% grow very large, so we'd better check the queue again here - {_,_QLen1} = process_info(self(), message_queue_len), - ?observe(_Name,{max_qlen,_QLen1}), + {_,QLen1} = process_info(self(), message_queue_len), + ?observe(_Name,{max_qlen,QLen1}), %% Add 1 for the current log event ?observe(_Name,{flushed,NewFlushed+1}), State2 = ?update_max_time(?diff_time(T1,T0),State1), - State3 = ?update_max_qlen(_QLen1,State2), + State3 = ?update_max_qlen(QLen1,State2), + State4 = maybe_notify_mode_change(async,QLen1,State3), {dropped,?update_other(flushed,FLUSHED,NewFlushed, - State3#{mode => ?set_mode(ModeRef,async), - last_qlen => 0, + State4#{mode => ?change_mode(ModeRef,Mode,async), + last_qlen => QLen1, last_load_ts => T1})}. %% this function is called to actually handle the message @@ -334,7 +398,7 @@ handle_load(Mode, T1, Msg, _CallOrCast, {Result,LastQLen1,CBState1} = if DoWrite -> ?observe(_Name,{_CallOrCast,1}), - {ok,CBS} = try_callback_call(Module,handle_load,[Msg,CBState]), + CBS = try_callback_call(Module,handle_load,[Msg,CBState]), {ok,element(2, process_info(self(), message_queue_len)),CBS}; true -> ?observe(_Name,{flushed,1}), @@ -353,9 +417,10 @@ handle_load(Mode, T1, Msg, _CallOrCast, State3 = if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso (Time > ?IDLE_DETECT_TIME_USEC) -> - S = notify(idle,State2), - S#{mode => ?change_mode(ModeRef, Mode, async), - burst_msg_count => 0}; + S1 = notify(idle,State2), + S2 = maybe_notify_mode_change(async,LastQLen1,S1), + S2#{mode => ?change_mode(ModeRef, Mode, async), + burst_msg_count => 0}; true -> State2#{mode => Mode} end, @@ -365,7 +430,14 @@ handle_load(Mode, T1, Msg, _CallOrCast, ?update_max_time(Time, State5#{last_qlen := LastQLen1, last_load_ts => T1}), - {Result,State6}. + State7 = case Result of + ok -> + S = ?update_freq(T1,State6), + ?update_other(writes,WRITES,1,S); + _ -> + State6 + end, + {Result,State7}. %%%----------------------------------------------------------------- @@ -452,7 +524,7 @@ check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode, QLen >= DropModeQLen -> %% Note that drop mode will force load messages to %% be dropped on the client side (never sent to - %% the handler). + %% the olp process). IncDrops = if Mode == drop -> 0; true -> 1 end, {?change_mode(ModeRef, Mode, drop), IncDrops,0}; QLen >= SyncModeQLen -> @@ -461,10 +533,11 @@ check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode, {?change_mode(ModeRef, Mode, async), 0,0} end, State1 = ?update_other(drops,DROPS,_NewDrops,State), - State2 = maybe_notify_mode_change(Mode1,State1), + State2 = ?update_max_qlen(QLen,State1), + State3 = maybe_notify_mode_change(Mode1,QLen,State2), {Mode1, QLen, Mem, ?update_other(flushes,FLUSHES,_NewFlushes, - State2#{last_qlen => QLen})}. + State3#{last_qlen => QLen})}. limit_burst(#{burst_limit_enable := false}=State) -> {true,State}; @@ -517,7 +590,11 @@ flush_load(N, Limit) -> flush_load(N+1, Limit); {'$gen_call',{Pid,MRef},{'$olp_load',_}} -> Pid ! {MRef, dropped}, - flush_load(N+1, Limit) + flush_load(N+1, Limit); + {log,_,_,_,_} -> + flush_load(N+1, Limit); + {log,_,_,_} -> + flush_load(N+1, Limit) after 0 -> N end. @@ -528,13 +605,13 @@ overload_levels_ok(Options) -> FQL = maps:get(flush_qlen, Options, ?FLUSH_QLEN), (DMQL > 1) andalso (SMQL =< DMQL) andalso (DMQL =< FQL). -maybe_notify_mode_change(drop,#{mode:=Mode0}=State) +maybe_notify_mode_change(drop,_QLen,#{mode:=Mode0}=State) when Mode0=/=drop -> notify({mode_change,Mode0,drop},State); -maybe_notify_mode_change(Mode1,#{mode:=drop}=State) +maybe_notify_mode_change(Mode1,_QLen,#{mode:=drop}=State) when Mode1==async; Mode1==sync -> notify({mode_change,drop,Mode1},State); -maybe_notify_mode_change(_,State) -> +maybe_notify_mode_change(_,_,State) -> State. notify(Note,#{module:=Module,cb_state:=CBState}=State) -> |