From 2e4dbedd90b61d72dc841c5bee99564d0ad2f531 Mon Sep 17 00:00:00 2001 From: Siri Hansen Date: Fri, 2 Nov 2018 16:39:13 +0100 Subject: [logger] Overload protect logging from erts and remote nodes --- lib/kernel/src/Makefile | 3 + lib/kernel/src/kernel.app.src | 2 + lib/kernel/src/logger.erl | 51 ++++++--- lib/kernel/src/logger_h_common.erl | 19 ++-- lib/kernel/src/logger_h_common.hrl | 29 ++++- lib/kernel/src/logger_olp.erl | 213 +++++++++++++++++++++++++------------ lib/kernel/src/logger_proxy.erl | 152 ++++++++++++++++++++++++++ lib/kernel/src/logger_server.erl | 20 +++- lib/kernel/src/logger_sup.erl | 4 +- 9 files changed, 391 insertions(+), 102 deletions(-) create mode 100644 lib/kernel/src/logger_proxy.erl (limited to 'lib/kernel/src') diff --git a/lib/kernel/src/Makefile b/lib/kernel/src/Makefile index c076726bf4..c3fee02334 100644 --- a/lib/kernel/src/Makefile +++ b/lib/kernel/src/Makefile @@ -119,6 +119,7 @@ MODULES = \ logger_filters \ logger_formatter \ logger_olp \ + logger_proxy \ logger_server \ logger_simple_h \ logger_sup \ @@ -280,6 +281,8 @@ $(EBIN)/logger_config.beam: logger_internal.hrl ../include/logger.hrl $(EBIN)/logger_disk_log_h.beam: logger_h_common.hrl logger_internal.hrl ../include/logger.hrl ../include/file.hrl $(EBIN)/logger_filters.beam: logger_internal.hrl ../include/logger.hrl $(EBIN)/logger_formatter.beam: logger_internal.hrl ../include/logger.hrl +$(EBIN)/logger_olp.beam: logger_h_common.hrl logger_internal.hrl +$(EBIN)/logger_proxy.beam: logger_internal.hrl $(EBIN)/logger_server.beam: logger_internal.hrl ../include/logger.hrl $(EBIN)/logger_simple_h.beam: logger_internal.hrl ../include/logger.hrl $(EBIN)/logger_std_h.beam: logger_h_common.hrl logger_internal.hrl ../include/logger.hrl ../include/file.hrl diff --git a/lib/kernel/src/kernel.app.src b/lib/kernel/src/kernel.app.src index fe073621c8..a1d9e8e215 100644 --- a/lib/kernel/src/kernel.app.src +++ b/lib/kernel/src/kernel.app.src @@ -68,6 +68,8 @@ logger_formatter, logger_h_common, logger_handler_watcher, + logger_olp, + logger_proxy, logger_server, logger_simple_h, logger_std_h, diff --git a/lib/kernel/src/logger.erl b/lib/kernel/src/logger.erl index 6762998d4f..1611d489e6 100644 --- a/lib/kernel/src/logger.erl +++ b/lib/kernel/src/logger.erl @@ -672,6 +672,17 @@ init_kernel_handlers(Env) -> %% This function is responsible for resolving the handler config %% and then starting the correct handlers. This is done after the %% kernel supervisor tree has been started as it needs the logger_sup. +add_handlers(kernel) -> + Env = get_logger_env(kernel), + case get_proxy_opts(Env) of + undefined -> + add_handlers(kernel,Env); + Opts -> + case logger_olp:set_opts(logger_proxy,Opts) of + ok -> add_handlers(kernel,Env); + {error, Reason} -> {error,{bad_proxy_config,Reason}} + end + end; add_handlers(App) when is_atom(App) -> add_handlers(App,get_logger_env(App)); add_handlers(HandlerConfig) -> @@ -729,6 +740,8 @@ check_logger_config(kernel,[{filters,_,_}|Env]) -> check_logger_config(kernel,Env); check_logger_config(kernel,[{module_level,_,_}|Env]) -> check_logger_config(kernel,Env); +check_logger_config(kernel,[{proxy,_}|Env]) -> + check_logger_config(kernel,Env); check_logger_config(_,Bad) -> throw(Bad). @@ -784,6 +797,13 @@ get_primary_filters(Env) -> _ -> throw({multiple_filters,Env}) end. +get_proxy_opts(Env) -> + case [P || P={proxy,_} <- Env] of + [{proxy,Opts}] -> Opts; + [] -> undefined; + _ -> throw({multiple_proxies,Env}) + end. + %% This function looks at the kernel logger environment %% and updates it so that the correct logger is configured init_default_config(Type,Env) when Type==standard_io; @@ -878,42 +898,43 @@ log_allowed(Location,Level,Msg,Meta0) when is_map(Meta0) -> %% (function or macro). Meta = add_default_metadata( maps:merge(Location,maps:merge(proc_meta(),Meta0))), + Tid = tid(), case node(maps:get(gl,Meta)) of Node when Node=/=node() -> - log_remote(Node,Level,Msg,Meta), - do_log_allowed(Level,Msg,Meta); + log_remote(Node,Level,Msg,Meta,Tid), + do_log_allowed(Level,Msg,Meta,Tid); _ -> - do_log_allowed(Level,Msg,Meta) + do_log_allowed(Level,Msg,Meta,Tid) end. -do_log_allowed(Level,{Format,Args}=Msg,Meta) +do_log_allowed(Level,{Format,Args}=Msg,Meta,Tid) when ?IS_LEVEL(Level), is_list(Format), is_list(Args), is_map(Meta) -> - logger_backend:log_allowed(#{level=>Level,msg=>Msg,meta=>Meta},tid()); -do_log_allowed(Level,Report,Meta) + logger_backend:log_allowed(#{level=>Level,msg=>Msg,meta=>Meta},Tid); +do_log_allowed(Level,Report,Meta,Tid) when ?IS_LEVEL(Level), ?IS_REPORT(Report), is_map(Meta) -> logger_backend:log_allowed(#{level=>Level,msg=>{report,Report},meta=>Meta}, - tid()); -do_log_allowed(Level,String,Meta) + Tid); +do_log_allowed(Level,String,Meta,Tid) when ?IS_LEVEL(Level), ?IS_STRING(String), is_map(Meta) -> logger_backend:log_allowed(#{level=>Level,msg=>{string,String},meta=>Meta}, - tid()). + Tid). tid() -> ets:whereis(?LOGGER_TABLE). -log_remote(Node,Level,{Format,Args},Meta) -> - log_remote(Node,{log,Level,Format,Args,Meta}); -log_remote(Node,Level,Msg,Meta) -> - log_remote(Node,{log,Level,Msg,Meta}). +log_remote(Node,Level,{Format,Args},Meta,Tid) -> + log_remote(Node,{log,Level,Format,Args,Meta},Tid); +log_remote(Node,Level,Msg,Meta,Tid) -> + log_remote(Node,{log,Level,Msg,Meta},Tid). -log_remote(Node,Request) -> - {logger,Node} ! Request, +log_remote(Node,Request,Tid) -> + logger_proxy:log(logger_server:get_proxy_ref(Tid),{remote,Node,Request}), ok. add_default_metadata(Meta) -> diff --git a/lib/kernel/src/logger_h_common.erl b/lib/kernel/src/logger_h_common.erl index dd8ace8249..6f55c5997d 100644 --- a/lib/kernel/src/logger_h_common.erl +++ b/lib/kernel/src/logger_h_common.erl @@ -178,7 +178,7 @@ changing_config(SetOrUpdate, log(LogEvent, Config = #{config := #{olp:=Olp}}) -> %% if the handler has crashed, we must drop this event %% and hope the handler restarts so we can try again - true = logger_olp:is_alive(Olp), + true = is_process_alive(logger_olp:get_pid(Olp)), Bin = log_to_binary(LogEvent, Config), logger_olp:load(Olp,Bin). @@ -206,8 +206,9 @@ start(OlpOpts0, #{id := Name, module:=Module, config:=HConfig} = Config0) -> type => worker, modules => [?MODULE]}, case supervisor:start_child(logger_sup, ChildSpec) of - {ok,Pid,{Olp,OlpOpts}} -> + {ok,Pid,Olp} -> ok = logger_handler_watcher:register_handler(Name,Pid), + OlpOpts = logger_olp:get_opts(Olp), {ok,Config0#{config=>(maps:merge(HConfig,OlpOpts))#{olp=>Olp}}}; {error,{Reason,Ch}} when is_tuple(Ch), element(1,Ch)==child -> {error,Reason}; @@ -246,14 +247,14 @@ handle_load(Bin, #{id:=Name, ctrl_sync_count := CtrlSync}=State) -> if CtrlSync==0 -> {_,HS1} = Module:write(Name, sync, Bin, HandlerState), - {ok,State#{handler_state => HS1, - ctrl_sync_count => ?CONTROLLER_SYNC_INTERVAL, - last_op=>write}}; + State#{handler_state => HS1, + ctrl_sync_count => ?CONTROLLER_SYNC_INTERVAL, + last_op=>write}; true -> {_,HS1} = Module:write(Name, async, Bin, HandlerState), - {ok,State#{handler_state => HS1, - ctrl_sync_count => CtrlSync-1, - last_op=>write}} + State#{handler_state => HS1, + ctrl_sync_count => CtrlSync-1, + last_op=>write} end. handle_call(filesync, _From, State = #{id := Name, @@ -295,7 +296,7 @@ handle_info(Info, #{id := Name, module := Module, {noreply,State#{handler_state => Module:handle_info(Name,Info,HandlerState)}}. terminate(overloaded=Reason, #{id:=Name}=State) -> - log_handler_info(Name, "Handler ~p overloaded and stopping", [Name], State), + _ = log_handler_info(Name,"Handler ~p overloaded and stopping",[Name],State), do_terminate(Reason,State), ConfigResult = logger:get_handler_config(Name), case ConfigResult of diff --git a/lib/kernel/src/logger_h_common.hrl b/lib/kernel/src/logger_h_common.hrl index 261b0a6246..f2c2dc2a4e 100644 --- a/lib/kernel/src/logger_h_common.hrl +++ b/lib/kernel/src/logger_h_common.hrl @@ -206,12 +206,16 @@ %%% officially released (as some counters will grow very large %%% over time). -%%-define(SAVE_STATS, true). +%% -define(SAVE_STATS, true). -ifdef(SAVE_STATS). -define(merge_with_stats(STATE), - STATE#{flushes => 0, flushed => 0, drops => 0, - burst_drops => 0, casts => 0, calls => 0, - max_qlen => 0, max_time => 0}). + begin + TIME = ?timestamp(), + STATE#{start => TIME, time => {TIME,0}, + flushes => 0, flushed => 0, drops => 0, + burst_drops => 0, casts => 0, calls => 0, + writes => 0, max_qlen => 0, max_time => 0, + freq => {TIME,0,0}} end). -define(update_max_qlen(QLEN, STATE), begin #{max_qlen := QLEN0} = STATE, @@ -234,13 +238,28 @@ -define(update_other(OTHER, VAR, INCVAL, STATE), begin #{OTHER := VAR} = STATE, STATE#{OTHER => VAR+INCVAL} end). - + + -define(update_freq(TIME,STATE), + begin + case STATE of + #{freq := {START, 49, _}} -> + STATE#{freq => {TIME, 0, trunc(1000000*50/(?diff_time(TIME,START)))}}; + #{freq := {START, N, FREQ}} -> + STATE#{freq => {START, N+1, FREQ}} + end end). + + -define(update_time(TIME,STATE), + begin #{start := START} = STATE, + STATE#{time => {TIME,trunc((?diff_time(TIME,START))/1000000)}} end). + -else. % DEFAULT! -define(merge_with_stats(STATE), STATE). -define(update_max_qlen(_QLEN, STATE), STATE). -define(update_calls_or_casts(_CALL_OR_CAST, _INC, STATE), STATE). -define(update_max_time(_TIME, STATE), STATE). -define(update_other(_OTHER, _VAR, _INCVAL, STATE), STATE). + -define(update_freq(_TIME, STATE), STATE). + -define(update_time(_TIME, STATE), STATE). -endif. %%%----------------------------------------------------------------- diff --git a/lib/kernel/src/logger_olp.erl b/lib/kernel/src/logger_olp.erl index 6b76c78c73..7c6a1a8547 100644 --- a/lib/kernel/src/logger_olp.erl +++ b/lib/kernel/src/logger_olp.erl @@ -25,47 +25,62 @@ %% API -export([start_link/4, load/2, info/1, reset/1, stop/1, restart/1, - set_opts/2, get_opts/1, get_default_opts/0, is_alive/1, - call/2, cast/2]). + set_opts/2, get_opts/1, get_default_opts/0, get_pid/1, + call/2, cast/2, get_ref/0, get_ref/1]). %% gen_server and proc_lib callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -%%%----------------------------------------------------------------- -%% -define(CONFIG_KEYS,[sync_mode_qlen, -%% drop_mode_qlen, -%% flush_qlen, -%% burst_limit_enable, -%% burst_limit_max_count, -%% burst_limit_window_time, -%% overload_kill_enable, -%% overload_kill_qlen, -%% overload_kill_mem_size, -%% overload_kill_restart_after]). +-define(OPT_KEYS,[sync_mode_qlen, + drop_mode_qlen, + flush_qlen, + burst_limit_enable, + burst_limit_max_count, + burst_limit_window_time, + overload_kill_enable, + overload_kill_qlen, + overload_kill_mem_size, + overload_kill_restart_after]). + +-export_type([olp_ref/0, options/0]). + +-opaque olp_ref() :: {atom(),pid(),ets:tid()}. + +-type options() :: #{sync_mode_qlen => integer(), + drop_mode_qlen => integer(), + flush_qlen => integer(), + burst_limit_enable => boolean(), + burst_limit_max_count => integer(), + burst_limit_window_time => integer(), + overload_kill_enable => boolean(), + overload_kill_qlen => integer(), + overload_kill_mem_size => integer(), + overload_kill_restart_after => integer()}. %%%----------------------------------------------------------------- %%% API -%-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason}. +-spec start_link(Name,Module,Args,Options) -> {ok,Pid,Olp} | {error,Reason} when + Name :: atom(), + Module :: module(), + Args :: term(), + Options :: options(), + Pid :: pid(), + Olp :: olp_ref(), + Reason :: term(). start_link(Name,Module,Args,Options0) when is_map(Options0) -> Options = maps:merge(get_default_opts(),Options0), case check_opts(Options) of ok -> - case proc_lib:start_link(?MODULE,init, - [[Name,Module,Args,Options]]) of - {ok,Pid,Olp} -> - {ok,Pid,{Olp,Options}}; - Error -> - Error - end; + proc_lib:start_link(?MODULE,init,[[Name,Module,Args,Options]]); Error -> Error end. -is_alive({_Name,Pid,_ModeRef}) -> - is_process_alive(Pid). - +-spec load(Olp, Msg) -> ok when + Olp :: olp_ref(), + Msg :: term(). load({_Name,Pid,ModeRef},Msg) -> %% If the process is getting overloaded, the message will be %% synchronous instead of asynchronous (slows down the tempo of a @@ -92,22 +107,36 @@ load({_Name,Pid,ModeRef},Msg) -> end, ok. +-spec info(Olp) -> map() | {error, busy} when + Olp :: atom() | pid() | olp_ref(). info(Olp) -> call(Olp, info). +-spec reset(Olp) -> ok | {error, busy} when + Olp :: atom() | pid() | olp_ref(). reset(Olp) -> call(Olp, reset). +-spec stop(Olp) -> ok when + Olp :: atom() | pid() | olp_ref(). stop({_Name,Pid,_ModRef}) -> + stop(Pid); +stop(Pid) -> _ = gen_server:call(Pid, stop), ok. -set_opts({_Name,Pid,_ModRef}, Opts) -> - gen_server:call(Pid, {set_opts,Opts}). +-spec set_opts(Olp, Opts) -> ok | {error,term()} | {error, busy} when + Olp :: atom() | pid() | olp_ref(), + Opts :: options(). +set_opts(Olp, Opts) -> + call(Olp, {set_opts,Opts}). -get_opts({_Name,Pid,_ModRef}) -> - gen_server:call(Pid, get_opts). +-spec get_opts(Olp) -> options() | {error, busy} when + Olp :: atom() | pid() | olp_ref(). +get_opts(Olp) -> + call(Olp, get_opts). +-spec get_default_opts() -> options(). get_default_opts() -> #{sync_mode_qlen => ?SYNC_MODE_QLEN, drop_mode_qlen => ?DROP_MODE_QLEN, @@ -120,11 +149,30 @@ get_default_opts() -> overload_kill_mem_size => ?OVERLOAD_KILL_MEM_SIZE, overload_kill_restart_after => ?OVERLOAD_KILL_RESTART_AFTER}. +-spec restart(fun(() -> any())) -> ok. restart(Fun) -> - erlang:display(restarting), - erlang:display(_ = Fun()), + Result = + try Fun() + catch C:R:S -> + {error,{restart_failed,Fun,C,R,S}} + end, + ?LOG_INTERNAL(debug,[{logger_olp,restart}, + {result,Result}]), ok. +-spec get_ref() -> olp_ref(). +get_ref() -> + get(olp_ref). + +-spec get_ref(PidOrName) -> olp_ref() | {error, busy} when + PidOrName :: pid() | atom(). +get_ref(PidOrName) -> + call(PidOrName,get_ref). + +-spec get_pid(olp_ref()) -> pid(). +get_pid({_Name,Pid,_ModeRef}) -> + Pid. + %%%=================================================================== %%% gen_server callbacks %%%=================================================================== @@ -136,13 +184,15 @@ init([Name,Module,Args,Options]) -> ?init_test_hooks(), ?start_observation(Name), - try Module:init(Args) of - {ok,CBState} -> - try ets:new(Name, [public]) of - ModeRef -> + try ets:new(Name, [public]) of + ModeRef -> + OlpRef = {Name,self(),ModeRef}, + put(olp_ref,OlpRef), + try Module:init(Args) of + {ok,CBState} -> ?set_mode(ModeRef, async), T0 = ?timestamp(), - proc_lib:init_ack({ok,self(),{Name,self(),ModeRef}}), + proc_lib:init_ack({ok,self(),OlpRef}), %% Storing options in state to avoid copying %% (sending) the option data with each message State0 = ?merge_with_stats( @@ -156,15 +206,17 @@ init([Name,Module,Args,Options]) -> burst_msg_count => 0, cb_state => CBState}), State = reset_restart_flag(State0), - gen_server:enter_loop(?MODULE, [], State) + gen_server:enter_loop(?MODULE, [], State); + Error -> + _ = ets:delete(ModeRef), + unregister(Name), + proc_lib:init_ack(Error) catch _:Error -> + _ = ets:delete(ModeRef), unregister(Name), proc_lib:init_ack(Error) - end; - Error -> - unregister(Name), - proc_lib:init_ack(Error) + end catch _:Error -> unregister(Name), @@ -177,8 +229,11 @@ handle_call({'$olp_load', Msg}, _From, State) -> %% Result == ok | dropped {reply,Result, State1}; +handle_call(get_ref,_From,#{id:=Name,mode_ref:=ModeRef}=State) -> + {reply,{Name,self(),ModeRef},State}; + handle_call({set_opts,Opts0},_From,State) -> - Opts = maps:merge(get_default_opts(),Opts0), + Opts = maps:merge(maps:with(?OPT_KEYS,State),Opts0), case check_opts(Opts) of ok -> {reply, ok, maps:merge(State,Opts)}; @@ -186,6 +241,9 @@ handle_call({set_opts,Opts0},_From,State) -> {reply, Error, State} end; +handle_call(get_opts,_From,State) -> + {reply, maps:with(?OPT_KEYS,State), State}; + handle_call(info, _From, State) -> {reply, State, State}; @@ -214,7 +272,6 @@ handle_call(Msg, From, #{module:=Module,cb_state:=CBState}=State) -> %% This is the asynchronous load event. handle_cast({'$olp_load', Msg}, State) -> {_Result,State1} = do_load(Msg, cast, State), - %% Result == ok | dropped {noreply,State1}; handle_cast(Msg, #{module:=Module, cb_state:=CBState} = State) -> @@ -230,7 +287,10 @@ handle_info(Msg, #{module := Module, cb_state := CBState} = State) -> {noreply,CBState1} -> {noreply,State#{cb_state=>CBState1}}; {noreply,CBState1,Timeout} -> - {noreply,State#{cb_state=>CBState1},Timeout} + {noreply,State#{cb_state=>CBState1},Timeout}; + {load,CBState1} -> + {_,State1} = do_load(Msg, cast, State#{cb_state=>CBState1}), + {noreply,State1} end. terminate({shutdown,{overloaded,_QLen,_Mem}}, @@ -242,12 +302,11 @@ terminate({shutdown,{overloaded,_QLen,_Mem}}, case try_callback_call(Module,terminate,[overloaded,CBState],ok) of {ok,Fun} when is_function(Fun,0), is_integer(RestartAfter) -> set_restart_flag(State), - timer:apply_after(RestartAfter,?MODULE,restart,[Fun]), + _ = timer:apply_after(RestartAfter,?MODULE,restart,[Fun]), ok; _ -> ok - end, - ok; + end; terminate(Reason, #{id:=Name, module:=Module, cb_state:=CBState}) -> _ = try_callback_call(Module,terminate,[Reason,CBState],ok), unregister(Name), @@ -259,6 +318,8 @@ code_change(_OldVsn, State, _Extra) -> %%%----------------------------------------------------------------- %%% Internal functions +-spec call(Olp, term()) -> term() | {error,busy} when + Olp :: atom() | pid() | olp_ref(). call({_Name, Pid, _ModeRef},Msg) -> call(Pid, Msg); call(Server, Msg) -> @@ -268,6 +329,7 @@ call(Server, Msg) -> _:{timeout,_} -> {error,busy} end. +-spec cast(olp_ref(),term()) -> ok. cast({_Name, Pid, _ModeRef},Msg) -> gen_server:cast(Pid, Msg). @@ -276,26 +338,27 @@ cast({_Name, Pid, _ModeRef},Msg) -> %% before LogWindowSize events have been handled do_load(Msg, CallOrCast, State) -> T1 = ?timestamp(), + State1 = ?update_time(T1,State), %% check if the process is getting overloaded, or if it's %% recovering from overload (the check must be done for each %% event to react quickly to large bursts of events and %% to ensure that the handler can never end up in drop mode %% with an empty mailbox, which would stop operation) - {Mode1,QLen,Mem,State1} = check_load(State), + {Mode1,QLen,Mem,State2} = check_load(State1), %% kill the handler if it can't keep up with the load - kill_if_choked(QLen, Mem, State1), + kill_if_choked(QLen, Mem, State2), if Mode1 == flush -> - flush(T1, State1); + flush(T1, State2); true -> - handle_load(Mode1, T1, Msg, CallOrCast, State1) + handle_load(Mode1, T1, Msg, CallOrCast, State2) end. %% this function is called by do_load/3 after an overload check %% has been performed, where QLen > FlushQLen -flush(T1, State=#{id := _Name, last_load_ts := T0, mode_ref := ModeRef}) -> +flush(T1, State=#{id := _Name, mode := Mode, last_load_ts := T0, mode_ref := ModeRef}) -> %% flush load messages in the mailbox (a limited number in order %% to not cause long delays) NewFlushed = flush_load(?FLUSH_MAX_N), @@ -306,17 +369,18 @@ flush(T1, State=#{id := _Name, last_load_ts := T0, mode_ref := ModeRef}) -> %% because of the receive loop when flushing messages, the %% handler will be scheduled out often and the mailbox could %% grow very large, so we'd better check the queue again here - {_,_QLen1} = process_info(self(), message_queue_len), - ?observe(_Name,{max_qlen,_QLen1}), + {_,QLen1} = process_info(self(), message_queue_len), + ?observe(_Name,{max_qlen,QLen1}), %% Add 1 for the current log event ?observe(_Name,{flushed,NewFlushed+1}), State2 = ?update_max_time(?diff_time(T1,T0),State1), - State3 = ?update_max_qlen(_QLen1,State2), + State3 = ?update_max_qlen(QLen1,State2), + State4 = maybe_notify_mode_change(async,QLen1,State3), {dropped,?update_other(flushed,FLUSHED,NewFlushed, - State3#{mode => ?set_mode(ModeRef,async), - last_qlen => 0, + State4#{mode => ?change_mode(ModeRef,Mode,async), + last_qlen => QLen1, last_load_ts => T1})}. %% this function is called to actually handle the message @@ -334,7 +398,7 @@ handle_load(Mode, T1, Msg, _CallOrCast, {Result,LastQLen1,CBState1} = if DoWrite -> ?observe(_Name,{_CallOrCast,1}), - {ok,CBS} = try_callback_call(Module,handle_load,[Msg,CBState]), + CBS = try_callback_call(Module,handle_load,[Msg,CBState]), {ok,element(2, process_info(self(), message_queue_len)),CBS}; true -> ?observe(_Name,{flushed,1}), @@ -353,9 +417,10 @@ handle_load(Mode, T1, Msg, _CallOrCast, State3 = if (LastQLen1 < ?FILESYNC_OK_QLEN) andalso (Time > ?IDLE_DETECT_TIME_USEC) -> - S = notify(idle,State2), - S#{mode => ?change_mode(ModeRef, Mode, async), - burst_msg_count => 0}; + S1 = notify(idle,State2), + S2 = maybe_notify_mode_change(async,LastQLen1,S1), + S2#{mode => ?change_mode(ModeRef, Mode, async), + burst_msg_count => 0}; true -> State2#{mode => Mode} end, @@ -365,7 +430,14 @@ handle_load(Mode, T1, Msg, _CallOrCast, ?update_max_time(Time, State5#{last_qlen := LastQLen1, last_load_ts => T1}), - {Result,State6}. + State7 = case Result of + ok -> + S = ?update_freq(T1,State6), + ?update_other(writes,WRITES,1,S); + _ -> + State6 + end, + {Result,State7}. %%%----------------------------------------------------------------- @@ -452,7 +524,7 @@ check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode, QLen >= DropModeQLen -> %% Note that drop mode will force load messages to %% be dropped on the client side (never sent to - %% the handler). + %% the olp process). IncDrops = if Mode == drop -> 0; true -> 1 end, {?change_mode(ModeRef, Mode, drop), IncDrops,0}; QLen >= SyncModeQLen -> @@ -461,10 +533,11 @@ check_load(State = #{id:=_Name, mode_ref := ModeRef, mode := Mode, {?change_mode(ModeRef, Mode, async), 0,0} end, State1 = ?update_other(drops,DROPS,_NewDrops,State), - State2 = maybe_notify_mode_change(Mode1,State1), + State2 = ?update_max_qlen(QLen,State1), + State3 = maybe_notify_mode_change(Mode1,QLen,State2), {Mode1, QLen, Mem, ?update_other(flushes,FLUSHES,_NewFlushes, - State2#{last_qlen => QLen})}. + State3#{last_qlen => QLen})}. limit_burst(#{burst_limit_enable := false}=State) -> {true,State}; @@ -517,7 +590,11 @@ flush_load(N, Limit) -> flush_load(N+1, Limit); {'$gen_call',{Pid,MRef},{'$olp_load',_}} -> Pid ! {MRef, dropped}, - flush_load(N+1, Limit) + flush_load(N+1, Limit); + {log,_,_,_,_} -> + flush_load(N+1, Limit); + {log,_,_,_} -> + flush_load(N+1, Limit) after 0 -> N end. @@ -528,13 +605,13 @@ overload_levels_ok(Options) -> FQL = maps:get(flush_qlen, Options, ?FLUSH_QLEN), (DMQL > 1) andalso (SMQL =< DMQL) andalso (DMQL =< FQL). -maybe_notify_mode_change(drop,#{mode:=Mode0}=State) +maybe_notify_mode_change(drop,_QLen,#{mode:=Mode0}=State) when Mode0=/=drop -> notify({mode_change,Mode0,drop},State); -maybe_notify_mode_change(Mode1,#{mode:=drop}=State) +maybe_notify_mode_change(Mode1,_QLen,#{mode:=drop}=State) when Mode1==async; Mode1==sync -> notify({mode_change,drop,Mode1},State); -maybe_notify_mode_change(_,State) -> +maybe_notify_mode_change(_,_,State) -> State. notify(Note,#{module:=Module,cb_state:=CBState}=State) -> diff --git a/lib/kernel/src/logger_proxy.erl b/lib/kernel/src/logger_proxy.erl new file mode 100644 index 0000000000..f89891bff0 --- /dev/null +++ b/lib/kernel/src/logger_proxy.erl @@ -0,0 +1,152 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2017-2018. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%% +%% %CopyrightEnd% +%% +-module(logger_proxy). + +%% API +-export([start_link/0, restart/0, log/2, child_spec/0]). + +%% logger_olp callbacks +-export([init/1, handle_load/2, handle_info/2, terminate/2, + notify/2]). + +-include("logger_internal.hrl"). + +-define(SERVER,?MODULE). + +%%%----------------------------------------------------------------- +%%% API +-spec log(Olp, RemoteLog) -> ok when + Olp :: logger_olp:olp_ref(), + RemoteLog :: {remote,node(),LogEvent}, + LogEvent :: {log,Level,Format,Args,Meta} | + {log,Level,StringOrReport,Meta}, + Level :: logger:level(), + Format :: io:format(), + Args :: list(term()), + StringOrReport :: unicode:chardata() | logger:report(), + Meta :: logger:metadata(). +log(Olp, RemoteLog) -> + case logger_olp:get_pid(Olp) =:= self() of + true -> + %% This happens when the log event comes from the + %% emulator, and the group leader is on a remote node. + _ = handle_load(RemoteLog, no_state), + ok; + false -> + logger_olp:load(Olp, RemoteLog) + end. + +%% Called by supervisor +-spec start_link() -> {ok,pid(),logger_olp:olp_ref()} | {error,term()}. +start_link() -> + %% Notice that sync_mode is only used when logging to remote node, + %% i.e. when the log/2 API function is called. + %% + %% When receiving log events from the emulator or from a remote + %% node, the log event is sent as a message to this process, and + %% thus received directly in handle_info/2. This means that the + %% mode (async/sync/drop) is not read before the message is + %% sent. Thus sync mode is never entered, and drop mode is + %% implemented by setting the system_logger flag to undefined (see + %% notify/2) + %% + %% Burst limit is disabled, since this is only a proxy and we + %% don't want to limit bursts twice (here and in the handler). + Opts = #{sync_mode_qlen=>500, + drop_mode_qlen=>1000, + flush_qlen=>5000, + burst_limit_enable=>false}, + logger_olp:start_link(?SERVER,?MODULE,[],Opts). + +%% Fun used for restarting this process after it has been killed due +%% to overload (must set overload_kill_enable=>true in opts) +restart() -> + case supervisor:start_child(logger_sup, child_spec()) of + {ok,_Pid,Olp} -> + {ok,Olp}; + {error,{Reason,Ch}} when is_tuple(Ch), element(1,Ch)==child -> + {error,Reason}; + Error -> + Error + end. + +%% Called internally and by logger_sup +child_spec() -> + Name = ?SERVER, + #{id => Name, + start => {?MODULE, start_link, []}, + restart => temporary, + shutdown => 2000, + type => worker, + modules => [?MODULE]}. + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +init([]) -> + process_flag(trap_exit, true), + _ = erlang:system_flag(system_logger,self()), + logger_server:set_proxy_ref(logger_olp:get_ref()), + {ok,no_state}. + +%% Log event to send to the node where the group leader of it's client resides +handle_load({remote,Node,Log},State) -> + %% If the connection is overloaded (send_nosuspend returns false), + %% we drop the message. + _ = erlang:send_nosuspend({?SERVER,Node},Log), + State; +%% Log event to log on this node +handle_load({log,Level,Format,Args,Meta},State) -> + try_log([Level,Format,Args,Meta]), + State; +handle_load({log,Level,Report,Meta},State) -> + try_log([Level,Report,Meta]), + State. + +%% Log event sent to this process e.g. from the emulator - it is really load +handle_info(Log,State) when is_tuple(Log), element(1,Log)==log -> + {load,State}. + +terminate(overloaded, _State) -> + _ = erlang:system_flag(system_logger,undefined), + {ok,fun ?MODULE:restart/0}; +terminate(_Reason, _State) -> + _ = erlang:system_flag(system_logger,whereis(logger)), + ok. + +notify({mode_change,_Mode0,drop},State) -> + _ = erlang:system_flag(system_logger,undefined), + State; +notify({mode_change,drop,_Mode1},State) -> + _ = erlang:system_flag(system_logger,self()), + State; +notify(_Note,State) -> + State. + +%%%----------------------------------------------------------------- +%%% Internal functions +try_log(Args) -> + try apply(logger,log,Args) + catch C:R:S -> + ?LOG_INTERNAL(debug,[{?MODULE,log_failed}, + {log,Args}, + {reason,{C,R,S}}]) + end. diff --git a/lib/kernel/src/logger_server.erl b/lib/kernel/src/logger_server.erl index b7735dbcf7..c58edf51f8 100644 --- a/lib/kernel/src/logger_server.erl +++ b/lib/kernel/src/logger_server.erl @@ -22,7 +22,7 @@ -behaviour(gen_server). %% API --export([start_link/0, +-export([start_link/0, set_proxy_ref/1, get_proxy_ref/1, add_handler/3, remove_handler/1, add_filter/2, remove_filter/2, set_module_level/2, unset_module_level/0, @@ -43,7 +43,7 @@ -define(SERVER, logger). -define(LOGGER_SERVER_TAG, '$logger_cb_process'). --record(state, {tid, async_req, async_req_queue}). +-record(state, {tid, async_req, async_req_queue, remote_logger}). %%%=================================================================== %%% API @@ -52,6 +52,14 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec set_proxy_ref(logger_olp:olp_ref()) -> ok. +set_proxy_ref(ProxyRef) -> + call({set_proxy_ref,ProxyRef}). + +-spec get_proxy_ref(ets:tid()) -> logger_olp:olp_ref(). +get_proxy_ref(Tid) -> + ets:lookup_element(Tid,proxy_ref,2). + add_handler(Id,Module,Config0) -> try {check_id(Id),check_mod(Module)} of {ok,ok} -> @@ -311,7 +319,10 @@ handle_call({set_module_level,Modules,Level}, _From, #state{tid=Tid}=State) -> {reply,Reply,State}; handle_call({unset_module_level,Modules}, _From, #state{tid=Tid}=State) -> Reply = logger_config:unset_module_level(Tid,Modules), - {reply,Reply,State}. + {reply,Reply,State}; +handle_call({set_proxy_ref,ProxyRef},_From,#state{tid=Tid}=State) -> + true = ets:insert(Tid,{proxy_ref,ProxyRef}), + {reply,ok,State}. handle_cast({async_req_reply,_Ref,_Reply} = Reply,State) -> call_h_reply(Reply,State); @@ -357,7 +368,7 @@ terminate(_Reason, _State) -> %%%=================================================================== %%% Internal functions %%%=================================================================== -call(Request) -> +call(Request) when is_tuple(Request) -> Action = element(1,Request), case get(?LOGGER_SERVER_TAG) of true when @@ -369,6 +380,7 @@ call(Request) -> gen_server:call(?SERVER,Request,?DEFAULT_LOGGER_CALL_TIMEOUT) end. + do_add_filter(Tid,Id,{FId,_} = Filter) -> case logger_config:get(Tid,Id) of {ok,Config} -> diff --git a/lib/kernel/src/logger_sup.erl b/lib/kernel/src/logger_sup.erl index 3d6f482e20..9ea8558a16 100644 --- a/lib/kernel/src/logger_sup.erl +++ b/lib/kernel/src/logger_sup.erl @@ -50,7 +50,9 @@ init([]) -> start => {logger_handler_watcher, start_link, []}, shutdown => brutal_kill}, - {ok, {SupFlags, [Watcher]}}. + Proxy = logger_proxy:child_spec(), + + {ok, {SupFlags, [Watcher,Proxy]}}. %%%=================================================================== %%% Internal functions -- cgit v1.2.3