diff options
author | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
---|---|---|
committer | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
commit | 84adefa331c4159d432d22840663c38f155cd4c1 (patch) | |
tree | bff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/kernel/src/disk_log_server.erl | |
download | otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2 otp-84adefa331c4159d432d22840663c38f155cd4c1.zip |
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/kernel/src/disk_log_server.erl')
-rw-r--r-- | lib/kernel/src/disk_log_server.erl | 368 |
1 files changed, 368 insertions, 0 deletions
diff --git a/lib/kernel/src/disk_log_server.erl b/lib/kernel/src/disk_log_server.erl new file mode 100644 index 0000000000..8894ed87e8 --- /dev/null +++ b/lib/kernel/src/disk_log_server.erl @@ -0,0 +1,368 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +-module(disk_log_server). +-behaviour(gen_server). + +-export([start_link/0, start/0, open/1, close/1, + get_log_pids/1, accessible_logs/0]). + +%% Local export. +-export([dist_open/1, get_local_pid/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_info/2, terminate/2]). +-export([handle_cast/2, code_change/3]). % just to avoid compiler warning + +-include("disk_log.hrl"). + +-compile({inline,[{do_get_log_pids,1}]}). + +-record(pending, {log, pid, req, from, attach, clients}). % [{Request,From}] + +-record(state, {pending = [] :: [#pending{}]}). + +%%%----------------------------------------------------------------- +%%% This module implements the disk_log server. Its primary purpose +%%% is to keep the ets table 'disk_log_names' updated and to handle +%%% distribution data (pids) using the module pg2. +%%%----------------------------------------------------------------- +%%%---------------------------------------------------------------------- +%%% API +%%%---------------------------------------------------------------------- +start_link() -> + gen_server:start_link({local, disk_log_server}, disk_log_server, [], []). + +start() -> + ensure_started(). + +open({ok, A}) -> + ensure_started(), + gen_server:call(disk_log_server, {open, local, A}, infinity); +open(Other) -> + Other. + +%% To be used from this module only. +dist_open(A) -> + ensure_started(), + gen_server:call(disk_log_server, {open, distr, A}, infinity). + +close(Pid) -> + gen_server:call(disk_log_server, {close, Pid}, infinity). + +get_log_pids(LogName) -> + do_get_log_pids(LogName). + +accessible_logs() -> + ensure_started(), + do_accessible_logs(). + +%%%---------------------------------------------------------------------- +%%% Callback functions from gen_server +%%%---------------------------------------------------------------------- + +%% It would have been really nice to have a tag for disk log groups, +%% like {distributed_disk_log, Log}, but backward compatibility makes +%% it hard to introduce. +-define(group(Log), Log). + +init([]) -> + process_flag(trap_exit, true), + ets:new(?DISK_LOG_NAME_TABLE, [named_table, set]), + ets:new(?DISK_LOG_PID_TABLE, [named_table, set]), + {ok, #state{}}. + +handle_call({open, W, A}, From, State) -> + open([{{open, W, A}, From}], State); +handle_call({close, Pid}, _From, State) -> + Reply = do_close(Pid), + {reply, Reply, State}. + +handle_info({pending_reply, Pid, Result0}, State) -> + {value, #pending{log = Name, pid = Pid, from = From, + req = Request, attach = Attach, + clients = Clients}} = + lists:keysearch(Pid, #pending.pid, State#state.pending), + NP = lists:keydelete(Pid, #pending.pid, State#state.pending), + State1 = State#state{pending = NP}, + if + Attach and (Result0 =:= {error, no_such_log}) -> + %% The disk_log process has terminated. Try again. + open([{Request,From} | Clients], State1); + true -> + case Result0 of + _ when Attach -> + ok; + {error, _} -> + ok; + _ -> + put(Pid, Name), + link(Pid), + {_, Locality, _} = Request, + ets:insert(?DISK_LOG_PID_TABLE, {Pid, Name}), + ets:insert(?DISK_LOG_NAME_TABLE, {Name, Pid, Locality}), + if + Locality =:= distr -> + ok = pg2:join(?group(Name), Pid); + true -> + ok + end + end, + gen_server:reply(From, result(Request, Result0)), + open(Clients, State1) + end; +handle_info({'EXIT', Pid, _Reason}, State) -> + %% If there are clients waiting to be attached to this log, info + %% {pending_reply,Pid,{error,no_such_log}} will soon arrive. + case get(Pid) of + undefined -> + ok; + Name -> + erase_log(Name, Pid) + end, + {noreply, State}; +handle_info(_, State) -> + {noreply, State}. + +%% Just to avoid compiler warning. +handle_cast(_, State) -> + {noreply, State}. + +%% Just to avoid compiler warning. +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +terminate(_Reason, _) -> + ok. + +%%%----------------------------------------------------------------- +%%% Internal functions +%%%----------------------------------------------------------------- + +ensure_started() -> + case whereis(disk_log_server) of + undefined -> + LogSup = {disk_log_sup, {disk_log_sup, start_link, []}, permanent, + 1000, supervisor, [disk_log_sup]}, + supervisor:start_child(kernel_safe_sup, LogSup), + LogServer = {disk_log_server, + {disk_log_server, start_link, []}, + permanent, 2000, worker, [disk_log_server]}, + supervisor:start_child(kernel_safe_sup, LogServer), + ok; + _ -> ok + end. + +open([{Req, From} | L], State) -> + State2 = case do_open(Req, From, State) of + {pending, State1} -> + State1; + {Reply, State1} -> + gen_server:reply(From, Reply), + State1 + end, + open(L, State2); +open([], State) -> + {noreply, State}. + +%% -> {OpenRet, NewState} | {{node(),OpenRet}, NewState} | +%% {pending, NewState} +do_open({open, W, #arg{name = Name}=A}=Req, From, State) -> + case check_pending(Name, From, State, Req) of + {pending, NewState} -> + {pending, NewState}; + false when W =:= local -> + case A#arg.distributed of + {true, Nodes} -> + Fun = fun() -> open_distr_rpc(Nodes, A, From) end, + _Pid = spawn(Fun), + %% No pending reply is expected, but don't reply yet. + {pending, State}; + false -> + case get_local_pid(Name) of + {local, Pid} -> + do_internal_open(Name, Pid, From, Req, true,State); + {distributed, _Pid} -> + {{error, {node_already_open, Name}}, State}; + undefined -> + start_log(Name, Req, From, State) + end + end; + false when W =:= distr -> + ok = pg2:create(?group(Name)), + case get_local_pid(Name) of + undefined -> + start_log(Name, Req, From, State); + {local, _Pid} -> + {{node(),{error, {node_already_open, Name}}}, State}; + {distributed, Pid} -> + do_internal_open(Name, Pid, From, Req, true, State) + end + end. + +%% Spawning a process is a means to avoid deadlock when +%% disk_log_servers mutually open disk_logs. + +-spec open_distr_rpc([node()], _, _) -> no_return(). % XXX: underspecified + +open_distr_rpc(Nodes, A, From) -> + {AllReplies, BadNodes} = rpc:multicall(Nodes, ?MODULE, dist_open, [A]), + {Ok, Bad} = cr(AllReplies, [], []), + Old = find_old_nodes(Nodes, AllReplies, BadNodes), + NotOk = [{BadNode, {error, nodedown}} || BadNode <- BadNodes ++ Old], + Reply = {Ok, Bad ++ NotOk}, + %% Send the reply to the waiting client: + gen_server:reply(From, Reply), + exit(normal). + +cr([{badrpc, {'EXIT', _}} | T], Nodes, Bad) -> + %% This clause can be removed in next release. + cr(T, Nodes, Bad); +cr([R={_Node, {error, _}} | T], Nodes, Bad) -> + cr(T, Nodes, [R | Bad]); +cr([Reply | T], Nodes, Bad) -> + cr(T, [Reply | Nodes], Bad); +cr([], Nodes, Bad) -> + {Nodes, Bad}. + +%% If a "new" node (one that calls dist_open/1) tries to open a log +%% on an old node (one that does not have dist_open/1), then the old +%% node is considered 'down'. In next release, this test will not be +%% needed since all nodes can be assumed to be "new" by then. +%% One more thing: if an old node tries to open a log on a new node, +%% the new node is also considered 'down'. +find_old_nodes(Nodes, Replies, BadNodes) -> + R = [X || {X, _} <- Replies], + ordsets:to_list(ordsets:subtract(ordsets:from_list(Nodes), + ordsets:from_list(R ++ BadNodes))). + +start_log(Name, Req, From, State) -> + Server = self(), + case supervisor:start_child(disk_log_sup, [Server]) of + {ok, Pid} -> + do_internal_open(Name, Pid, From, Req, false, State); + Error -> + {result(Req, Error), State} + end. + +do_internal_open(Name, Pid, From, {open, _W, A}=Req, Attach, State) -> + Server = self(), + F = fun() -> + Res = disk_log:internal_open(Pid, A), + Server ! {pending_reply, Pid, Res} + end, + _ = spawn(F), + PD = #pending{log = Name, pid = Pid, req = Req, + from = From, attach = Attach, clients = []}, + P = [PD | State#state.pending], + {pending, State#state{pending = P}}. + +check_pending(Name, From, State, Req) -> + case lists:keysearch(Name, #pending.log, State#state.pending) of + {value, #pending{log = Name, clients = Clients}=P} -> + NP = lists:keyreplace(Name, #pending.log, State#state.pending, + P#pending{clients = Clients++[{Req,From}]}), + {pending, State#state{pending = NP}}; + false -> + false + end. + +result({_, distr, _}, R) -> + {node(), R}; +result({_, local, _}, R) -> + R. + +do_close(Pid) -> + case get(Pid) of + undefined -> + ok; + Name -> + erase_log(Name, Pid), + unlink(Pid), + ok + end. + +erase_log(Name, Pid) -> + case get_local_pid(Name) of + undefined -> + ok; + {local, Pid} -> + true = ets:delete(?DISK_LOG_NAME_TABLE, Name), + true = ets:delete(?DISK_LOG_PID_TABLE, Pid); + {distributed, Pid} -> + true = ets:delete(?DISK_LOG_NAME_TABLE, Name), + true = ets:delete(?DISK_LOG_PID_TABLE, Pid), + ok = pg2:leave(?group(Name), Pid) + end, + erase(Pid). + +do_accessible_logs() -> + LocalSpec = {'$1','_',local}, + Local0 = [hd(L) || L <- ets:match(?DISK_LOG_NAME_TABLE, LocalSpec)], + Local = lists:sort(Local0), + Groups0 = ordsets:from_list(pg2:which_groups()), + Groups = ordsets:to_list(ordsets:subtract(Groups0, Local)), + Dist = [L || L <- Groups, dist_pids(L) =/= []], + {Local, Dist}. + +get_local_pid(LogName) -> + case ets:lookup(?DISK_LOG_NAME_TABLE, LogName) of + [{LogName, Pid, local}] -> + {local, Pid}; + [{LogName, Pid, distr}] -> + {distributed, Pid}; + [] -> + undefined + end. + +%% Inlined. +do_get_log_pids(LogName) -> + case catch ets:lookup(?DISK_LOG_NAME_TABLE, LogName) of + [{LogName, Pid, local}] -> + {local, Pid}; + [{LogName, _Pid, distr}] -> + case pg2:get_members(?group(LogName)) of + [] -> % The disk_log process has died recently + undefined; + Members -> + {distributed, Members} + end; + _EmptyOrError -> + case dist_pids(LogName) of + [] -> undefined; + Pids -> {distributed, Pids} + end + end. + +dist_pids(LogName) -> + %% Would be much simpler if disk log group names were tagged. + GroupName = ?group(LogName), + case catch pg2:get_members(GroupName) of + [Pid | _] = Pids -> + case rpc:call(node(Pid), ?MODULE, get_local_pid, [LogName]) of + undefined -> % does not seem to be a disk_log group + case catch lists:member(Pid,pg2:get_members(GroupName)) of + true -> []; + _ -> dist_pids(LogName) + end; + _ -> % badrpc if get_local_pid is not exported + Pids + end; + _ -> + [] + end. |