%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1997-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
-module(disk_log_server).
-behaviour(gen_server).
-export([start_link/0, start/0, open/1, close/1,
get_log_pids/1, accessible_logs/0]).
%% Local export.
-export([dist_open/1, get_local_pid/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_info/2, terminate/2]).
-export([handle_cast/2, code_change/3]). % just to avoid compiler warning
-include("disk_log.hrl").
-compile({inline,[{do_get_log_pids,1}]}).
-record(pending, {log, pid, req, from, attach, clients}). % clients = [{Request,From}]
-record(state, {pending = [] :: [#pending{}]}).
%%%-----------------------------------------------------------------
%%% This module implements the disk_log server. Its primary purpose
%%% is to keep the ets table 'disk_log_names' (and its companion pid
%%% table) up to date and to handle distribution data (pids) using
%%% the pg2 module.
%%%-----------------------------------------------------------------
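%%
%% For illustration only (the actual table names come from
%% disk_log.hrl): the server keeps two set-type ets tables and, for
%% distributed logs, one pg2 group per log:
%%
%%   ?DISK_LOG_NAME_TABLE : {LogName, Pid, local | distr}
%%   ?DISK_LOG_PID_TABLE  : {Pid, LogName}
%%   pg2 group ?group(LogName) : the disk_log pids on all nodes
%%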
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
start_link() ->
gen_server:start_link({local, disk_log_server}, disk_log_server, [], []).
start() ->
ensure_started().
open({ok, A}) ->
ensure_started(),
gen_server:call(disk_log_server, {open, local, A}, infinity);
open(Other) ->
Other.
%% To be used by this module only (called via rpc on the other nodes).
dist_open(A) ->
ensure_started(),
gen_server:call(disk_log_server, {open, distr, A}, infinity).
close(Pid) ->
gen_server:call(disk_log_server, {close, Pid}, infinity).
get_log_pids(LogName) ->
do_get_log_pids(LogName).
accessible_logs() ->
ensure_started(),
do_accessible_logs().
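%% A hypothetical shell session (a sketch, not a test), assuming a log
%% named my_log has already been opened locally with disk_log:open/1:
%%
%%   1> disk_log_server:get_log_pids(my_log).
%%   {local,<0.123.0>}
%%   2> disk_log_server:accessible_logs().
%%   {[my_log],[]}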
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------
%% It would have been really nice to have a tag for disk log groups,
%% like {distributed_disk_log, Log}, but backward compatibility makes
%% it hard to introduce.
-define(group(Log), Log).
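%% With a tag, the definition could for instance have read
%%   -define(group(Log), {distributed_disk_log, Log}).
%% which would have kept disk log groups apart from other pg2 groups.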
init([]) ->
process_flag(trap_exit, true),
_ = ets:new(?DISK_LOG_NAME_TABLE, [named_table, set]),
_ = ets:new(?DISK_LOG_PID_TABLE, [named_table, set]),
{ok, #state{}}.
handle_call({open, W, A}, From, State) ->
open([{{open, W, A}, From}], State);
handle_call({close, Pid}, _From, State) ->
Reply = do_close(Pid),
{reply, Reply, State}.
handle_info({pending_reply, Pid, Result0}, State) ->
{value, #pending{log = Name, pid = Pid, from = From,
req = Request, attach = Attach,
clients = Clients}} =
lists:keysearch(Pid, #pending.pid, State#state.pending),
NP = lists:keydelete(Pid, #pending.pid, State#state.pending),
State1 = State#state{pending = NP},
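    %% If an attach attempt failed because the disk_log process had
    %% already terminated, start over; otherwise register the new log
    %% process (unless the open failed or this was an attach) and reply.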
if
Attach and (Result0 =:= {error, no_such_log}) ->
%% The disk_log process has terminated. Try again.
open([{Request,From} | Clients], State1);
true ->
case Result0 of
_ when Attach ->
ok;
{error, _} ->
ok;
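            %% A newly started log process: remember it in the process
            %% dictionary and the ets tables, and, for distributed
            %% logs, in the pg2 group as well.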
_ ->
put(Pid, Name),
link(Pid),
{_, Locality, _} = Request,
ets:insert(?DISK_LOG_PID_TABLE, {Pid, Name}),
ets:insert(?DISK_LOG_NAME_TABLE, {Name, Pid, Locality}),
if
Locality =:= distr ->
ok = pg2:join(?group(Name), Pid);
true ->
ok
end
end,
gen_server:reply(From, result(Request, Result0)),
open(Clients, State1)
end;
handle_info({'EXIT', Pid, _Reason}, State) ->
%% If there are clients waiting to be attached to this log, the info
%% message {pending_reply,Pid,{error,no_such_log}} will soon arrive.
case get(Pid) of
undefined ->
ok;
Name ->
erase_log(Name, Pid)
end,
{noreply, State};
handle_info(_, State) ->
{noreply, State}.
%% Just to avoid compiler warning.
handle_cast(_, State) ->
{noreply, State}.
%% Just to avoid compiler warning.
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
terminate(_Reason, _) ->
ok.
%%%-----------------------------------------------------------------
%%% Internal functions
%%%-----------------------------------------------------------------
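%% Start disk_log_sup and disk_log_server under kernel_safe_sup,
%% unless they are already running.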
ensure_started() ->
case whereis(disk_log_server) of
undefined ->
LogSup = {disk_log_sup, {disk_log_sup, start_link, []}, permanent,
1000, supervisor, [disk_log_sup]},
{ok, _} = ensure_child_started(kernel_safe_sup, LogSup),
LogServer = {disk_log_server,
{disk_log_server, start_link, []},
permanent, 2000, worker, [disk_log_server]},
{ok, _} = ensure_child_started(kernel_safe_sup, LogServer),
ok;
_ -> ok
end.
ensure_child_started(Sup, Child) ->
case supervisor:start_child(Sup, Child) of
{ok,Pid} ->
{ok,Pid};
{error,{already_started,Pid}} ->
{ok,Pid};
Error ->
Error
end.
open([{Req, From} | L], State) ->
State2 = case do_open(Req, From, State) of
{pending, State1} ->
State1;
{Reply, State1} ->
gen_server:reply(From, Reply),
State1
end,
open(L, State2);
open([], State) ->
{noreply, State}.
%% -> {OpenRet, NewState} | {{node(),OpenRet}, NewState} |
%% {pending, NewState}
do_open({open, W, #arg{name = Name}=A}=Req, From, State) ->
case check_pending(Name, From, State, Req) of
{pending, NewState} ->
{pending, NewState};
false when W =:= local ->
case A#arg.distributed of
{true, Nodes} ->
Fun = fun() -> open_distr_rpc(Nodes, A, From) end,
_Pid = spawn(Fun),
%% No pending reply is expected, but don't reply yet.
{pending, State};
false ->
case get_local_pid(Name) of
{local, Pid} ->
do_internal_open(Name, Pid, From, Req, true, State);
{distributed, _Pid} ->
{{error, {node_already_open, Name}}, State};
undefined ->
start_log(Name, Req, From, State)
end
end;
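        %% Reached via dist_open/1 (an rpc from the coordinating node):
        %% make sure the pg2 group exists and open the log locally.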
false when W =:= distr ->
ok = pg2:create(?group(Name)),
case get_local_pid(Name) of
undefined ->
start_log(Name, Req, From, State);
{local, _Pid} ->
{{node(),{error, {node_already_open, Name}}}, State};
{distributed, Pid} ->
do_internal_open(Name, Pid, From, Req, true, State)
end
end.
%% Spawning a process avoids a deadlock when disk_log_servers on
%% different nodes open disk logs on each other at the same time.
-spec open_distr_rpc([node()], _, _) -> no_return(). % XXX: underspecified
open_distr_rpc(Nodes, A, From) ->
{AllReplies, BadNodes} = rpc:multicall(Nodes, ?MODULE, dist_open, [A]),
{Ok, Bad} = cr(AllReplies, [], []),
Old = find_old_nodes(Nodes, AllReplies, BadNodes),
NotOk = [{BadNode, {error, nodedown}} || BadNode <- BadNodes ++ Old],
Reply = {Ok, Bad ++ NotOk},
%% Send the reply to the waiting client:
gen_server:reply(From, Reply),
exit(normal).
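%% For illustration, the reply sent above has the shape
%%   {[{Node, OpenRet}], [{Node, {error, Reason}}]}
%% where the second list also holds {Node, {error, nodedown}} for
%% nodes that could not be reached.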
cr([{badrpc, {'EXIT', _}} | T], Nodes, Bad) ->
%% This clause can be removed in the next release.
cr(T, Nodes, Bad);
cr([R={_Node, {error, _}} | T], Nodes, Bad) ->
cr(T, Nodes, [R | Bad]);
cr([Reply | T], Nodes, Bad) ->
cr(T, [Reply | Nodes], Bad);
cr([], Nodes, Bad) ->
{Nodes, Bad}.
%% If a "new" node (one that calls dist_open/1) tries to open a log
%% on an old node (one that does not have dist_open/1), the old node
%% is considered 'down'. In the next release this test will not be
%% needed, since all nodes can then be assumed to be "new".
%% Note also that if an old node tries to open a log on a new node,
%% the new node is likewise considered 'down'.
find_old_nodes(Nodes, Replies, BadNodes) ->
R = [X || {X, _} <- Replies],
ordsets:to_list(ordsets:subtract(ordsets:from_list(Nodes),
ordsets:from_list(R ++ BadNodes))).
start_log(Name, Req, From, State) ->
Server = self(),
case supervisor:start_child(disk_log_sup, [Server]) of
{ok, Pid} ->
do_internal_open(Name, Pid, From, Req, false, State);
Error ->
{result(Req, Error), State}
end.
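%% Let a temporary process call disk_log:internal_open/2 so that this
%% server is not blocked; the result arrives later as a
%% {pending_reply, Pid, Result} message, handled in handle_info/2.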
do_internal_open(Name, Pid, From, {open, _W, A}=Req, Attach, State) ->
Server = self(),
F = fun() ->
Res = disk_log:internal_open(Pid, A),
Server ! {pending_reply, Pid, Res}
end,
_ = spawn(F),
PD = #pending{log = Name, pid = Pid, req = Req,
from = From, attach = Attach, clients = []},
P = [PD | State#state.pending],
{pending, State#state{pending = P}}.
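%% If an open of the same log is already in progress, queue the request
%% behind it; the queued clients are served when the pending reply
%% arrives.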
check_pending(Name, From, State, Req) ->
case lists:keysearch(Name, #pending.log, State#state.pending) of
{value, #pending{log = Name, clients = Clients}=P} ->
NP = lists:keyreplace(Name, #pending.log, State#state.pending,
P#pending{clients = Clients ++ [{Req, From}]}),
{pending, State#state{pending = NP}};
false ->
false
end.
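%% Replies to distributed opens are tagged with the replying node.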
result({_, distr, _}, R) ->
{node(), R};
result({_, local, _}, R) ->
R.
do_close(Pid) ->
case get(Pid) of
undefined ->
ok;
Name ->
erase_log(Name, Pid),
unlink(Pid),
ok
end.
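%% Remove all bookkeeping for a log: the ets entries, the process
%% dictionary entry, and, for distributed logs, the pg2 membership.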
erase_log(Name, Pid) ->
case get_local_pid(Name) of
undefined ->
ok;
{local, Pid} ->
true = ets:delete(?DISK_LOG_NAME_TABLE, Name),
true = ets:delete(?DISK_LOG_PID_TABLE, Pid);
{distributed, Pid} ->
true = ets:delete(?DISK_LOG_NAME_TABLE, Name),
true = ets:delete(?DISK_LOG_PID_TABLE, Pid),
ok = pg2:leave(?group(Name), Pid)
end,
erase(Pid).
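%% Return {LocalLogs, DistributedLogs}. A pg2 group counts as a
%% distributed log only if it still has members that are disk_log
%% processes.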
do_accessible_logs() ->
LocalSpec = {'$1','_',local},
Local0 = [hd(L) || L <- ets:match(?DISK_LOG_NAME_TABLE, LocalSpec)],
Local = lists:sort(Local0),
Groups0 = ordsets:from_list(pg2:which_groups()),
Groups = ordsets:to_list(ordsets:subtract(Groups0, Local)),
Dist = [L || L <- Groups, dist_pids(L) =/= []],
{Local, Dist}.
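%% Look up a log opened on this node; returns {local, Pid},
%% {distributed, Pid} or undefined.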
get_local_pid(LogName) ->
case ets:lookup(?DISK_LOG_NAME_TABLE, LogName) of
[{LogName, Pid, local}] ->
{local, Pid};
[{LogName, Pid, distr}] ->
{distributed, Pid};
[] ->
undefined
end.
%% Inlined.
do_get_log_pids(LogName) ->
case catch ets:lookup(?DISK_LOG_NAME_TABLE, LogName) of
[{LogName, Pid, local}] ->
{local, Pid};
[{LogName, _Pid, distr}] ->
case pg2:get_members(?group(LogName)) of
[] -> % The disk_log process has died recently
undefined;
Members ->
{distributed, Members}
end;
_EmptyOrError ->
case dist_pids(LogName) of
[] -> undefined;
Pids -> {distributed, Pids}
end
end.
dist_pids(LogName) ->
%% Would be much simpler if disk log group names were tagged.
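    %% Ask the node of one pg2 member whether it knows the log. If it
    %% does not and the member is still in the group, this is not a
    %% disk log group; if the member is gone, the log process just
    %% died, so try again.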
GroupName = ?group(LogName),
case catch pg2:get_members(GroupName) of
[Pid | _] = Pids ->
case rpc:call(node(Pid), ?MODULE, get_local_pid, [LogName]) of
undefined -> % does not seem to be a disk_log group
case catch lists:member(Pid, pg2:get_members(GroupName)) of
true -> [];
_ -> dist_pids(LogName)
end;
_ -> % badrpc if get_local_pid is not exported
Pids
end;
_ ->
[]
end.