aboutsummaryrefslogtreecommitdiffstats
path: root/lib/kernel/src/disk_log_server.erl
diff options
context:
space:
mode:
authorErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
committerErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/kernel/src/disk_log_server.erl
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/kernel/src/disk_log_server.erl')
-rw-r--r--lib/kernel/src/disk_log_server.erl368
1 files changed, 368 insertions, 0 deletions
diff --git a/lib/kernel/src/disk_log_server.erl b/lib/kernel/src/disk_log_server.erl
new file mode 100644
index 0000000000..8894ed87e8
--- /dev/null
+++ b/lib/kernel/src/disk_log_server.erl
@@ -0,0 +1,368 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1997-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+-module(disk_log_server).
+-behaviour(gen_server).
+
+-export([start_link/0, start/0, open/1, close/1,
+ get_log_pids/1, accessible_logs/0]).
+
+%% Local export.
+-export([dist_open/1, get_local_pid/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_info/2, terminate/2]).
+-export([handle_cast/2, code_change/3]). % just to avoid compiler warning
+
+-include("disk_log.hrl").
+
+-compile({inline,[{do_get_log_pids,1}]}).
+
+-record(pending, {log, pid, req, from, attach, clients}). % [{Request,From}]
+
+-record(state, {pending = [] :: [#pending{}]}).
+
+%%%-----------------------------------------------------------------
+%%% This module implements the disk_log server. Its primary purpose
+%%% is to keep the ets table 'disk_log_names' updated and to handle
+%%% distribution data (pids) using the module pg2.
+%%%-----------------------------------------------------------------
+%%%----------------------------------------------------------------------
+%%% API
+%%%----------------------------------------------------------------------
+start_link() ->
+ gen_server:start_link({local, disk_log_server}, disk_log_server, [], []).
+
+start() ->
+ ensure_started().
+
+open({ok, A}) ->
+ ensure_started(),
+ gen_server:call(disk_log_server, {open, local, A}, infinity);
+open(Other) ->
+ Other.
+
+%% To be used from this module only.
+dist_open(A) ->
+ ensure_started(),
+ gen_server:call(disk_log_server, {open, distr, A}, infinity).
+
+close(Pid) ->
+ gen_server:call(disk_log_server, {close, Pid}, infinity).
+
+get_log_pids(LogName) ->
+ do_get_log_pids(LogName).
+
+accessible_logs() ->
+ ensure_started(),
+ do_accessible_logs().
+
+%%%----------------------------------------------------------------------
+%%% Callback functions from gen_server
+%%%----------------------------------------------------------------------
+
+%% It would have been really nice to have a tag for disk log groups,
+%% like {distributed_disk_log, Log}, but backward compatibility makes
+%% it hard to introduce.
+-define(group(Log), Log).
+
+init([]) ->
+ process_flag(trap_exit, true),
+ ets:new(?DISK_LOG_NAME_TABLE, [named_table, set]),
+ ets:new(?DISK_LOG_PID_TABLE, [named_table, set]),
+ {ok, #state{}}.
+
+handle_call({open, W, A}, From, State) ->
+ open([{{open, W, A}, From}], State);
+handle_call({close, Pid}, _From, State) ->
+ Reply = do_close(Pid),
+ {reply, Reply, State}.
+
+handle_info({pending_reply, Pid, Result0}, State) ->
+ {value, #pending{log = Name, pid = Pid, from = From,
+ req = Request, attach = Attach,
+ clients = Clients}} =
+ lists:keysearch(Pid, #pending.pid, State#state.pending),
+ NP = lists:keydelete(Pid, #pending.pid, State#state.pending),
+ State1 = State#state{pending = NP},
+ if
+ Attach and (Result0 =:= {error, no_such_log}) ->
+ %% The disk_log process has terminated. Try again.
+ open([{Request,From} | Clients], State1);
+ true ->
+ case Result0 of
+ _ when Attach ->
+ ok;
+ {error, _} ->
+ ok;
+ _ ->
+ put(Pid, Name),
+ link(Pid),
+ {_, Locality, _} = Request,
+ ets:insert(?DISK_LOG_PID_TABLE, {Pid, Name}),
+ ets:insert(?DISK_LOG_NAME_TABLE, {Name, Pid, Locality}),
+ if
+ Locality =:= distr ->
+ ok = pg2:join(?group(Name), Pid);
+ true ->
+ ok
+ end
+ end,
+ gen_server:reply(From, result(Request, Result0)),
+ open(Clients, State1)
+ end;
+handle_info({'EXIT', Pid, _Reason}, State) ->
+ %% If there are clients waiting to be attached to this log, info
+ %% {pending_reply,Pid,{error,no_such_log}} will soon arrive.
+ case get(Pid) of
+ undefined ->
+ ok;
+ Name ->
+ erase_log(Name, Pid)
+ end,
+ {noreply, State};
+handle_info(_, State) ->
+ {noreply, State}.
+
+%% Just to avoid compiler warning.
+handle_cast(_, State) ->
+ {noreply, State}.
+
+%% Just to avoid compiler warning.
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+terminate(_Reason, _) ->
+ ok.
+
+%%%-----------------------------------------------------------------
+%%% Internal functions
+%%%-----------------------------------------------------------------
+
+ensure_started() ->
+ case whereis(disk_log_server) of
+ undefined ->
+ LogSup = {disk_log_sup, {disk_log_sup, start_link, []}, permanent,
+ 1000, supervisor, [disk_log_sup]},
+ supervisor:start_child(kernel_safe_sup, LogSup),
+ LogServer = {disk_log_server,
+ {disk_log_server, start_link, []},
+ permanent, 2000, worker, [disk_log_server]},
+ supervisor:start_child(kernel_safe_sup, LogServer),
+ ok;
+ _ -> ok
+ end.
+
+open([{Req, From} | L], State) ->
+ State2 = case do_open(Req, From, State) of
+ {pending, State1} ->
+ State1;
+ {Reply, State1} ->
+ gen_server:reply(From, Reply),
+ State1
+ end,
+ open(L, State2);
+open([], State) ->
+ {noreply, State}.
+
+%% -> {OpenRet, NewState} | {{node(),OpenRet}, NewState} |
+%% {pending, NewState}
+do_open({open, W, #arg{name = Name}=A}=Req, From, State) ->
+ case check_pending(Name, From, State, Req) of
+ {pending, NewState} ->
+ {pending, NewState};
+ false when W =:= local ->
+ case A#arg.distributed of
+ {true, Nodes} ->
+ Fun = fun() -> open_distr_rpc(Nodes, A, From) end,
+ _Pid = spawn(Fun),
+ %% No pending reply is expected, but don't reply yet.
+ {pending, State};
+ false ->
+ case get_local_pid(Name) of
+ {local, Pid} ->
+ do_internal_open(Name, Pid, From, Req, true,State);
+ {distributed, _Pid} ->
+ {{error, {node_already_open, Name}}, State};
+ undefined ->
+ start_log(Name, Req, From, State)
+ end
+ end;
+ false when W =:= distr ->
+ ok = pg2:create(?group(Name)),
+ case get_local_pid(Name) of
+ undefined ->
+ start_log(Name, Req, From, State);
+ {local, _Pid} ->
+ {{node(),{error, {node_already_open, Name}}}, State};
+ {distributed, Pid} ->
+ do_internal_open(Name, Pid, From, Req, true, State)
+ end
+ end.
+
+%% Spawning a process is a means to avoid deadlock when
+%% disk_log_servers mutually open disk_logs.
+
+-spec open_distr_rpc([node()], _, _) -> no_return(). % XXX: underspecified
+
+open_distr_rpc(Nodes, A, From) ->
+ {AllReplies, BadNodes} = rpc:multicall(Nodes, ?MODULE, dist_open, [A]),
+ {Ok, Bad} = cr(AllReplies, [], []),
+ Old = find_old_nodes(Nodes, AllReplies, BadNodes),
+ NotOk = [{BadNode, {error, nodedown}} || BadNode <- BadNodes ++ Old],
+ Reply = {Ok, Bad ++ NotOk},
+ %% Send the reply to the waiting client:
+ gen_server:reply(From, Reply),
+ exit(normal).
+
+cr([{badrpc, {'EXIT', _}} | T], Nodes, Bad) ->
+ %% This clause can be removed in next release.
+ cr(T, Nodes, Bad);
+cr([R={_Node, {error, _}} | T], Nodes, Bad) ->
+ cr(T, Nodes, [R | Bad]);
+cr([Reply | T], Nodes, Bad) ->
+ cr(T, [Reply | Nodes], Bad);
+cr([], Nodes, Bad) ->
+ {Nodes, Bad}.
+
+%% If a "new" node (one that calls dist_open/1) tries to open a log
+%% on an old node (one that does not have dist_open/1), then the old
+%% node is considered 'down'. In next release, this test will not be
+%% needed since all nodes can be assumed to be "new" by then.
+%% One more thing: if an old node tries to open a log on a new node,
+%% the new node is also considered 'down'.
+find_old_nodes(Nodes, Replies, BadNodes) ->
+ R = [X || {X, _} <- Replies],
+ ordsets:to_list(ordsets:subtract(ordsets:from_list(Nodes),
+ ordsets:from_list(R ++ BadNodes))).
+
+start_log(Name, Req, From, State) ->
+ Server = self(),
+ case supervisor:start_child(disk_log_sup, [Server]) of
+ {ok, Pid} ->
+ do_internal_open(Name, Pid, From, Req, false, State);
+ Error ->
+ {result(Req, Error), State}
+ end.
+
+do_internal_open(Name, Pid, From, {open, _W, A}=Req, Attach, State) ->
+ Server = self(),
+ F = fun() ->
+ Res = disk_log:internal_open(Pid, A),
+ Server ! {pending_reply, Pid, Res}
+ end,
+ _ = spawn(F),
+ PD = #pending{log = Name, pid = Pid, req = Req,
+ from = From, attach = Attach, clients = []},
+ P = [PD | State#state.pending],
+ {pending, State#state{pending = P}}.
+
+check_pending(Name, From, State, Req) ->
+ case lists:keysearch(Name, #pending.log, State#state.pending) of
+ {value, #pending{log = Name, clients = Clients}=P} ->
+ NP = lists:keyreplace(Name, #pending.log, State#state.pending,
+ P#pending{clients = Clients++[{Req,From}]}),
+ {pending, State#state{pending = NP}};
+ false ->
+ false
+ end.
+
+result({_, distr, _}, R) ->
+ {node(), R};
+result({_, local, _}, R) ->
+ R.
+
+do_close(Pid) ->
+ case get(Pid) of
+ undefined ->
+ ok;
+ Name ->
+ erase_log(Name, Pid),
+ unlink(Pid),
+ ok
+ end.
+
+erase_log(Name, Pid) ->
+ case get_local_pid(Name) of
+ undefined ->
+ ok;
+ {local, Pid} ->
+ true = ets:delete(?DISK_LOG_NAME_TABLE, Name),
+ true = ets:delete(?DISK_LOG_PID_TABLE, Pid);
+ {distributed, Pid} ->
+ true = ets:delete(?DISK_LOG_NAME_TABLE, Name),
+ true = ets:delete(?DISK_LOG_PID_TABLE, Pid),
+ ok = pg2:leave(?group(Name), Pid)
+ end,
+ erase(Pid).
+
+do_accessible_logs() ->
+ LocalSpec = {'$1','_',local},
+ Local0 = [hd(L) || L <- ets:match(?DISK_LOG_NAME_TABLE, LocalSpec)],
+ Local = lists:sort(Local0),
+ Groups0 = ordsets:from_list(pg2:which_groups()),
+ Groups = ordsets:to_list(ordsets:subtract(Groups0, Local)),
+ Dist = [L || L <- Groups, dist_pids(L) =/= []],
+ {Local, Dist}.
+
+get_local_pid(LogName) ->
+ case ets:lookup(?DISK_LOG_NAME_TABLE, LogName) of
+ [{LogName, Pid, local}] ->
+ {local, Pid};
+ [{LogName, Pid, distr}] ->
+ {distributed, Pid};
+ [] ->
+ undefined
+ end.
+
+%% Inlined.
+do_get_log_pids(LogName) ->
+ case catch ets:lookup(?DISK_LOG_NAME_TABLE, LogName) of
+ [{LogName, Pid, local}] ->
+ {local, Pid};
+ [{LogName, _Pid, distr}] ->
+ case pg2:get_members(?group(LogName)) of
+ [] -> % The disk_log process has died recently
+ undefined;
+ Members ->
+ {distributed, Members}
+ end;
+ _EmptyOrError ->
+ case dist_pids(LogName) of
+ [] -> undefined;
+ Pids -> {distributed, Pids}
+ end
+ end.
+
+dist_pids(LogName) ->
+ %% Would be much simpler if disk log group names were tagged.
+ GroupName = ?group(LogName),
+ case catch pg2:get_members(GroupName) of
+ [Pid | _] = Pids ->
+ case rpc:call(node(Pid), ?MODULE, get_local_pid, [LogName]) of
+ undefined -> % does not seem to be a disk_log group
+ case catch lists:member(Pid,pg2:get_members(GroupName)) of
+ true -> [];
+ _ -> dist_pids(LogName)
+ end;
+ _ -> % badrpc if get_local_pid is not exported
+ Pids
+ end;
+ _ ->
+ []
+ end.