%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2001-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(dets_server).
%% Disk based linear hashing lookup dictionary. Server part.
-behaviour(gen_server).
%% External exports.
-export([all/0, close/1, get_pid/1, open_file/1, open_file/2, pid2name/1,
users/1, verbose/1]).
%% Internal.
-export([start_link/0, start/0, stop/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
%% record for not yet handled reqeusts to open or close files
-record(pending, {tab, ref, pid, from, reqtype, clients}). % [{From,Args}]
%% state for the dets server
-record(state, {store, parent, pending}). % [pending()]
-include("dets.hrl").
-define(REGISTRY, dets_registry). % {Table, NoUsers, TablePid}
-define(OWNERS, dets_owners). % {TablePid, Table}
-define(STORE, dets). % {User, Table} and {{links,User}, NoLinks}
%%-define(DEBUGF(X,Y), io:format(X, Y)).
-define(DEBUGF(X,Y), void).
-compile({inline, [{pid2name_1,1}]}).
%%%----------------------------------------------------------------------
%%% API
%%%----------------------------------------------------------------------
%% Internal.
start_link() ->
gen_server:start_link({local, ?SERVER_NAME}, dets_server, [self()], []).
start() ->
ensure_started().
stop() ->
case whereis(?SERVER_NAME) of
undefined ->
stopped;
_Pid ->
gen_server:call(?SERVER_NAME, stop, infinity)
end.
all() ->
call(all).
close(Tab) ->
call({close, Tab}).
get_pid(Tab) ->
ets:lookup_element(?REGISTRY, Tab, 3).
open_file(File) ->
call({open, File}).
open_file(Tab, OpenArgs) ->
call({open, Tab, OpenArgs}).
pid2name(Pid) ->
ensure_started(),
pid2name_1(Pid).
users(Tab) ->
call({users, Tab}).
verbose(What) ->
call({set_verbose, What}).
call(Message) ->
ensure_started(),
gen_server:call(?SERVER_NAME, Message, infinity).
%%%----------------------------------------------------------------------
%%% Callback functions from gen_server
%%%----------------------------------------------------------------------
%%----------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%%----------------------------------------------------------------------
init(Parent) ->
Store = init(),
{ok, #state{store=Store, parent=Parent, pending = []}}.
%%----------------------------------------------------------------------
%% Func: handle_call/3
%% Returns: {reply, Reply, State} |
%% {reply, Reply, State, Timeout} |
%% {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, Reply, State} | (terminate/2 is called)
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_call(all, _From, State) ->
F = fun(X, A) -> [element(1, X) | A] end,
{reply, ets:foldl(F, [], ?REGISTRY), State};
handle_call({close, Tab}, From, State) ->
request([{{close, Tab}, From}], State);
handle_call({open, File}, From, State) ->
request([{{open, File}, From}], State);
handle_call({open, Tab, OpenArgs}, From, State) ->
request([{{open, Tab, OpenArgs}, From}], State);
handle_call(stop, _From, State) ->
{stop, normal, stopped, State};
handle_call({set_verbose, What}, _From, State) ->
set_verbose(What),
{reply, ok, State};
handle_call({users, Tab}, _From, State) ->
Users = ets:select(State#state.store, [{{'$1', Tab}, [], ['$1']}]),
{reply, Users, State}.
%%----------------------------------------------------------------------
%% Func: handle_cast/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_cast(_Msg, State) ->
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: handle_info/2
%% Returns: {noreply, State} |
%% {noreply, State, Timeout} |
%% {stop, Reason, State} (terminate/2 is called)
%%----------------------------------------------------------------------
handle_info({pending_reply, {Ref, Result0}}, State) ->
{value, #pending{tab = Tab, pid = Pid, from = {FromPid,_Tag}=From,
reqtype = ReqT, clients = Clients}} =
lists:keysearch(Ref, #pending.ref, State#state.pending),
Store = State#state.store,
Result =
case {Result0, ReqT} of
{ok, add_user} ->
do_link(Store, FromPid),
true = ets:insert(Store, {FromPid, Tab}),
ets:update_counter(?REGISTRY, Tab, 1),
{ok, Tab};
{ok, internal_open} ->
link(Pid),
do_link(Store, FromPid),
true = ets:insert(Store, {FromPid, Tab}),
%% do_internal_open() has already done the following:
%% true = ets:insert(?REGISTRY, {Tab, 1, Pid}),
%% true = ets:insert(?OWNERS, {Pid, Tab}),
{ok, Tab};
{Reply, internal_open} ->
%% Clean up what do_internal_open() did:
true = ets:delete(?REGISTRY, Tab),
true = ets:delete(?OWNERS, Pid),
Reply;
{Reply, _} -> % ok or Error
Reply
end,
gen_server:reply(From, Result),
NP = lists:keydelete(Pid, #pending.pid, State#state.pending),
State1 = State#state{pending = NP},
request(Clients, State1);
handle_info({'EXIT', Pid, _Reason}, State) ->
Store = State#state.store,
case pid2name_1(Pid) of
{ok, Tab} ->
%% A table was killed.
true = ets:delete(?REGISTRY, Tab),
true = ets:delete(?OWNERS, Pid),
Users = ets:select(State#state.store, [{{'$1', Tab}, [], ['$1']}]),
true = ets:match_delete(Store, {'_', Tab}),
lists:foreach(fun(User) -> do_unlink(Store, User) end, Users),
{noreply, State};
undefined ->
%% Close all tables used by Pid.
F = fun({FromPid, Tab}, S) ->
{_, S1} = handle_close(S, {close, Tab},
{FromPid, notag}, Tab),
S1
end,
State1 = lists:foldl(F, State, ets:lookup(Store, Pid)),
{noreply, State1}
end;
handle_info(_Message, State) ->
{noreply, State}.
%%----------------------------------------------------------------------
%% Func: terminate/2
%% Purpose: Shutdown the server
%% Returns: any (ignored by gen_server)
%%----------------------------------------------------------------------
terminate(_Reason, _State) ->
ok.
%%----------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%----------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%%----------------------------------------------------------------------
%%% Internal functions
%%%----------------------------------------------------------------------
ensure_started() ->
case whereis(?SERVER_NAME) of
undefined ->
DetsSup = {dets_sup, {dets_sup, start_link, []}, permanent,
1000, supervisor, [dets_sup]},
_ = supervisor:start_child(kernel_safe_sup, DetsSup),
DetsServer = {?SERVER_NAME, {?MODULE, start_link, []},
permanent, 2000, worker, [?MODULE]},
_ = supervisor:start_child(kernel_safe_sup, DetsServer),
ok;
_ -> ok
end.
init() ->
set_verbose(verbose_flag()),
process_flag(trap_exit, true),
?REGISTRY = ets:new(?REGISTRY, [set, named_table]),
?OWNERS = ets:new(?OWNERS, [set, named_table]),
ets:new(?STORE, [duplicate_bag]).
verbose_flag() ->
case init:get_argument(dets) of
{ok, Args} ->
lists:member(["verbose"], Args);
_ ->
false
end.
set_verbose(true) ->
put(verbose, yes);
set_verbose(_) ->
erase(verbose).
%% Inlined.
pid2name_1(Pid) ->
case ets:lookup(?OWNERS, Pid) of
[] -> undefined;
[{_Pid,Tab}] -> {ok, Tab}
end.
request([{Req, From} | L], State) ->
Res = case Req of
{close, Tab} ->
handle_close(State, Req, From, Tab);
{open, File} ->
do_internal_open(State, From, [File, get(verbose)]);
{open, Tab, OpenArgs} ->
do_open(State, Req, From, OpenArgs, Tab)
end,
State2 = case Res of
{pending, State1} ->
State1;
{Reply, State1} ->
gen_server:reply(From, Reply),
State1
end,
request(L, State2);
request([], State) ->
{noreply, State}.
%% -> {pending, NewState} | {Reply, NewState}
do_open(State, Req, From, Args, Tab) ->
case check_pending(Tab, From, State, Req) of
{pending, NewState} -> {pending, NewState};
false ->
case ets:lookup(?REGISTRY, Tab) of
[] ->
A = [Tab, Args, get(verbose)],
do_internal_open(State, From, A);
[{Tab, _Counter, Pid}] ->
pending_call(Tab, Pid, make_ref(), From, Args,
add_user, State)
end
end.
%% -> {pending, NewState} | {Reply, NewState}
do_internal_open(State, From, Args) ->
case supervisor:start_child(dets_sup, [self()]) of
{ok, Pid} ->
Ref = make_ref(),
Tab = case Args of
[T, _, _] -> T;
[_, _] -> Ref
end,
%% Pretend the table is open. If someone else tries to
%% open the file it will always become a pending
%% 'add_user' request. If someone tries to use the table
%% there will be a delay, but that is OK.
true = ets:insert(?REGISTRY, {Tab, 1, Pid}),
true = ets:insert(?OWNERS, {Pid, Tab}),
pending_call(Tab, Pid, Ref, From, Args, internal_open, State);
Error ->
{Error, State}
end.
%% -> {pending, NewState} | {Reply, NewState}
handle_close(State, Req, {FromPid,_Tag}=From, Tab) ->
case check_pending(Tab, From, State, Req) of
{pending, NewState} -> {pending, NewState};
false ->
Store = State#state.store,
case ets:match_object(Store, {FromPid, Tab}) of
[] ->
?DEBUGF("DETS: Table ~w close attempt by non-owner~w~n",
[Tab, FromPid]),
{{error, not_owner}, State};
[_ | Keep] ->
case ets:lookup(?REGISTRY, Tab) of
[{Tab, 1, Pid}] ->
do_unlink(Store, FromPid),
true = ets:delete(?REGISTRY, Tab),
true = ets:delete(?OWNERS, Pid),
true = ets:match_delete(Store, {FromPid, Tab}),
unlink(Pid),
pending_call(Tab, Pid, make_ref(), From, [],
internal_close, State);
[{Tab, _Counter, Pid}] ->
do_unlink(Store, FromPid),
true = ets:match_delete(Store, {FromPid, Tab}),
true = ets:insert(Store, Keep),
ets:update_counter(?REGISTRY, Tab, -1),
pending_call(Tab, Pid, make_ref(), From, [],
remove_user, State)
end
end
end.
%% Links with counters
do_link(Store, Pid) ->
Key = {links, Pid},
case ets:lookup(Store, Key) of
[] ->
true = ets:insert(Store, {Key, 1}),
link(Pid);
[{_, C}] ->
true = ets:delete(Store, Key),
true = ets:insert(Store, {Key, C+1})
end.
do_unlink(Store, Pid) ->
Key = {links, Pid},
case ets:lookup(Store, Key) of
[{_, C}] when C > 1 ->
true = ets:delete(Store, Key),
true = ets:insert(Store, {Key, C-1});
_ ->
true = ets:delete(Store, Key),
unlink(Pid)
end.
pending_call(Tab, Pid, Ref, {FromPid, _Tag}=From, Args, ReqT, State) ->
Server = self(),
F = fun() ->
Res = case ReqT of
add_user ->
dets:add_user(Pid, Tab, Args);
internal_open ->
dets:internal_open(Pid, Ref, Args);
internal_close ->
dets:internal_close(Pid);
remove_user ->
dets:remove_user(Pid, FromPid)
end,
Server ! {pending_reply, {Ref, Res}}
end,
_ = spawn(F),
PD = #pending{tab = Tab, ref = Ref, pid = Pid, reqtype = ReqT,
from = From, clients = []},
P = [PD | State#state.pending],
{pending, State#state{pending = P}}.
check_pending(Tab, From, State, Req) ->
case lists:keysearch(Tab, #pending.tab, State#state.pending) of
{value, #pending{tab = Tab, clients = Clients}=P} ->
NP = lists:keyreplace(Tab, #pending.tab, State#state.pending,
P#pending{clients = Clients++[{Req,From}]}),
{pending, State#state{pending = NP}};
false ->
false
end.